Re: SPARK-20325 - Spark Structured Streaming documentation Update: checkpoint configuration

2017-04-14 Thread Michael Armbrust
>
> 1) Could we update the documentation for Structured Streaming to describe
> that the checkpoint location can be specified via
> spark.sql.streaming.checkpointLocation
> at the SparkSession level, so that checkpoint dirs are automatically
> created for each query?
>
>
Sure, please open a pull request.


> 2) Do we really need to specify the checkpoint dir per query? What is the
> reason for this? Otherwise we will be forced to write some checkpointDir name
> generator, for example associating it with some particular named query, and
> so on.
>

Every query needs to have a unique checkpoint as this is how we track what
has been processed.  If we don't have this, we can't restart the query
where it left off.  In your example, I would suggest including the metric
name in the checkpoint location path.
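For reference, a minimal sketch of the two options discussed above (the source, paths, and metric name are all made up for illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("checkpoint-example").getOrCreate()

// Session-level default: queries started without an explicit location get a
// unique checkpoint directory created under this base path.
spark.conf.set("spark.sql.streaming.checkpointLocation", "/checkpoints/base")

val df = spark.readStream.format("rate").load()  // stand-in for any streaming source (the rate source needs Spark 2.2+)

// Per-query location: embed something unique (e.g. the metric name) in the path
// so the query can be restarted from where it left off.
val metricName = "requests-per-minute"
val query = df.writeStream
  .format("parquet")
  .option("path", s"/output/$metricName")
  .option("checkpointLocation", s"/checkpoints/$metricName")
  .start()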


[jira] [Resolved] (SPARK-16899) Structured Streaming Checkpointing Example invalid

2017-04-14 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust resolved SPARK-16899.
--
Resolution: Not A Problem

This has been fixed.  I believe you are using an old version of Spark.

> Structured Streaming Checkpointing Example invalid
> --
>
> Key: SPARK-16899
> URL: https://issues.apache.org/jira/browse/SPARK-16899
> Project: Spark
>  Issue Type: Bug
>  Components: Documentation, Structured Streaming
>Reporter: Vladimir Feinberg
>Priority: Minor
>
> The structured streaming checkpointing example at the bottom of the page 
> (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing)
>  has the following excerpt:
> {code}
> aggDF
>.writeStream
>.outputMode("complete")
>.option(“checkpointLocation”, “path/to/HDFS/dir”)
>.format("memory")
>.start()
> {code}
> But memory sinks are not fault-tolerant. Indeed, trying this out, I get the 
> following error: 
> {{This query does not support recovering from checkpoint location. Delete 
> /tmp/streaming.metadata-625631e5-baee-41da-acd1-f16c82f68a40/offsets to start 
> over.;}}
> The documentation should be changed to demonstrate checkpointing for a 
> non-aggregation streaming task, and explicitly mention there is no way to 
> checkpoint aggregates.
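For reference, a recoverable variant of the snippet quoted above swaps the memory sink for a fault-tolerant, file-based one (a sketch only; {{df}} stands for a non-aggregation streaming DataFrame and the paths are placeholders):

{code}
df.writeStream
  .outputMode("append")
  .option("checkpointLocation", "path/to/HDFS/checkpoint/dir")
  .format("parquet")
  .option("path", "path/to/HDFS/output/dir")
  .start()
{code}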






[jira] [Updated] (SPARK-16899) Structured Streaming Checkpointing Example invalid

2017-04-14 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-16899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-16899:
-
Component/s: Structured Streaming

> Structured Streaming Checkpointing Example invalid
> --
>
> Key: SPARK-16899
> URL: https://issues.apache.org/jira/browse/SPARK-16899
> Project: Spark
>  Issue Type: Bug
>  Components: Documentation, Structured Streaming
>Reporter: Vladimir Feinberg
>Priority: Minor
>
> The structured streaming checkpointing example at the bottom of the page 
> (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing)
>  has the following excerpt:
> {code}
> aggDF
>.writeStream
>.outputMode("complete")
>.option(“checkpointLocation”, “path/to/HDFS/dir”)
>.format("memory")
>.start()
> {code}
> But memory sinks are not fault-tolerant. Indeed, trying this out, I get the 
> following error: 
> {{This query does not support recovering from checkpoint location. Delete 
> /tmp/streaming.metadata-625631e5-baee-41da-acd1-f16c82f68a40/offsets to start 
> over.;}}
> The documentation should be changed to demonstrate checkpointing for a 
> non-aggregation streaming task, and explicitly mention there is no way to 
> checkpoint aggregates.






Re: 2.2 branch

2017-04-13 Thread Michael Armbrust
Yeah, I was delaying until 2.1.1 was out and some of the Hive questions
were resolved.  I'll make progress on that by the end of the week.  Let's
aim for the 2.2 branch cut next week.

On Thu, Apr 13, 2017 at 8:56 AM, Koert Kuipers  wrote:

> i see there is no 2.2 branch yet for spark. has this been pushed out until
> after 2.1.1 is done?
>
>
> thanks!
>


Re: Is the trigger interval the same as batch interval in structured streaming?

2017-04-10 Thread Michael Armbrust
It sounds like you want a tumbling window (where the slide and duration are
the same).  This is the default if you give only one interval.  You should
set the output mode to "update" (i.e. output only the rows that have been
updated since the last trigger) and the trigger to "1 second".

Try thinking about the batch query that would produce the answer you want.
Structured streaming will figure out an efficient way to compute that
answer incrementally as new data arrives.
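A minimal sketch of that suggestion (the events DataFrame and its eventTime column are assumptions; this uses the Spark 2.1-era ProcessingTime trigger):

import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.streaming.ProcessingTime

val counts = events                                    // assumed: a streaming DataFrame with an "eventTime" column
  .groupBy(window(col("eventTime"), "24 hours"))       // a single interval => tumbling window, so state resets every 24h
  .count()

val query = counts.writeStream
  .outputMode("update")                                // emit only rows updated since the last trigger
  .trigger(ProcessingTime("1 second"))                 // produce results every second
  .format("console")
  .start()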

On Mon, Apr 10, 2017 at 12:20 PM, kant kodali  wrote:

> Hi Michael,
>
> Thanks for the response. I guess I was thinking more in terms of the
> regular streaming model. so In this case I am little confused what my
> window interval and slide interval be for the following case?
>
> I need to hold a state (say a count) for 24 hours while capturing all its
> updates and produce results every second. I also need to reset the state
> (the count) back to zero every 24 hours.
>
>
>
>
>
>
> On Mon, Apr 10, 2017 at 11:49 AM, Michael Armbrust  > wrote:
>
>> Nope, structured streaming eliminates the limitation that micro-batching
>> should affect the results of your streaming query.  Trigger is just an
>> indication of how often you want to produce results (and if you leave it
>> blank we just run as quickly as possible).
>>
>> To control how tuples are grouped into a window, take a look at the
>> window
>> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time>
>> function.
>>
>> On Thu, Apr 6, 2017 at 10:26 AM, kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> Is the trigger interval mentioned in this doc
>>> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html>
>>> the same as batch interval in structured streaming? For example I have a
>>> long running receiver(not kafka) which sends me a real time stream I want
>>> to use window interval, slide interval of 24 hours to create the Tumbling
>>> window effect but I want to process updates every second.
>>>
>>> Thanks!
>>>
>>
>>
>


[jira] [Commented] (SPARK-19067) mapGroupsWithState - arbitrary stateful operations with Structured Streaming (similar to DStream.mapWithState)

2017-04-10 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15963353#comment-15963353
 ] 

Michael Armbrust commented on SPARK-19067:
--

No, this will be available in Spark 2.2.0

> mapGroupsWithState - arbitrary stateful operations with Structured Streaming 
> (similar to DStream.mapWithState)
> --
>
> Key: SPARK-19067
> URL: https://issues.apache.org/jira/browse/SPARK-19067
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Reporter: Michael Armbrust
>Assignee: Tathagata Das
>Priority: Critical
> Fix For: 2.2.0
>
>
> Right now the only way to do stateful operations is with Aggregator or 
> UDAF.  However, this does not give users control over emission or expiration of 
> state, making it hard to implement things like sessionization.  We should add 
> a more general construct (probably similar to {{DStream.mapWithState}}) to 
> structured streaming. Here is the design. 
> *Requirements*
> - Users should be able to specify a function that can do the following
> - Access the input row corresponding to a key
> - Access the previous state corresponding to a key
> - Optionally, update or remove the state
> - Output any number of new rows (or none at all)
> *Proposed API*
> {code}
> //  New methods on KeyValueGroupedDataset 
> class KeyValueGroupedDataset[K, V] {  
>   // Scala friendly
>   def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], State[S]) => U)
>   def flatMapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], State[S]) => Iterator[U])
>   // Java friendly
>   def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
>   def flatMapGroupsWithState[S, U](func: FlatMapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
> }
> // --- New Java-friendly function classes --- 
> public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
>   R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
> }
> public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
>   Iterator<R> call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
> }
> // -- Wrapper class for state data -- 
> trait GroupState[S] {
>   def exists(): Boolean
>   def get(): S                // throws an Exception if state does not exist
>   def getOption(): Option[S]
>   def update(newState: S): Unit
>   def remove(): Unit  // exists() will be false after this
> }
> {code}
> Key Semantics of the State class
> - The state can be null.
> - If state.remove() is called, then state.exists() will return false, and 
> getOption will return None.
> - After state.update(newState) is called, state.exists() will 
> return true, and getOption will return Some(...). 
> - None of the operations are thread-safe. This is to avoid memory barriers.
> *Usage*
> {code}
> val stateFunc = (word: String, words: Iterator[String], runningCount: GroupState[Long]) => {
>   val newCount = words.size + runningCount.getOption.getOrElse(0L)
>   runningCount.update(newCount)
>   (word, newCount)
> }
> dataset                                                // type is Dataset[String]
>   .groupByKey[String](w => w)                          // generates KeyValueGroupedDataset[String, String]
>   .mapGroupsWithState[Long, (String, Long)](stateFunc) // returns Dataset[(String, Long)]
> {code}
> *Future Directions*
> - Timeout-based expiration of state that has not received data for a while - 
> Done
> - General expression-based expiration - TODO. Any real use cases that cannot 
> be done with timeouts?






Re: Cant convert Dataset to case class with Option fields

2017-04-10 Thread Michael Armbrust
Options should work.  Can you give a full example that is freezing?  Which
version of Spark are you using?
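For reference, a minimal self-contained example of the pattern in question that can be pasted into the shell to check against a specific Spark version (all names are made up):

case class B(x: Int)
case class C(y: String)
case class A(b: Option[B] = None, c: Option[C] = None)

val spark = org.apache.spark.sql.SparkSession.builder()
  .master("local[*]").appName("option-fields-check").getOrCreate()
import spark.implicits._

// Build a Dataset[A] directly, then round-trip it through a DataFrame and .as[A],
// which is the conversion reported to hang.
val ds = Seq(A(Some(B(1)), Some(C("c"))), A()).toDS()
ds.toDF().as[A].collect().foreach(println)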

On Fri, Apr 7, 2017 at 6:59 AM, Dirceu Semighini Filho <
dirceu.semigh...@gmail.com> wrote:

> Hi Devs,
> I have some case classes here, and their fields are all optional
> case class A(b:Option[B] = None, c: Option[C] = None, ...)
>
> If I read some data into a Dataset and try to convert it to this case class
> using the as method, it doesn't give me any answer, it simply freezes.
> If I change the case class to
>
> case class A(b:B,c:C)
> it works nicely and returns the field values as null.
>
> Are Option fields not supported by the as method, or is this an issue?
>
> Kind Regards,
> Dirceu
>


Re: Is the trigger interval the same as batch interval in structured streaming?

2017-04-10 Thread Michael Armbrust
Nope, structured streaming eliminates the limitation that micro-batching
should affect the results of your streaming query.  Trigger is just an
indication of how often you want to produce results (and if you leave it
blank we just run as quickly as possible).

To control how tuples are grouped into a window, take a look at the window
<http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time>
function.

On Thu, Apr 6, 2017 at 10:26 AM, kant kodali  wrote:

> Hi All,
>
> Is the trigger interval mentioned in this doc
> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html>
> the same as batch interval in structured streaming? For example I have a
> long running receiver(not kafka) which sends me a real time stream I want
> to use window interval, slide interval of 24 hours to create the Tumbling
> window effect but I want to process updates every second.
>
> Thanks!
>


[jira] [Commented] (SPARK-20216) Install pandoc on machine(s) used for packaging

2017-04-04 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956045#comment-15956045
 ] 

Michael Armbrust commented on SPARK-20216:
--

I think it all runs on 
https://amplab.cs.berkeley.edu/jenkins/computer/amp-jenkins-worker-01/

> Install pandoc on machine(s) used for packaging
> ---
>
> Key: SPARK-20216
> URL: https://issues.apache.org/jira/browse/SPARK-20216
> Project: Spark
>  Issue Type: Bug
>  Components: Project Infra, PySpark
>Affects Versions: 2.1.1, 2.2.0
>Reporter: holdenk
>Priority: Blocker
>
> For Python packaging having pandoc is required to have a reasonable package 
> doc string. Which ever machine(s) are used for packaging should have both 
> pandoc and pypandoc installed on them.
> cc [~joshrosen] who I know was doing something related to this






Re: [VOTE] Apache Spark 2.1.1 (RC2)

2017-04-04 Thread Michael Armbrust
Thanks for the comments everyone.  This vote fails.  Here's how I think we
should proceed:
 - [SPARK-20197] - SparkR CRAN - appears to be resolved
 - [SPARK-] - Python packaging - Holden, please file a JIRA and report
if this is a regression and if there is an easy fix that we should wait for.

For all the other test failures, please take the time to look through JIRA
and open an issue if one does not already exist so that we can triage if
these are just environmental issues.  If I don't hear any objections I'm
going to go ahead with RC3 tomorrow.

On Sun, Apr 2, 2017 at 1:16 PM, Felix Cheung 
wrote:

> -1
> sorry, found an issue with SparkR CRAN check.
> Opened SPARK-20197 and working on fix.
>
> --
> *From:* holden.ka...@gmail.com  on behalf of
> Holden Karau 
> *Sent:* Friday, March 31, 2017 6:25:20 PM
> *To:* Xiao Li
> *Cc:* Michael Armbrust; dev@spark.apache.org
> *Subject:* Re: [VOTE] Apache Spark 2.1.1 (RC2)
>
> -1 (non-binding)
>
> Python packaging doesn't seem to have quite worked out (looking
> at PKG-INFO the description is "Description: ! missing pandoc do not
> upload to PyPI "), ideally it would be nice to have this as a version
> we upload to PyPI.
> Building this on my own machine results in a longer description.
>
> My guess is that whichever machine was used to package this is missing the
> pandoc executable (or possibly pypandoc library).
>
> On Fri, Mar 31, 2017 at 3:40 PM, Xiao Li  wrote:
>
>> +1
>>
>> Xiao
>>
>> 2017-03-30 16:09 GMT-07:00 Michael Armbrust :
>>
>>> Please vote on releasing the following candidate as Apache Spark
>>> version 2.1.1. The vote is open until Sun, April 2nd, 2017 at 16:30 PST
>>> and passes if a majority of at least 3 +1 PMC votes are cast.
>>>
>>> [ ] +1 Release this package as Apache Spark 2.1.1
>>> [ ] -1 Do not release this package because ...
>>>
>>>
>>> To learn more about Apache Spark, please see http://spark.apache.org/
>>>
>>> The tag to be voted on is v2.1.1-rc2
>>> <https://github.com/apache/spark/tree/v2.1.1-rc2> (
>>> 02b165dcc2ee5245d1293a375a31660c9d4e1fa6)
>>>
>>> List of JIRA tickets resolved can be found with this filter
>>> <https://issues.apache.org/jira/browse/SPARK-20134?jql=project%20%3D%20SPARK%20AND%20fixVersion%20%3D%202.1.1>
>>> .
>>>
>>> The release files, including signatures, digests, etc. can be found at:
>>> http://home.apache.org/~pwendell/spark-releases/spark-2.1.1-rc2-bin/
>>>
>>> Release artifacts are signed with the following key:
>>> https://people.apache.org/keys/committer/pwendell.asc
>>>
>>> The staging repository for this release can be found at:
>>> https://repository.apache.org/content/repositories/orgapachespark-1227/
>>>
>>> The documentation corresponding to this release can be found at:
>>> http://people.apache.org/~pwendell/spark-releases/spark-2.1.1-rc2-docs/
>>>
>>>
>>> *FAQ*
>>>
>>> *How can I help test this release?*
>>>
>>> If you are a Spark user, you can help us test this release by taking an
>>> existing Spark workload and running on this release candidate, then
>>> reporting any regressions.
>>>
>>> *What should happen to JIRA tickets still targeting 2.1.1?*
>>>
>>> Committers should look at those and triage. Extremely important bug
>>> fixes, documentation, and API tweaks that impact compatibility should be
>>> worked on immediately. Everything else please retarget to 2.1.2 or 2.2.0.
>>>
>>> *But my bug isn't fixed!??!*
>>>
>>> In order to make timely releases, we will typically not hold the release
>>> unless the bug in question is a regression from 2.1.0.
>>>
>>> *What happened to RC1?*
>>>
>>> There were issues with the release packaging and as a result it was skipped.
>>>
>>
>>
>
>
> --
> Cell : 425-233-8271 <(425)%20233-8271>
> Twitter: https://twitter.com/holdenkarau
>


Re: map transform on array in spark sql

2017-04-04 Thread Michael Armbrust
If you can find the name of the struct field from the schema you can just
do:

df.select($"arrayField.a")

Selecting a field from an array returns an array with that field selected
from each element.
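For example, a tiny illustration of that behavior (the column and field names are made up):

case class Inner(a: Int, b: String)

val spark = org.apache.spark.sql.SparkSession.builder()
  .master("local[*]").appName("array-field-select").getOrCreate()
import spark.implicits._

val df = Seq(
  (1, Seq(Inner(10, "x"), Inner(20, "y"))),
  (2, Seq(Inner(30, "z")))
).toDF("id", "arrayField")

// df.schema shows arrayField: array<struct<a:int,b:string>>, so "a" is the field name.
// Selecting it through the array yields array<int>: the "a" value from every element.
df.select($"id", $"arrayField.a").show()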

On Mon, Apr 3, 2017 at 8:18 PM, Koert Kuipers  wrote:

> i have a DataFrame where one column has type:
>
> ArrayType(StructType(Seq(
>   StructField("a", typeA, nullableA),
>   StructField("b", typeB, nullableB)
> )))
>
> i would like to map over this array to pick the first element in the
> struct. so the result should be a ArrayType(typeA, nullableA). i realize i
> can do this with a scala udf if i know typeA. but what if i dont know typeA?
>
> basically i would like to do an expression like:
> map(col("x"), _(0)))
>
> any suggestions?
>
>


Re: Convert Dataframe to Dataset in pyspark

2017-04-03 Thread Michael Armbrust
You don't need encoders in Python since it's all dynamically typed anyway.
You can just do the following if you want the data as a string.

sqlContext.read.text("/home/spark/1.6/lines").rdd.map(lambda row: row.value)

2017-04-01 5:36 GMT-07:00 Selvam Raman :

> In Scala,
> val ds = sqlContext.read.text("/home/spark/1.6/lines").as[String]
>
> what is the equivalent code in pyspark?
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>


[VOTE] Apache Spark 2.1.1 (RC2)

2017-03-30 Thread Michael Armbrust
Please vote on releasing the following candidate as Apache Spark version
2.1.1. The vote is open until Sun, April 2nd, 2017 at 16:30 PST and passes
if a majority of at least 3 +1 PMC votes are cast.

[ ] +1 Release this package as Apache Spark 2.1.1
[ ] -1 Do not release this package because ...


To learn more about Apache Spark, please see http://spark.apache.org/

The tag to be voted on is v2.1.1-rc2
<https://github.com/apache/spark/tree/v2.1.1-rc2> (
02b165dcc2ee5245d1293a375a31660c9d4e1fa6)

List of JIRA tickets resolved can be found with this filter
<https://issues.apache.org/jira/browse/SPARK-20134?jql=project%20%3D%20SPARK%20AND%20fixVersion%20%3D%202.1.1>
.

The release files, including signatures, digests, etc. can be found at:
http://home.apache.org/~pwendell/spark-releases/spark-2.1.1-rc2-bin/

Release artifacts are signed with the following key:
https://people.apache.org/keys/committer/pwendell.asc

The staging repository for this release can be found at:
https://repository.apache.org/content/repositories/orgapachespark-1227/

The documentation corresponding to this release can be found at:
http://people.apache.org/~pwendell/spark-releases/spark-2.1.1-rc2-docs/


*FAQ*

*How can I help test this release?*

If you are a Spark user, you can help us test this release by taking an
existing Spark workload and running on this release candidate, then
reporting any regressions.

*What should happen to JIRA tickets still targeting 2.1.1?*

Committers should look at those and triage. Extremely important bug fixes,
documentation, and API tweaks that impact compatibility should be worked on
immediately. Everything else please retarget to 2.1.2 or 2.2.0.

*But my bug isn't fixed!??!*

In order to make timely releases, we will typically not hold the release
unless the bug in question is a regression from 2.1.0.

*What happened to RC1?*

There were issues with the release packaging and as a result it was skipped.


Re: Why VectorUDT private?

2017-03-30 Thread Michael Armbrust
I think really the right way to think about things that are marked private
is, "this may disappear or change in a future minor release".  If you are
okay with that, working around the visibility restrictions is reasonable.
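For what it's worth, one way to check a vector input column without touching the private UDT class at all is the public SQLDataTypes helper. A sketch, assuming the spark.ml linalg vectors of Spark 2.x (verify the import against your version):

import org.apache.spark.ml.linalg.SQLDataTypes
import org.apache.spark.sql.types.StructType

// Validate that a column of a transformer's input schema is a vector column,
// comparing against the public VectorType constant instead of the private VectorUDT.
def validateVectorColumn(schema: StructType, colName: String): Unit = {
  val dt = schema(colName).dataType
  require(dt == SQLDataTypes.VectorType,
    s"Column $colName must be of vector type but was $dt")
}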

On Thu, Mar 30, 2017 at 5:52 AM, Koert Kuipers  wrote:

> I stopped asking long time ago why things are private in spark... I
> mean... The conversion between ml and mllib vectors is private... the
> conversion between spark vector and breeze used to be (or still is?)
> private. it just goes on. Lots of useful stuff is private[SQL].
>
> Luckily there are simple ways to get around these visibility restrictions
>
> On Mar 29, 2017 22:57, "Ryan"  wrote:
>
>> I'm writing a transformer and the input column is vector type (which is
>> the output column from another transformer). But as the VectorUDT is private,
>> how could I check/transform schema for the vector column?
>>
>


[jira] [Updated] (SPARK-20103) Spark structured steaming from kafka - last message processed again after resume from checkpoint

2017-03-29 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-20103:
-
Fix Version/s: 2.2.0

> Spark structured steaming from kafka - last message processed again after 
> resume from checkpoint
> 
>
> Key: SPARK-20103
> URL: https://issues.apache.org/jira/browse/SPARK-20103
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0
> Environment: Linux, Spark 2.10 
>Reporter: Rajesh Mutha
>  Labels: spark, streaming
> Fix For: 2.2.0
>
>
> When the application starts after a failure or a graceful shutdown, it is 
> consistently processing the last message of the previous batch even though it 
> was already processed correctly without failure.
> We are making sure database writes are idempotent using postgres 9.6 feature. 
> Is this the default behavior of spark? I added a code snippet with 2 
> streaming queries. One of the query is idempotent; since query2 is not 
> idempotent, we are seeing duplicate entries in table. 
> {code}
> object StructuredStreaming {
>   def main(args: Array[String]): Unit = {
> val db_url = 
> "jdbc:postgresql://dynamic-milestone-dev.crv1otzbekk9.us-east-1.rds.amazonaws.com:5432/DYNAMICPOSTGRES?user=dsdbadmin&password=password"
> val spark = SparkSession
>   .builder
>   .appName("StructuredKafkaReader")
>   .master("local[*]")
>   .getOrCreate()
> spark.conf.set("spark.sql.streaming.checkpointLocation", 
> "/tmp/checkpoint_research/")
> import spark.implicits._
> val server = "10.205.82.113:9092"
> val topic = "checkpoint"
> val subscribeType="subscribe"
> val lines = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", server)
>   .option(subscribeType, topic)
>   .load().selectExpr("CAST(value AS STRING)").as[String]
> lines.printSchema()
> import org.apache.spark.sql.ForeachWriter
> val writer = new ForeachWriter[String] {
>def open(partitionId: Long, version: Long):  Boolean = {
>  println("After db props"); true
>}
>def process(value: String) = {
>  val conn = DriverManager.getConnection(db_url)
>  try{
>conn.createStatement().executeUpdate("INSERT INTO 
> PUBLIC.checkpoint1 VALUES ('"+value+"')")
>  }
>  finally {
>conn.close()
>  }
>   }
>def close(errorOrNull: Throwable) = {}
> }
> import scala.concurrent.duration._
> val query1 = lines.writeStream
>  .outputMode("append")
>  .queryName("checkpoint1")
>  .trigger(ProcessingTime(30.seconds))
>  .foreach(writer)
>  .start()
>  val writer2 = new ForeachWriter[String] {
>   def open(partitionId: Long, version: Long):  Boolean = {
> println("After db props"); true
>   }
>   def process(value: String) = {
> val conn = DriverManager.getConnection(db_url)
> try{
>   conn.createStatement().executeUpdate("INSERT INTO 
> PUBLIC.checkpoint2 VALUES ('"+value+"')")
> }
> finally {
>   conn.close()
> }
>}
>   def close(errorOrNull: Throwable) = {}
> }
> import scala.concurrent.duration._
> val query2 = lines.writeStream
>   .outputMode("append")
>   .queryName("checkpoint2")
>   .trigger(ProcessingTime(30.seconds))
>   .foreach(writer2)
>   .start()
> query2.awaitTermination()
> query1.awaitTermination()
> }}
> {code}






[jira] [Commented] (SPARK-20103) Spark structured steaming from kafka - last message processed again after resume from checkpoint

2017-03-29 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15948035#comment-15948035
 ] 

Michael Armbrust commented on SPARK-20103:
--

It is fixed in 2.2 but by [SPARK-19876].

> Spark structured steaming from kafka - last message processed again after 
> resume from checkpoint
> 
>
> Key: SPARK-20103
> URL: https://issues.apache.org/jira/browse/SPARK-20103
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0
> Environment: Linux, Spark 2.10 
>Reporter: Rajesh Mutha
>  Labels: spark, streaming
> Fix For: 2.2.0
>
>
> When the application starts after a failure or a graceful shutdown, it is 
> consistently processing the last message of the previous batch even though it 
> was already processed correctly without failure.
> We are making sure database writes are idempotent using postgres 9.6 feature. 
> Is this the default behavior of spark? I added a code snippet with 2 
> streaming queries. One of the query is idempotent; since query2 is not 
> idempotent, we are seeing duplicate entries in table. 
> {code}
> object StructuredStreaming {
>   def main(args: Array[String]): Unit = {
> val db_url = 
> "jdbc:postgresql://dynamic-milestone-dev.crv1otzbekk9.us-east-1.rds.amazonaws.com:5432/DYNAMICPOSTGRES?user=dsdbadmin&password=password"
> val spark = SparkSession
>   .builder
>   .appName("StructuredKafkaReader")
>   .master("local[*]")
>   .getOrCreate()
> spark.conf.set("spark.sql.streaming.checkpointLocation", 
> "/tmp/checkpoint_research/")
> import spark.implicits._
> val server = "10.205.82.113:9092"
> val topic = "checkpoint"
> val subscribeType="subscribe"
> val lines = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", server)
>   .option(subscribeType, topic)
>   .load().selectExpr("CAST(value AS STRING)").as[String]
> lines.printSchema()
> import org.apache.spark.sql.ForeachWriter
> val writer = new ForeachWriter[String] {
>def open(partitionId: Long, version: Long):  Boolean = {
>  println("After db props"); true
>}
>def process(value: String) = {
>  val conn = DriverManager.getConnection(db_url)
>  try{
>conn.createStatement().executeUpdate("INSERT INTO 
> PUBLIC.checkpoint1 VALUES ('"+value+"')")
>  }
>  finally {
>conn.close()
>  }
>   }
>def close(errorOrNull: Throwable) = {}
> }
> import scala.concurrent.duration._
> val query1 = lines.writeStream
>  .outputMode("append")
>  .queryName("checkpoint1")
>  .trigger(ProcessingTime(30.seconds))
>  .foreach(writer)
>  .start()
>  val writer2 = new ForeachWriter[String] {
>   def open(partitionId: Long, version: Long):  Boolean = {
> println("After db props"); true
>   }
>   def process(value: String) = {
> val conn = DriverManager.getConnection(db_url)
> try{
>   conn.createStatement().executeUpdate("INSERT INTO 
> PUBLIC.checkpoint2 VALUES ('"+value+"')")
> }
> finally {
>   conn.close()
> }
>}
>   def close(errorOrNull: Throwable) = {}
> }
> import scala.concurrent.duration._
> val query2 = lines.writeStream
>   .outputMode("append")
>   .queryName("checkpoint2")
>   .trigger(ProcessingTime(30.seconds))
>   .foreach(writer2)
>   .start()
> query2.awaitTermination()
> query1.awaitTermination()
> }}
> {code}






[jira] [Updated] (SPARK-20103) Spark structured steaming from kafka - last message processed again after resume from checkpoint

2017-03-29 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-20103:
-
Description: 
When the application starts after a failure or a graceful shutdown, it is 
consistently processing the last message of the previous batch even though it 
was already processed correctly without failure.

We are making sure database writes are idempotent using postgres 9.6 feature. 
Is this the default behavior of spark? I added a code snippet with 2 streaming 
queries. One of the query is idempotent; since query2 is not idempotent, we are 
seeing duplicate entries in table. 

{code}
object StructuredStreaming {
  def main(args: Array[String]): Unit = {
val db_url = 
"jdbc:postgresql://dynamic-milestone-dev.crv1otzbekk9.us-east-1.rds.amazonaws.com:5432/DYNAMICPOSTGRES?user=dsdbadmin&password=password"
val spark = SparkSession
  .builder
  .appName("StructuredKafkaReader")
  .master("local[*]")
  .getOrCreate()
spark.conf.set("spark.sql.streaming.checkpointLocation", 
"/tmp/checkpoint_research/")
import spark.implicits._
val server = "10.205.82.113:9092"
val topic = "checkpoint"
val subscribeType="subscribe"
val lines = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", server)
  .option(subscribeType, topic)
  .load().selectExpr("CAST(value AS STRING)").as[String]
lines.printSchema()
import org.apache.spark.sql.ForeachWriter
val writer = new ForeachWriter[String] {
   def open(partitionId: Long, version: Long):  Boolean = {
 println("After db props"); true
   }
   def process(value: String) = {
 val conn = DriverManager.getConnection(db_url)
 try{
   conn.createStatement().executeUpdate("INSERT INTO PUBLIC.checkpoint1 
VALUES ('"+value+"')")
 }
 finally {
   conn.close()
 }
  }
   def close(errorOrNull: Throwable) = {}
}
import scala.concurrent.duration._
val query1 = lines.writeStream
 .outputMode("append")
 .queryName("checkpoint1")
 .trigger(ProcessingTime(30.seconds))
 .foreach(writer)
 .start()
 val writer2 = new ForeachWriter[String] {
  def open(partitionId: Long, version: Long):  Boolean = {
println("After db props"); true
  }
  def process(value: String) = {
val conn = DriverManager.getConnection(db_url)
try{
  conn.createStatement().executeUpdate("INSERT INTO PUBLIC.checkpoint2 
VALUES ('"+value+"')")
}
finally {
  conn.close()
}
   }
  def close(errorOrNull: Throwable) = {}
}
import scala.concurrent.duration._
val query2 = lines.writeStream
  .outputMode("append")
  .queryName("checkpoint2")
  .trigger(ProcessingTime(30.seconds))
  .foreach(writer2)
  .start()
query2.awaitTermination()
query1.awaitTermination()
}}
{code}

  was:
When the application starts after a failure or a graceful shutdown, it is 
consistently processing the last message of the previous batch even though it 
was already processed correctly without failure.

We are making sure database writes are idempotent using postgres 9.6 feature. 
Is this the default behavior of spark? I added a code snippet with 2 streaming 
queries. One of the query is idempotent; since query2 is not idempotent, we are 
seeing duplicate entries in table. 

---
object StructuredStreaming {
  def main(args: Array[String]): Unit = {
val db_url = 
"jdbc:postgresql://dynamic-milestone-dev.crv1otzbekk9.us-east-1.rds.amazonaws.com:5432/DYNAMICPOSTGRES?user=dsdbadmin&password=password"
val spark = SparkSession
  .builder
  .appName("StructuredKafkaReader")
  .master("local[*]")
  .getOrCreate()
spark.conf.set("spark.sql.streaming.checkpointLocation", 
"/tmp/checkpoint_research/")
import spark.implicits._
val server = "10.205.82.113:9092"
val topic = "checkpoint"
val subscribeType="subscribe"
val lines = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", server)
  .option(subscribeType, topic)
  .load().selectExpr("CAST(value AS STRING)").as[String]
lines.printSchema()
import org.apache.spark.sql.ForeachWriter
val writer = new ForeachWriter[String] {
   def open(partitionId: Long, version: Long):  Boolean = {
 println("After db props"); true
   }
   def process(value:

[jira] [Updated] (SPARK-20103) Spark structured steaming from kafka - last message processed again after resume from checkpoint

2017-03-29 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-20103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-20103:
-
Docs Text:   (was: object StructuredStreaming {
  def main(args: Array[String]): Unit = {
val db_url = 
"jdbc:postgresql://dynamic-milestone-dev.crv1otzbekk9.us-east-1.rds.amazonaws.com:5432/DYNAMICPOSTGRES?user=dsdbadmin&password=password"
val spark = SparkSession
  .builder
  .appName("StructuredKafkaReader")
  .master("local[*]")
  .getOrCreate()
spark.conf.set("spark.sql.streaming.checkpointLocation", 
"/tmp/checkpoint_research/")
import spark.implicits._
val server = "10.205.82.113:9092"
val topic = "checkpoint"
val subscribeType="subscribe"
val lines = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", server)
  .option(subscribeType, topic)
  .load().selectExpr("CAST(value AS STRING)").as[String]
lines.printSchema()
import org.apache.spark.sql.ForeachWriter
val writer = new ForeachWriter[String] {
   def open(partitionId: Long, version: Long):  Boolean = {
 println("After db props"); true
   }
   def process(value: String) = {
 val conn = DriverManager.getConnection(db_url)
 try{
   conn.createStatement().executeUpdate("INSERT INTO PUBLIC.checkpoint1 
VALUES ('"+value+"')")
 }
 finally {
   conn.close()
 }
  }
   def close(errorOrNull: Throwable) = {}
}
import scala.concurrent.duration._
val query1 = lines.writeStream
 .outputMode("append")
 .queryName("checkpoint1")
 .trigger(ProcessingTime(30.seconds))
 .foreach(writer)
 .start()
 val writer2 = new ForeachWriter[String] {
  def open(partitionId: Long, version: Long):  Boolean = {
println("After db props"); true
  }
  def process(value: String) = {
val conn = DriverManager.getConnection(db_url)
try{
  conn.createStatement().executeUpdate("INSERT INTO PUBLIC.checkpoint2 
VALUES ('"+value+"')")
}
finally {
  conn.close()
}
   }
  def close(errorOrNull: Throwable) = {}
}
import scala.concurrent.duration._
val query2 = lines.writeStream
  .outputMode("append")
  .queryName("checkpoint2")
  .trigger(ProcessingTime(30.seconds))
  .foreach(writer2)
  .start()
query2.awaitTermination()
query1.awaitTermination()
}})

> Spark structured steaming from kafka - last message processed again after 
> resume from checkpoint
> 
>
> Key: SPARK-20103
> URL: https://issues.apache.org/jira/browse/SPARK-20103
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0
> Environment: Linux, Spark 2.10 
>Reporter: Rajesh Mutha
>  Labels: spark, streaming
>
> When the application starts after a failure or a graceful shutdown, it is 
> consistently processing the last message of the previous batch even though it 
> was already processed correctly without failure.
> We are making sure database writes are idempotent using postgres 9.6 feature. 
> Is this the default behavior of spark? I added a code snippet with 2 
> streaming queries. One of the query is idempotent; since query2 is not 
> idempotent, we are seeing duplicate entries in table. 
> {code}
> object StructuredStreaming {
>   def main(args: Array[String]): Unit = {
> val db_url = 
> "jdbc:postgresql://dynamic-milestone-dev.crv1otzbekk9.us-east-1.rds.amazonaws.com:5432/DYNAMICPOSTGRES?user=dsdbadmin&password=password"
> val spark = SparkSession
>   .builder
>   .appName("StructuredKafkaReader")
>   .master("local[*]")
>   .getOrCreate()
> spark.conf.set("spark.sql.streaming.checkpointLocation", 
> "/tmp/checkpoint_research/")
> import spark.implicits._
> val server = "10.205.82.113:9092"
> val topic = "checkpoint"
> val subscribeType="subscribe"
> val lines = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", server)
>   .option(subscribeType, topic)
>   .load().selectExpr("CAST(value AS STRING)").as[String]
> lines.pr

Re: Outstanding Spark 2.1.1 issues

2017-03-28 Thread Michael Armbrust
We just fixed the build yesterday.  I'll kick off a new RC today.

On Tue, Mar 28, 2017 at 8:04 AM, Asher Krim  wrote:

> Hey Michael,
> any update on this? We're itching for a 2.1.1 release (specifically
> SPARK-14804 which is currently blocking us)
>
> Thanks,
> Asher Krim
> Senior Software Engineer
>
> On Wed, Mar 22, 2017 at 7:44 PM, Michael Armbrust 
> wrote:
>
>> An update: I cut the tag for RC1 last night.  Currently fighting with the
>> release process.  Will post RC1 once I get it working.
>>
>> On Tue, Mar 21, 2017 at 2:16 PM, Nick Pentreath > > wrote:
>>
>>> As for SPARK-19759 <https://issues.apache.org/jira/browse/SPARK-19759>,
>>> I don't think that needs to be targeted for 2.1.1 so we don't need to worry
>>> about it
>>>
>>>
>>> On Tue, 21 Mar 2017 at 13:49 Holden Karau  wrote:
>>>
>>>> I agree with Michael, I think we've got some outstanding issues but
>>>> none of them seem like regression from 2.1 so we should be good to start
>>>> the RC process.
>>>>
>>>> On Tue, Mar 21, 2017 at 1:41 PM, Michael Armbrust <
>>>> mich...@databricks.com> wrote:
>>>>
>>>> Please speak up if I'm wrong, but none of these seem like critical
>>>> regressions from 2.1.  As such I'll start the RC process later today.
>>>>
>>>> On Mon, Mar 20, 2017 at 9:52 PM, Holden Karau 
>>>> wrote:
>>>>
>>>> I'm not super sure it should be a blocker for 2.1.1 -- is it a
>>>> regression? Maybe we can get TDs input on it?
>>>>
>>>> On Mon, Mar 20, 2017 at 8:48 PM Nan Zhu  wrote:
>>>>
>>>> I think https://issues.apache.org/jira/browse/SPARK-19280 should be a
>>>> blocker
>>>>
>>>> Best,
>>>>
>>>> Nan
>>>>
>>>> On Mon, Mar 20, 2017 at 8:18 PM, Felix Cheung <
>>>> felixcheun...@hotmail.com> wrote:
>>>>
>>>> I've been scrubbing R and think we are tracking 2 issues
>>>>
>>>> https://issues.apache.org/jira/browse/SPARK-19237
>>>>
>>>> https://issues.apache.org/jira/browse/SPARK-19925
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> *From:* holden.ka...@gmail.com  on behalf of
>>>> Holden Karau 
>>>> *Sent:* Monday, March 20, 2017 3:12:35 PM
>>>> *To:* dev@spark.apache.org
>>>> *Subject:* Outstanding Spark 2.1.1 issues
>>>>
>>>> Hi Spark Developers!
>>>>
>>>> As we start working on the Spark 2.1.1 release I've been looking at our
>>>> outstanding issues still targeted for it. I've tried to break it down by
>>>> component so that people in charge of each component can take a quick look
>>>> and see if any of these things can/should be re-targeted to 2.2 or 2.1.2 &
>>>> the overall list is pretty short (only 9 items - 5 if we only look at
>>>> explicitly tagged) :)
>>>>
>>>> If you're working on something for Spark 2.1.1 and it doesn't show up in
>>>> this list please speak up now :) We have a lot of issues (including "in
>>>> progress") that are listed as impacting 2.1.0, but they aren't targeted for
>>>> 2.1.1 - if there is something you are working on that should be
>>>> targeted for 2.1.1 please let us know so it doesn't slip through the 
>>>> cracks.
>>>>
>>>> The query string I used for looking at the 2.1.1 open issues is:
>>>>
>>>> ((affectedVersion = 2.1.1 AND cf[12310320] is Empty) OR fixVersion =
>>>> 2.1.1 OR cf[12310320] = "2.1.1") AND project = spark AND resolution =
>>>> Unresolved ORDER BY priority DESC
>>>>
>>>> None of the open issues appear to be a regression from 2.1.0, but those
>>>> seem more likely to show up during the RC process (thanks in advance to
>>>> everyone testing their workloads :)) & generally none of them seem to be
>>>>
>>>> (Note: the cfs are for Target Version/s field)
>>>>
>>>> Critical Issues:
>>>>  SQL:
>>>>   SPARK-19690 <https://issues.apache.org/jira/browse/SPARK-19690> - Join
>>>> a streaming DataFrame with a batch DataFrame may not work - PR
>>>> https://github.com/apache/spark/pull/17052 (review in progress by
>&g

Re: unable to stream kafka messages

2017-03-27 Thread Michael Armbrust
Ah, I understand what you are asking now.  There is no API for specifying a
Kafka-specific "decoder", since Spark SQL already has a rich language for
expressing transformations.  The DataFrame code I gave will parse the JSON
and materialize it in a class, very similar to what objectMapper.readValue(bytes,
Tweet.class) would do.

However, there are other cases where you might need to do some domain
specific transformation that Spark SQL doesn't support natively.  In this
case you can write a UDF that does the translation. There are a couple of
different ways you can specify this, depending on whether you want to
map/flatMap or just apply the function as a UDF to a single column
<http://stackoverflow.com/questions/35348058/how-do-i-call-a-udf-on-a-spark-dataframe-using-java>
.
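
To make that concrete, here is a Scala sketch of the UDF route for a value that is not JSON at all. Everything below is hypothetical (a made-up "id|text" encoding); spark, bootstrapServers, subscribeType and topics are the ones from your snippet:

import org.apache.spark.sql.functions.{col, udf}

// Decode the raw Kafka value (binary) with arbitrary domain-specific logic.
val decodeValue = udf { bytes: Array[Byte] =>
  val parts = new String(bytes, "UTF-8").split('|')
  (parts(0), parts(1))                  // a tuple comes back as a struct<_1, _2>
}

val decoded = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option(subscribeType, topics)
  .load()
  .withColumn("decoded", decodeValue(col("value")))
  .select(col("decoded._1").as("tweetId"), col("decoded._2").as("tweetText"))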


On Mon, Mar 27, 2017 at 1:59 PM, kaniska Mandal 
wrote:

> yup, that solves the compilation issue :-)
>
> one quick question regarding specifying Decoder in kafka stream:
>
> please note that I am encoding the message as follows while sending data
> to kafka -
>
> 
>
> *String msg = objectMapper.writeValueAsString(tweetEvent);*
>
> *return msg.getBytes();*
>
> I have a corresponding 
>
> *return objectMapper.readValue(bytes, Tweet.class)*
>
>
> *>> how do I specify the Decoder in the following stream-processing flow ?*
> streams = spark
>   .readStream()
>   .format("kafka")
>   .option("kafka.bootstrap.servers", bootstrapServers)
>   .option(subscribeType, topics)
>   .load()
>   .withColumn("message", from_json(col("value").cast("string"),
> tweetSchema)) // cast the binary value to a string and parse it as json
>   .select("message.*") // unnest the json
>   .as(Encoders.bean(Tweet.class))
>
> Thanks
> Kaniska
>
> -
>
> On Mon, Mar 27, 2017 at 1:25 PM, Michael Armbrust 
> wrote:
>
>> You need to import col from org.apache.spark.sql.functions.
>>
>> On Mon, Mar 27, 2017 at 1:20 PM, kaniska Mandal > > wrote:
>>
>>> Hi Michael,
>>>
>>> Can you please check if I am using correct version of spark-streaming
>>> library as specified in my pom (specified in the email) ?
>>>
>>> col("value").cast("string") - throwing an error 'cannot find symbol
>>> method col(java.lang.String)'
>>> I tried $"value" which results into similar compilation error.
>>>
>>> Thanks
>>> Kaniska
>>>
>>>
>>>
>>> On Mon, Mar 27, 2017 at 12:09 PM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
>>>> Sorry, I don't think that I understand the question.  Value is just a
>>>> binary blob that we get from kafka and pass to you.  If its stored in JSON,
>>>> I think the code I provided is a good option, but if you are using a
>>>> different encoding you may need to write a UDF.
>>>>
>>>> On Fri, Mar 24, 2017 at 4:58 PM, kaniska Mandal <
>>>> kaniska.man...@gmail.com> wrote:
>>>>
>>>>> Hi Michael,
>>>>>
>>>>> Thanks much for the suggestion.
>>>>>
>>>>> I was wondering - whats the best way to deserialize the 'value' field
>>>>>
>>>>>
>>>>> On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust <
>>>>> mich...@databricks.com> wrote:
>>>>>
>>>>>> Encoders can only map data into an object if those columns already
>>>>>> exist.  When we are reading from Kafka, we just get a binary blob and
>>>>>> you'll need to help Spark parse that first.  Assuming your data is stored
>>>>>> in JSON it should be pretty straight forward.
>>>>>>
>>>>>> streams = spark
>>>>>>   .readStream()
>>>>>>   .format("kafka")
>>>>>>   .option("kafka.bootstrap.servers", bootstrapServers)
>>>>>>   .option(subscribeType, topics)
>>>>>>   .load()
>>>>>>   .withColumn("message", from_json(col("value").cast("string"),
>>>>>> tweetSchema)) // cast the binary value to a string and parse it as json
>>>>>>   .select("message.*") // unnest the json
>>>>>>   .as(Encoders.bean(Tweet.class)) // only required if you want to
>>>>>> use lambda functions on the data using this class
>>>>

Re: unable to stream kafka messages

2017-03-27 Thread Michael Armbrust
You need to import col from org.apache.spark.sql.functions.

On Mon, Mar 27, 2017 at 1:20 PM, kaniska Mandal 
wrote:

> Hi Michael,
>
> Can you please check if I am using correct version of spark-streaming
> library as specified in my pom (specified in the email) ?
>
> col("value").cast("string") - throwing an error 'cannot find symbol
> method col(java.lang.String)'
> I tried $"value" which results into similar compilation error.
>
> Thanks
> Kaniska
>
>
>
> On Mon, Mar 27, 2017 at 12:09 PM, Michael Armbrust  > wrote:
>
>> Sorry, I don't think that I understand the question.  Value is just a
>> binary blob that we get from kafka and pass to you.  If its stored in JSON,
>> I think the code I provided is a good option, but if you are using a
>> different encoding you may need to write a UDF.
>>
>> On Fri, Mar 24, 2017 at 4:58 PM, kaniska Mandal > > wrote:
>>
>>> Hi Michael,
>>>
>>> Thanks much for the suggestion.
>>>
>>> I was wondering - whats the best way to deserialize the 'value' field
>>>
>>>
>>> On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
>>>> Encoders can only map data into an object if those columns already
>>>> exist.  When we are reading from Kafka, we just get a binary blob and
>>>> you'll need to help Spark parse that first.  Assuming your data is stored
>>>> in JSON it should be pretty straight forward.
>>>>
>>>> streams = spark
>>>>   .readStream()
>>>>   .format("kafka")
>>>>   .option("kafka.bootstrap.servers", bootstrapServers)
>>>>   .option(subscribeType, topics)
>>>>   .load()
>>>>   .withColumn("message", from_json(col("value").cast("string"),
>>>> tweetSchema)) // cast the binary value to a string and parse it as json
>>>>   .select("message.*") // unnest the json
>>>>   .as(Encoders.bean(Tweet.class)) // only required if you want to use
>>>> lambda functions on the data using this class
>>>>
>>>> Here is some more info on working with JSON and other semi-structured
>>>> formats
>>>> <https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html>
>>>> .
>>>>
>>>> On Fri, Mar 24, 2017 at 10:49 AM, kaniska 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Currently , encountering the following exception while working with
>>>>> below-mentioned code snippet :
>>>>>
>>>>> > Please suggest the correct approach for reading the stream into a sql
>>>>> > schema.
>>>>> > If I add 'tweetSchema' while reading stream, it errors out with
>>>>> message -
>>>>> > we can not change static schema for kafka.
>>>>>
>>>>> 
>>>>> ---
>>>>>
>>>>> *exception*
>>>>>
>>>>> Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve
>>>>> '`location`' given input columns: [topic, timestamp, key, offset,
>>>>> value,
>>>>> timestampType, partition]*;
>>>>> at
>>>>> org.apache.spark.sql.catalyst.analysis.package$AnalysisError
>>>>> At.failAnalysis(package.scala:42)
>>>>> at
>>>>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfu
>>>>> n$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
>>>>> 
>>>>> 
>>>>>
>>>>> *structured streaming code snippet*
>>>>>
>>>>> String bootstrapServers = "localhost:9092";
>>>>> String subscribeType = "subscribe";
>>>>> String topics = "events";
>>>>>
>>>>> StructType tweetSchema = new StructType()
>>>>> .add("tweetId", "string")
>>>>> .add("tweetText", "string")
>>>>> .add("location&quo

Re: unable to stream kafka messages

2017-03-27 Thread Michael Armbrust
Sorry, I don't think that I understand the question.  Value is just a
binary blob that we get from Kafka and pass to you.  If it's stored in JSON,
I think the code I provided is a good option, but if you are using a
different encoding you may need to write a UDF.

On Fri, Mar 24, 2017 at 4:58 PM, kaniska Mandal 
wrote:

> Hi Michael,
>
> Thanks much for the suggestion.
>
> I was wondering - whats the best way to deserialize the 'value' field
>
>
> On Fri, Mar 24, 2017 at 11:47 AM, Michael Armbrust  > wrote:
>
>> Encoders can only map data into an object if those columns already
>> exist.  When we are reading from Kafka, we just get a binary blob and
>> you'll need to help Spark parse that first.  Assuming your data is stored
>> in JSON it should be pretty straight forward.
>>
>> streams = spark
>>   .readStream()
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", bootstrapServers)
>>   .option(subscribeType, topics)
>>   .load()
>>   .withColumn("message", from_json(col("value").cast("string"),
>> tweetSchema)) // cast the binary value to a string and parse it as json
>>   .select("message.*") // unnest the json
>>   .as(Encoders.bean(Tweet.class)) // only required if you want to use
>> lambda functions on the data using this class
>>
>> Here is some more info on working with JSON and other semi-structured
>> formats
>> <https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html>
>> .
>>
>> On Fri, Mar 24, 2017 at 10:49 AM, kaniska 
>> wrote:
>>
>>> Hi,
>>>
>>> Currently , encountering the following exception while working with
>>> below-mentioned code snippet :
>>>
>>> > Please suggest the correct approach for reading the stream into a sql
>>> > schema.
>>> > If I add 'tweetSchema' while reading stream, it errors out with
>>> message -
>>> > we can not change static schema for kafka.
>>>
>>> 
>>> ---
>>>
>>> *exception*
>>>
>>> Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve
>>> '`location`' given input columns: [topic, timestamp, key, offset, value,
>>> timestampType, partition]*;
>>> at
>>> org.apache.spark.sql.catalyst.analysis.package$AnalysisError
>>> At.failAnalysis(package.scala:42)
>>> at
>>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfu
>>> n$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:77)
>>> 
>>> 
>>>
>>> *structured streaming code snippet*
>>>
>>> String bootstrapServers = "localhost:9092";
>>> String subscribeType = "subscribe";
>>> String topics = "events";
>>>
>>> StructType tweetSchema = new StructType()
>>> .add("tweetId", "string")
>>> .add("tweetText", "string")
>>> .add("location", "string")
>>> .add("timestamp", "string");
>>>
>>>SparkSession spark = SparkSession
>>>   .builder()
>>>   .appName("StreamProcessor")
>>>   .config("spark.master", "local")
>>>   .getOrCreate();
>>>
>>>   Dataset streams = spark
>>>   .readStream()
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers",
>>> bootstrapServers)
>>>   .option(subscribeType, topics)
>>>   .load()
>>>   .as(Encoders.bean(Tweet.class));
>>>
>>>  streams.createOrReplaceTempView("streamsData");
>>>
>>>String sql = "SELECT location,  COUNT(*) as count
>>> FROM streamsData
>>> GROUP BY location";
>>>   

Re: How to insert nano seconds in the TimestampType in Spark

2017-03-27 Thread Michael Armbrust
The timestamp type only has microsecond precision.  You would need to store
it on your own (as binary or limited range long or something) if you
require nanosecond precision.
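
For example, one way to carry the extra precision yourself is a plain long of nanoseconds since the epoch, stored next to (or instead of) the timestamp column. A sketch assuming Java 8's java.time (on Java 7 the same arithmetic works with seconds and nanos kept separately); note a signed 64-bit nanosecond epoch only reaches the year 2262:

import java.time.Instant

def toEpochNanos(i: Instant): Long =
  i.getEpochSecond * 1000000000L + i.getNano

// Producer side: store this long (e.g. as a bigint column) alongside the data.
val nanos = toEpochNanos(Instant.parse("2017-03-27T12:12:12.123456789Z"))

// Consumer side: recover the full-precision instant from the long.
val restored = Instant.ofEpochSecond(nanos / 1000000000L, nanos % 1000000000L)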

On Mon, Mar 27, 2017 at 5:29 AM, Devender Yadav <
devender.ya...@impetus.co.in> wrote:

> Hi All,
>
> I am using spark version - 1.6.1
>
> I have a text table in hive having `timestamp` datatype with nanoseconds
> precision.
>
> Hive Table Schema:
>
> c_timestamp timestamp
>
> Hive Table data:
>
> hive> select * from tbl1;
> OK
> 00:00:00.1
> 12:12:12.123456789
> 23:59:59.9
>
> But as per the docs, from Spark 1.5
>
> *Timestamps are now stored at a precision of 1us, rather than 1ns*
>
>
> Sample code:
>
> SparkConf conf = new SparkConf(true).setMaster("
> yarn-cluster").setAppName("SAMPLE_APP");
> SparkContext sc = new SparkContext(conf);
> HiveContext hc = new HiveContext(sc);
> DataFrame df = hc.table("testdb.tbl1");
>
> Data is truncated to microseconds.
>
> 00:00:00
> 12:12:12.123456
> 23:59:59.99
>
>
> Is there any way to use nanoseconds here?
>
>
> Regards,
> Devender
>
>


Re: unable to stream kafka messages

2017-03-24 Thread Michael Armbrust
Encoders can only map data into an object if those columns already exist.
When we are reading from Kafka, we just get a binary blob and you'll need
to help Spark parse that first.  Assuming your data is stored in JSON it
should be pretty straight forward.

streams = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option(subscribeType, topics)
  .load()
  .withColumn("message", from_json(col("value").cast("string"),
tweetSchema)) // cast the binary value to a string and parse it as json
  .select("message.*") // unnest the json
  .as(Encoders.bean(Tweet.class)) // only required if you want to use
lambda functions on the data using this class

Here is some more info on working with JSON and other semi-structured
formats
<https://databricks.com/blog/2017/02/23/working-complex-data-formats-structured-streaming-apache-spark-2-1.html>
.
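
(For completeness, a Scala sketch of the tweetSchema referenced above, mirroring the fields from the original question:)

import org.apache.spark.sql.types.{StringType, StructType}

val tweetSchema = new StructType()
  .add("tweetId", StringType)
  .add("tweetText", StringType)
  .add("location", StringType)
  .add("timestamp", StringType)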

On Fri, Mar 24, 2017 at 10:49 AM, kaniska  wrote:

> Hi,
>
> Currently , encountering the following exception while working with
> below-mentioned code snippet :
>
> > Please suggest the correct approach for reading the stream into a sql
> > schema.
> > If I add 'tweetSchema' while reading stream, it errors out with message -
> > we can not change static schema for kafka.
>
> 
> ---
>
> *exception*
>
> Caused by: org.apache.spark.sql.AnalysisException: *cannot resolve
> '`location`' given input columns: [topic, timestamp, key, offset, value,
> timestampType, partition]*;
> at
> org.apache.spark.sql.catalyst.analysis.package$
> AnalysisErrorAt.failAnalysis(package.scala:42)
> at
> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$
> anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(
> CheckAnalysis.scala:77)
> 
> 
>
> *structured streaming code snippet*
>
> String bootstrapServers = "localhost:9092";
> String subscribeType = "subscribe";
> String topics = "events";
>
> StructType tweetSchema = new StructType()
> .add("tweetId", "string")
> .add("tweetText", "string")
> .add("location", "string")
> .add("timestamp", "string");
>
>SparkSession spark = SparkSession
>   .builder()
>   .appName("StreamProcessor")
>   .config("spark.master", "local")
>   .getOrCreate();
>
>   Dataset streams = spark
>   .readStream()
>   .format("kafka")
>   .option("kafka.bootstrap.servers",
> bootstrapServers)
>   .option(subscribeType, topics)
>   .load()
>   .as(Encoders.bean(Tweet.class));
>
>  streams.createOrReplaceTempView("streamsData");
>
>String sql = "SELECT location,  COUNT(*) as count FROM
> streamsData
> GROUP BY location";
>Dataset countsByLocation = spark.sql(sql);
>
> StreamingQuery query = countsByLocation.writeStream()
>   .outputMode("complete")
>   .format("console")
>   .start();
>
> query.awaitTermination();
> 
> --
>
> *Tweet *
>
> Tweet.java - has public constructor and getter / setter methods
>
> public class Tweet implements Serializable{
>
> private String tweetId;
> private String tweetText;
> private String location;
> private String timestamp;
>
> public Tweet(){
>
> }
> .
>
> 
> 
>
> *pom.xml *
>
>
> <dependencies>
>     <dependency>
>         <groupId>org.apache.spark</groupId>
>         <artifactId>spark-core_2.10</artifactId>
>         <version>2.1.0</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.spark</groupId>
>         <artifactId>spark-streaming_2.10</artifactId>
>         <version>2.1.0</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.spark</groupId>
>         <artifactId>spark-streaming-kafka-0-8_2.10</artifactId>
>         <version>2.1.0</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.spark</groupId>
>         <artifactId>spark-sql_2.10</artifactId>
>         <version>2.1.0</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.spark</groupId>
>         <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
>         <version>2.1.0</version>
>     </dependency>
> </dependencies>
>
>
>
> --
> View this message 

Re: how to read object field within json file

2017-03-24 Thread Michael Armbrust
I'm not sure you can parse this as an Array, but you can hint to the parser
that you would like to treat source as a map instead of as a struct.  This
is a good strategy when you have dynamic columns in your data.

Here is an example of the schema you can use to parse this JSON and also
how to use explode to turn it into separate rows.
This blog post has more on working with semi-structured data in Spark.
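
A minimal sketch of that idea (the input path is a placeholder and the field
names follow the JSON quoted below): declare source as a map and explode it
into one row per entry.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.types.{MapType, StringType, StructType}

val spark = SparkSession.builder().appName("MapSchemaSketch").getOrCreate()
import spark.implicits._

// The struct shared by F1, F2, ..., F999.
val entry = new StructType()
  .add("id", StringType)
  .add("eId", StringType)
  .add("description", StringType)

// Treat "source" as a map instead of a struct so dynamic keys are allowed.
val schema = new StructType()
  .add("id", StringType)
  .add("source", MapType(StringType, entry))

val df = spark.read.schema(schema).json("path/to/input") // placeholder path

// explode on a map column yields one row per entry with "key" and "value" columns.
df.select($"id", explode($"source"))
  .select($"id", $"key", $"value.description".as("description"))
  .show()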

On Thu, Mar 23, 2017 at 2:49 PM, Yong Zhang  wrote:

> That's why your "source" should be defined as an Array[Struct] type (which
> makes sense in this case, since it has an undetermined length), so you can
> explode it and get the description easily.
>
> Now you need to write your own UDF; maybe that can do what you want.
>
> Yong
>
> --
> *From:* Selvam Raman 
> *Sent:* Thursday, March 23, 2017 5:03 PM
> *To:* user
> *Subject:* how to read object field within json file
>
> Hi,
>
> {
>   "id": "test1",
>   "source": {
>     "F1": {
>       "id": "4970",
>       "eId": "F1",
>       "description": "test1"
>     },
>     "F2": {
>       "id": "5070",
>       "eId": "F2",
>       "description": "test2"
>     },
>     "F3": {
>       "id": "5170",
>       "eId": "F3",
>       "description": "test3"
>     },
>     "F4": {},
>     etc..
>     "F999": {}
>   }
> }
>
> I have bzip-compressed JSON files in the above format.
> Some JSON rows contain two objects within source (like F1 and F2), sometimes
> five (F1, F2, F3, F4, F5), etc. So the final schema will contain the
> combination of all objects for the source field.
>
> Now, every row will contain n objects, but only some contain valid records.
> How can I retrieve the value of "description" in the "source" field?
>
> source.F1.description returns the result, but how can I get all of the
> description results for every row (something like
> "source.*.description")?
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>


[jira] [Commented] (SPARK-10816) EventTime based sessionization

2017-03-23 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15939156#comment-15939156
 ] 

Michael Armbrust commented on SPARK-10816:
--

Just a quick note for people interested in this topic.  The more advanced API 
that lets you do arbitrary grouped stateful operations with timeouts based on 
processing time or event time has been merged into master.  See [SPARK-19067] 
for more details.
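
A minimal sketch of what that API looks like (the source, field names, and the
10-minute timeout below are illustrative assumptions, not part of this issue):

{code}
import java.sql.Timestamp
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

case class Event(user: String, ts: Timestamp)
case class SessionInfo(count: Long)
case class SessionUpdate(user: String, count: Long, expired: Boolean)

object SessionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SessionSketch").getOrCreate()
    import spark.implicits._

    // Placeholder source: one event per line read from a socket.
    val events: Dataset[Event] = spark.readStream
      .format("socket").option("host", "localhost").option("port", 9999)
      .load()
      .select($"value".as("user"), current_timestamp().as("ts"))
      .as[Event]

    def updateSession(
        user: String,
        batch: Iterator[Event],
        state: GroupState[SessionInfo]): SessionUpdate = {
      if (state.hasTimedOut) {
        // No data for this user within the timeout: emit the session and drop the state.
        val result = SessionUpdate(user, state.get.count, expired = true)
        state.remove()
        result
      } else {
        val updated = SessionInfo(state.getOption.map(_.count).getOrElse(0L) + batch.size)
        state.update(updated)
        state.setTimeoutDuration("10 minutes") // re-arm the processing-time timeout
        SessionUpdate(user, updated.count, expired = false)
      }
    }

    val sessions = events
      .groupByKey(_.user)
      .mapGroupsWithState[SessionInfo, SessionUpdate](
        GroupStateTimeout.ProcessingTimeTimeout)(updateSession _)

    sessions.writeStream.outputMode("update").format("console").start().awaitTermination()
  }
}
{code}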

> EventTime based sessionization
> --
>
> Key: SPARK-10816
> URL: https://issues.apache.org/jira/browse/SPARK-10816
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Reporter: Reynold Xin
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-18970) FileSource failure during file list refresh doesn't cause an application to fail, but stops further processing

2017-03-22 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust resolved SPARK-18970.
--
   Resolution: Fixed
Fix Version/s: 2.1.0

I'm going to close this, but please reopen if you can reproduce on 2.1.1+.

> FileSource failure during file list refresh doesn't cause an application to 
> fail, but stops further processing
> --
>
> Key: SPARK-18970
> URL: https://issues.apache.org/jira/browse/SPARK-18970
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming
>Affects Versions: 2.0.0, 2.0.2
>Reporter: Lev
> Fix For: 2.1.0
>
> Attachments: sparkerror.log
>
>
> A Spark streaming application uses S3 files as streaming sources. After running 
> for several days, processing stopped even though the application continued to 
> run. 
> Stack trace:
> {code}
> java.io.FileNotFoundException: No such file or directory 
> 's3n://X'
>   at 
> com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:818)
>   at 
> com.amazon.ws.emr.hadoop.fs.EmrFileSystem.getFileStatus(EmrFileSystem.java:511)
>   at 
> org.apache.spark.sql.execution.datasources.HadoopFsRelation$$anonfun$7$$anonfun$apply$3.apply(fileSourceInterfaces.scala:465)
>   at 
> org.apache.spark.sql.execution.datasources.HadoopFsRelation$$anonfun$7$$anonfun$apply$3.apply(fileSourceInterfaces.scala:462)
>   at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
>   at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
>   at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
>   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
>   at scala.collection.AbstractIterator.to(Iterator.scala:1336)
>   at 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
>   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
>   at 
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
>   at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:893)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:893)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1897)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> I believe 2 things should (or can) be fixed:
> 1. The application should fail in case of such an error.
> 2. Allow the application to ignore such a failure, since there is a chance that 
> during the next refresh the error will not resurface. (In my case I believe the 
> error was caused by S3 cleaning the bucket exactly at the same moment when the 
> refresh was running.) 
> My code to create streaming processing looks as the following:
> {code}
>   val cq = sqlContext.readStream
> .format("json")
> .schema(struct)
> .load(s"input")
> .writeStream
> .option("checkpointLocation", s"checkpoints")
> .foreach(new ForeachWriter[Row] {...})
> .trigger(ProcessingTime("10 seconds")).start()
>   
> cq.awaitTermination() 
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-17344) Kafka 0.8 support for Structured Streaming

2017-03-22 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust closed SPARK-17344.

Resolution: Won't Fix

Unless someone really wants to work on this, I think the fact that they have 
compatibility for 0.10.0+ is reason enough to close this JIRA.

> Kafka 0.8 support for Structured Streaming
> --
>
> Key: SPARK-17344
> URL: https://issues.apache.org/jira/browse/SPARK-17344
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Frederick Reiss
>
> Design and implement Kafka 0.8-based sources and sinks for Structured 
> Streaming.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19965) DataFrame batch reader may fail to infer partitions when reading FileStreamSink's output

2017-03-22 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-19965:
-
Target Version/s: 2.2.0

> DataFrame batch reader may fail to infer partitions when reading 
> FileStreamSink's output
> 
>
> Key: SPARK-19965
> URL: https://issues.apache.org/jira/browse/SPARK-19965
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Shixiong Zhu
>
> Reproducer
> {code}
>   test("partitioned writing and batch reading with 'basePath'") {
> val inputData = MemoryStream[Int]
> val ds = inputData.toDS()
> val outputDir = Utils.createTempDir(namePrefix = 
> "stream.output").getCanonicalPath
> val checkpointDir = Utils.createTempDir(namePrefix = 
> "stream.checkpoint").getCanonicalPath
> var query: StreamingQuery = null
> try {
>   query =
> ds.map(i => (i, i * 1000))
>   .toDF("id", "value")
>   .writeStream
>   .partitionBy("id")
>   .option("checkpointLocation", checkpointDir)
>   .format("parquet")
>   .start(outputDir)
>   inputData.addData(1, 2, 3)
>   failAfter(streamingTimeout) {
> query.processAllAvailable()
>   }
>   spark.read.option("basePath", outputDir).parquet(outputDir + 
> "/*").show()
> } finally {
>   if (query != null) {
> query.stop()
>   }
> }
>   }
> {code}
> Stack trace
> {code}
> [info] - partitioned writing and batch reading with 'basePath' *** FAILED *** 
> (3 seconds, 928 milliseconds)
> [info]   java.lang.AssertionError: assertion failed: Conflicting directory 
> structures detected. Suspicious paths:
> [info]***/stream.output-65e3fa45-595a-4d29-b3df-4c001e321637
> [info]
> ***/stream.output-65e3fa45-595a-4d29-b3df-4c001e321637/_spark_metadata
> [info] 
> [info] If provided paths are partition directories, please set "basePath" in 
> the options of the data source to specify the root directory of the table. If 
> there are multiple root directories, please load them separately and then 
> union them.
> [info]   at scala.Predef$.assert(Predef.scala:170)
> [info]   at 
> org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:133)
> [info]   at 
> org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:98)
> [info]   at 
> org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.inferPartitioning(PartitioningAwareFileIndex.scala:156)
> [info]   at 
> org.apache.spark.sql.execution.datasources.InMemoryFileIndex.partitionSpec(InMemoryFileIndex.scala:54)
> [info]   at 
> org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex.partitionSchema(PartitioningAwareFileIndex.scala:55)
> [info]   at 
> org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:133)
> [info]   at 
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:361)
> [info]   at 
> org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:160)
> [info]   at 
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:536)
> [info]   at 
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:520)
> [info]   at 
> org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply$mcV$sp(FileStreamSinkSuite.scala:292)
> [info]   at 
> org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply(FileStreamSinkSuite.scala:268)
> [info]   at 
> org.apache.spark.sql.streaming.FileStreamSinkSuite$$anonfun$8.apply(FileStreamSinkSuite.scala:268)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19767) API Doc pages for Streaming with Kafka 0.10 not current

2017-03-22 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19767?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-19767:
-
Component/s: (was: Structured Streaming)
 DStreams

> API Doc pages for Streaming with Kafka 0.10 not current
> ---
>
> Key: SPARK-19767
> URL: https://issues.apache.org/jira/browse/SPARK-19767
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.1.0
>Reporter: Nick Afshartous
>Priority: Minor
>
> The API docs linked from the Spark Kafka 0.10 Integration page are not 
> current.  For instance, on the page
>https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
> the code examples show the new API (i.e. class ConsumerStrategies).  However, 
> following the links
> API Docs --> (Scala | Java)
> lead to API pages that do not have class ConsumerStrategies) .  The API doc 
> package names  also have {code}streaming.kafka{code} as opposed to 
> {code}streaming.kafka10{code} 
> as in the code examples on streaming-kafka-0-10-integration.html.  



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-19013) java.util.ConcurrentModificationException when using s3 path as checkpointLocation

2017-03-22 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust resolved SPARK-19013.
--
Resolution: Later

It seems like [HADOOP-13345] is the right solution here, but since this is 
outside of the scope of things we can fix in Spark, I'm going to close this 
ticket to keep the backlog clear.

> java.util.ConcurrentModificationException when using s3 path as 
> checkpointLocation 
> ---
>
> Key: SPARK-19013
> URL: https://issues.apache.org/jira/browse/SPARK-19013
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.0.2
>Reporter: Tim Chan
>
> I have a structured stream job running on EMR. The job will fail due to this
> {code}
> Multiple HDFSMetadataLog are using s3://mybucket/myapp 
> org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatch(HDFSMetadataLog.scala:162)
> {code}
> There is only one instance of this stream job running.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-19788) DataStreamReader/DataStreamWriter.option shall accept user-defined type

2017-03-22 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust resolved SPARK-19788.
--
Resolution: Won't Fix

Thanks for the suggestion.  However, as [~zsxwing] said, the goal here is a 
small, cross-language compatible API that is the same as the batch version.  I 
think it is totally reasonable for a specific source to produce typesafe bindings 
on top of this API (look at spark-avro for an example).
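
A hypothetical sketch of such typesafe bindings for an imaginary source (the 
config class, option keys, and source name below are invented for illustration):

{code}
import org.apache.spark.sql.streaming.DataStreamReader

case class MySourceConfig(endpoint: String, retries: Int, tags: Map[String, String])

object MySourceImplicits {
  implicit class MySourceReader(val reader: DataStreamReader) extends AnyVal {
    def mySource(conf: MySourceConfig): DataStreamReader = {
      // Flatten the typed, nested config into the existing String -> String options.
      val flat = Map(
        "endpoint" -> conf.endpoint,
        "retries"  -> conf.retries.toString
      ) ++ conf.tags.map { case (k, v) => s"tag.$k" -> v }
      reader.format("my-source").options(flat)
    }
  }
}

// Usage sketch (requires `import MySourceImplicits._`):
//   spark.readStream.mySource(MySourceConfig("https://...", 3, Map("env" -> "dev"))).load()
{code}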

> DataStreamReader/DataStreamWriter.option shall accept user-defined type
> ---
>
> Key: SPARK-19788
> URL: https://issues.apache.org/jira/browse/SPARK-19788
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Nan Zhu
>
> There are many other data sources/sinks which has very different 
> configuration ways than Kafka, FileSystem, etc. 
> The expected type of the configuration entry passed to them might be a nested 
> collection type, e.g. Map[String, Map[String, String]], or even a 
> user-defined type(for example, the one I am working on)
> Right now, option can only accept String -> String/Boolean/Long/Double OR a 
> complete Map[String, String]...my suggestion is that we can accept 
> Map[String, Any], and the type of 'parameters' in SourceProvider.createSource 
> can also be Map[String, Any], this will create much more flexibility to the 
> user
> The drawback is that, it is a breaking change ( we can mitigate this by 
> deprecating the current one, and progressively evolve to the new one if the 
> proposal is accepted)
> [~zsxwing] what do you think?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Resolved] (SPARK-19932) Disallow a case that might cause OOM for steaming deduplication

2017-03-22 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust resolved SPARK-19932.
--
Resolution: Won't Fix

Thanks for working on this.  While I think it would be helpful to come up with 
a full proposal to help users understand which of their queries might result in 
unscalable amounts of state, I don't think we should do it piecemeal in this 
way.
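
For reference, a minimal sketch of the state-friendly variant of the query quoted 
below (placeholder source and timestamps; the column names follow the issue): when 
the watermarked column is included in the dropDuplicates keys, expired entries can 
be evicted from the state store.

{code}
import org.apache.spark.sql.SparkSession

object DedupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DedupSketch").getOrCreate()

    val deduped = spark.readStream
      .format("socket").option("host", "localhost").option("port", 9999)
      .load()                                                  // schema: (value: String)
      .selectExpr("value AS word", "current_timestamp() AS eventTime")
      .withWatermark("eventTime", "10 seconds")
      .dropDuplicates("word", "eventTime") // "eventTime" is part of the key, so old state can be dropped

    deduped.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}
{code}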

> Disallow a case that might cause OOM for steaming deduplication
> ---
>
> Key: SPARK-19932
> URL: https://issues.apache.org/jira/browse/SPARK-19932
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Liwei Lin
>
> {code}
> spark
>.readStream // schema: (word, eventTime), like ("a", 10), 
> ("a", 11), ("b", 12) ...
>...
>.withWatermark("eventTime", "10 seconds")
>.dropDuplicates("word") // note: "eventTime" is not part of the key 
> columns
>...
> {code}
> As shown above, right now if watermark is specified for a streaming 
> dropDuplicates query, but not specified as the key columns, then we'll still 
> get the correct answer, but the state just keeps growing and will never get 
> cleaned up.
> The reason is, the watermark attribute is not part of the key of the state 
> store in this case. We're not saving event time information in the state 
> store.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-19876) Add OneTime trigger executor

2017-03-22 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust reassigned SPARK-19876:


Assignee: Tyson Condie

> Add OneTime trigger executor
> 
>
> Key: SPARK-19876
> URL: https://issues.apache.org/jira/browse/SPARK-19876
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Tyson Condie
>Assignee: Tyson Condie
>
> The goal is to add a new trigger executor that will process a single trigger 
> then stop. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19876) Add OneTime trigger executor

2017-03-22 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-19876:
-
Target Version/s: 2.2.0

> Add OneTime trigger executor
> 
>
> Key: SPARK-19876
> URL: https://issues.apache.org/jira/browse/SPARK-19876
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Tyson Condie
>
> The goal is to add a new trigger executor that will process a single trigger 
> then stop. 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19989) Flaky Test: org.apache.spark.sql.kafka010.KafkaSourceStressSuite

2017-03-22 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-19989:
-
Description: 
This test failed recently here: 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74683/testReport/junit/org.apache.spark.sql.kafka010/KafkaSourceStressSuite/stress_test_with_multiple_topics_and_partitions/

And based on Josh's dashboard 
(https://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaSourceStressSuite&test_name=stress+test+with+multiple+topics+and+partitions),
 seems to fail a few times every month.  Here's the full error from the most 
recent failure:

Error Message
{code}
org.scalatest.exceptions.TestFailedException:  Error adding data: replication 
factor: 1 larger than available brokers: 0 
kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:117)  
kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:403)  
org.apache.spark.sql.kafka010.KafkaTestUtils.createTopic(KafkaTestUtils.scala:173)
  
org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$16$$anonfun$apply$mcV$sp$17$$anonfun$37.apply(KafkaSourceSuite.scala:903)
  
org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$16$$anonfun$apply$mcV$sp$17$$anonfun$37.apply(KafkaSourceSuite.scala:901)
  
org.apache.spark.sql.kafka010.KafkaSourceTest$AddKafkaData$$anonfun$addData$1.apply(KafkaSourceSuite.scala:93)
  
org.apache.spark.sql.kafka010.KafkaSourceTest$AddKafkaData$$anonfun$addData$1.apply(KafkaSourceSuite.scala:92)
  scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:316)  
org.apache.spark.sql.kafka010.KafkaSourceTest$AddKafkaData.addData(KafkaSourceSuite.scala:92)
  
org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:494)
{code}

{code}
sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 
Error adding data: replication factor: 1 larger than available brokers: 0
kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:117)
kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:403)

org.apache.spark.sql.kafka010.KafkaTestUtils.createTopic(KafkaTestUtils.scala:173)

org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$16$$anonfun$apply$mcV$sp$17$$anonfun$37.apply(KafkaSourceSuite.scala:903)

org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$16$$anonfun$apply$mcV$sp$17$$anonfun$37.apply(KafkaSourceSuite.scala:901)

org.apache.spark.sql.kafka010.KafkaSourceTest$AddKafkaData$$anonfun$addData$1.apply(KafkaSourceSuite.scala:93)

org.apache.spark.sql.kafka010.KafkaSourceTest$AddKafkaData$$anonfun$addData$1.apply(KafkaSourceSuite.scala:92)
scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:316)

org.apache.spark.sql.kafka010.KafkaSourceTest$AddKafkaData.addData(KafkaSourceSuite.scala:92)

org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:494)


== Progress ==
   AssertOnQuery(, )
   CheckAnswer: 
   StopStream
   
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@5d888be0,Map())
   AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), data 
= Range(0, 1, 2, 3, 4, 5, 6, 7, 8), message = )
   CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9]
   StopStream
   
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@1be724ee,Map())
   AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), data 
= Range(9, 10, 11, 12, 13, 14), message = )
   CheckAnswer: 
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15]
   StopStream
   AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), data 
= Range(), message = )
=> AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, 
stress3), data = Range(15), message = Add topic stress7)
   AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, 
stress3), data = Range(16, 17, 18, 19, 20, 21, 22), message = Add partition)
   AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, 
stress3), data = Range(23, 24), message = Add partition)
   AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, 
stress5, stress3), data = Range(), message = Add topic stress9)
   AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, 
stress5, stress3), data = Range(25, 26, 27, 28, 29, 30, 31, 32, 33), message = )
   AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, 
stress5, stress3), data = Range(), message = )
   AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, 
stress5, stress3), data = Range(), message = )
   AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, 
stress5, stress3), data = Range(34, 35, 36, 37, 38, 39), message = )
   AddKafkaData(topics = Set(stress4,

[jira] [Updated] (SPARK-19989) Flaky Test: org.apache.spark.sql.kafka010.KafkaSourceStressSuite

2017-03-22 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19989?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-19989:
-
Target Version/s: 2.2.0

> Flaky Test: org.apache.spark.sql.kafka010.KafkaSourceStressSuite
> 
>
> Key: SPARK-19989
> URL: https://issues.apache.org/jira/browse/SPARK-19989
> Project: Spark
>  Issue Type: Bug
>  Components: SQL, Structured Streaming, Tests
>Affects Versions: 2.2.0
>Reporter: Kay Ousterhout
>Priority: Minor
>  Labels: flaky-test
>
> This test failed recently here: 
> https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74683/testReport/junit/org.apache.spark.sql.kafka010/KafkaSourceStressSuite/stress_test_with_multiple_topics_and_partitions/
> And based on Josh's dashboard 
> (https://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.sql.kafka010.KafkaSourceStressSuite&test_name=stress+test+with+multiple+topics+and+partitions),
>  seems to fail a few times every month.  Here's the full error from the most 
> recent failure:
> Error Message
> {code}
> org.scalatest.exceptions.TestFailedException:  Error adding data: replication 
> factor: 1 larger than available brokers: 0 
> kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:117)  
> kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:403)  
> org.apache.spark.sql.kafka010.KafkaTestUtils.createTopic(KafkaTestUtils.scala:173)
>   
> org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$16$$anonfun$apply$mcV$sp$17$$anonfun$37.apply(KafkaSourceSuite.scala:903)
>   
> org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$16$$anonfun$apply$mcV$sp$17$$anonfun$37.apply(KafkaSourceSuite.scala:901)
>   
> org.apache.spark.sql.kafka010.KafkaSourceTest$AddKafkaData$$anonfun$addData$1.apply(KafkaSourceSuite.scala:93)
>   
> org.apache.spark.sql.kafka010.KafkaSourceTest$AddKafkaData$$anonfun$addData$1.apply(KafkaSourceSuite.scala:92)
>   scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:316)  
> org.apache.spark.sql.kafka010.KafkaSourceTest$AddKafkaData.addData(KafkaSourceSuite.scala:92)
>   
> org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:494)
> {code}
> {code}
> sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 
> Error adding data: replication factor: 1 larger than available brokers: 0
> kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:117)
>   kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:403)
>   
> org.apache.spark.sql.kafka010.KafkaTestUtils.createTopic(KafkaTestUtils.scala:173)
>   
> org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$16$$anonfun$apply$mcV$sp$17$$anonfun$37.apply(KafkaSourceSuite.scala:903)
>   
> org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$16$$anonfun$apply$mcV$sp$17$$anonfun$37.apply(KafkaSourceSuite.scala:901)
>   
> org.apache.spark.sql.kafka010.KafkaSourceTest$AddKafkaData$$anonfun$addData$1.apply(KafkaSourceSuite.scala:93)
>   
> org.apache.spark.sql.kafka010.KafkaSourceTest$AddKafkaData$$anonfun$addData$1.apply(KafkaSourceSuite.scala:92)
>   scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:316)
>   
> org.apache.spark.sql.kafka010.KafkaSourceTest$AddKafkaData.addData(KafkaSourceSuite.scala:92)
>   
> org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:494)
> == Progress ==
>AssertOnQuery(, )
>CheckAnswer: 
>StopStream
>
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@5d888be0,Map())
>AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), 
> data = Range(0, 1, 2, 3, 4, 5, 6, 7, 8), message = )
>CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9]
>StopStream
>
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@1be724ee,Map())
>AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), 
> data = Range(9, 10, 11, 12, 13, 14), message = )
>CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15]
>StopStream
>AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), 
> data = Range(), message = )
> => AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, 
> stress3), data = Range(15), message = Add topic stress7)
>AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, 
> stress3), data = Range(16, 17, 18, 19, 20, 21, 22), message = Add parti

Re: Outstanding Spark 2.1.1 issues

2017-03-22 Thread Michael Armbrust
An update: I cut the tag for RC1 last night.  Currently fighting with the
release process.  Will post RC1 once I get it working.

On Tue, Mar 21, 2017 at 2:16 PM, Nick Pentreath 
wrote:

> As for SPARK-19759 <https://issues.apache.org/jira/browse/SPARK-19759>, I
> don't think that needs to be targeted for 2.1.1 so we don't need to worry
> about it
>
>
> On Tue, 21 Mar 2017 at 13:49 Holden Karau  wrote:
>
>> I agree with Michael, I think we've got some outstanding issues but none
>> of them seem like regressions from 2.1, so we should be good to start the RC
>> process.
>>
>> On Tue, Mar 21, 2017 at 1:41 PM, Michael Armbrust > > wrote:
>>
>> Please speak up if I'm wrong, but none of these seem like critical
>> regressions from 2.1.  As such I'll start the RC process later today.
>>
>> On Mon, Mar 20, 2017 at 9:52 PM, Holden Karau 
>> wrote:
>>
>> I'm not super sure it should be a blocker for 2.1.1 -- is it a
>> regression? Maybe we can get TDs input on it?
>>
>> On Mon, Mar 20, 2017 at 8:48 PM Nan Zhu  wrote:
>>
>> I think https://issues.apache.org/jira/browse/SPARK-19280 should be a
>> blocker
>>
>> Best,
>>
>> Nan
>>
>> On Mon, Mar 20, 2017 at 8:18 PM, Felix Cheung 
>> wrote:
>>
>> I've been scrubbing R and think we are tracking 2 issues
>>
>> https://issues.apache.org/jira/browse/SPARK-19237
>>
>> https://issues.apache.org/jira/browse/SPARK-19925
>>
>>
>>
>>
>> --
>> *From:* holden.ka...@gmail.com  on behalf of
>> Holden Karau 
>> *Sent:* Monday, March 20, 2017 3:12:35 PM
>> *To:* dev@spark.apache.org
>> *Subject:* Outstanding Spark 2.1.1 issues
>>
>> Hi Spark Developers!
>>
>> As we start working on the Spark 2.1.1 release I've been looking at our
>> outstanding issues still targeted for it. I've tried to break it down by
>> component so that people in charge of each component can take a quick look
>> and see if any of these things can/should be re-targeted to 2.2 or 2.1.2 &
>> the overall list is pretty short (only 9 items - 5 if we only look at
>> explicitly tagged) :)
>>
>> If you're working on something for Spark 2.1.1 and it doesn't show up in
>> this list please speak up now :) We have a lot of issues (including "in
>> progress") that are listed as impacting 2.1.0, but they aren't targeted for
>> 2.1.1 - if there is something you are working on there which should be
>> targeted for 2.1.1 please let us know so it doesn't slip through the cracks.
>>
>> The query string I used for looking at the 2.1.1 open issues is:
>>
>> ((affectedVersion = 2.1.1 AND cf[12310320] is Empty) OR fixVersion =
>> 2.1.1 OR cf[12310320] = "2.1.1") AND project = spark AND resolution =
>> Unresolved ORDER BY priority DESC
>>
>> None of the open issues appear to be a regression from 2.1.0, but those
>> seem more likely to show up during the RC process (thanks in advance to
>> everyone testing their workloads :)) & generally none of them seem to be
>>
>> (Note: the cfs are for Target Version/s field)
>>
>> Critical Issues:
>>  SQL:
>>   SPARK-19690 <https://issues.apache.org/jira/browse/SPARK-19690> - Join
>> a streaming DataFrame with a batch DataFrame may not work - PR
>> https://github.com/apache/spark/pull/17052 (review in progress by
>> zsxwing, currently failing Jenkins)*
>>
>> Major Issues:
>>  SQL:
>>   SPARK-19035 <https://issues.apache.org/jira/browse/SPARK-19035> - rand()
>> function in case when cause failed - no outstanding PR (consensus on JIRA
>> seems to be leaning towards it being a real issue but not necessarily
>> everyone agrees just yet - maybe we should slip this?)*
>>  Deploy:
>>   SPARK-19522 <https://issues.apache.org/jira/browse/SPARK-19522>
>>  - --executor-memory flag doesn't work in local-cluster mode -
>> https://github.com/apache/spark/pull/16975 (review in progress by
>> vanzin, but PR currently stalled waiting on response) *
>>  Core:
>>   SPARK-20025 <https://issues.apache.org/jira/browse/SPARK-20025> - Driver
>> fail over will not work, if SPARK_LOCAL* env is set. -
>> https://github.com/apache/spark/pull/17357 (waiting on review) *
>>  PySpark:
>>  SPARK-19955 <https://issues.apache.org/jira/browse/SPARK-19955> -
>> Update run-tests to support conda [ Part of Dropping 2.6 support -- which
>> we shouldn't do in a minor release -- but a

[jira] [Created] (SPARK-20063) Trigger without delay when falling behind

2017-03-22 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-20063:


 Summary: Trigger without delay when falling behind 
 Key: SPARK-20063
 URL: https://issues.apache.org/jira/browse/SPARK-20063
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.1.0
Reporter: Michael Armbrust
Priority: Critical


Today, when we miss a trigger interval we wait until the next one to fire.  
However, for real workloads this usually means that you fall further and 
further behind by sitting idle while waiting.  We should revisit this decision.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-20009) Use user-friendly DDL formats for defining a schema in user-facing APIs

2017-03-22 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15936876#comment-15936876
 ] 

Michael Armbrust commented on SPARK-20009:
--

Yeah, the DDL format is certainly a lot easier to type than the JSON.  I think 
it makes sense to support both if we can tell the difference unambiguously 
(which I think we can).
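
A small sketch of the contrast (the DDL string shown is the proposed user-facing 
form from SPARK-19830, not necessarily accepted by every API yet):

{code}
// Programmatic form of a two-field schema, plus its verbose JSON representation:
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("a", IntegerType)
  .add("b", StringType)

schema.json   // -> a fairly long JSON string describing the two fields

// Proposed user-friendly DDL form of the same schema, as a user would type it:
// "a INT, b STRING"
{code}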

> Use user-friendly DDL formats for defining a schema  in user-facing APIs
> 
>
> Key: SPARK-20009
> URL: https://issues.apache.org/jira/browse/SPARK-20009
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Takeshi Yamamuro
>
> In https://issues.apache.org/jira/browse/SPARK-19830, we add a new API in the 
> DDL parser to convert a DDL string into a schema. Then, we can use DDL 
> formats in existing some APIs, e.g., functions.from_json 
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L3062.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: Outstanding Spark 2.1.1 issues

2017-03-21 Thread Michael Armbrust
Please speak up if I'm wrong, but none of these seem like critical
regressions from 2.1.  As such I'll start the RC process later today.

On Mon, Mar 20, 2017 at 9:52 PM, Holden Karau  wrote:

> I'm not super sure it should be a blocker for 2.1.1 -- is it a regression?
> Maybe we can get TDs input on it?
>
> On Mon, Mar 20, 2017 at 8:48 PM Nan Zhu  wrote:
>
>> I think https://issues.apache.org/jira/browse/SPARK-19280 should be a
>> blocker
>>
>> Best,
>>
>> Nan
>>
>> On Mon, Mar 20, 2017 at 8:18 PM, Felix Cheung 
>> wrote:
>>
>> I've been scrubbing R and think we are tracking 2 issues
>>
>> https://issues.apache.org/jira/browse/SPARK-19237
>>
>> https://issues.apache.org/jira/browse/SPARK-19925
>>
>>
>>
>>
>> --
>> *From:* holden.ka...@gmail.com  on behalf of
>> Holden Karau 
>> *Sent:* Monday, March 20, 2017 3:12:35 PM
>> *To:* dev@spark.apache.org
>> *Subject:* Outstanding Spark 2.1.1 issues
>>
>> Hi Spark Developers!
>>
>> As we start working on the Spark 2.1.1 release I've been looking at our
>> outstanding issues still targeted for it. I've tried to break it down by
>> component so that people in charge of each component can take a quick look
>> and see if any of these things can/should be re-targeted to 2.2 or 2.1.2 &
>> the overall list is pretty short (only 9 items - 5 if we only look at
>> explicitly tagged) :)
>>
>> If you're working on something for Spark 2.1.1 and it doesn't show up in
>> this list please speak up now :) We have a lot of issues (including "in
>> progress") that are listed as impacting 2.1.0, but they aren't targeted for
>> 2.1.1 - if there is something you are working on there which should be
>> targeted for 2.1.1 please let us know so it doesn't slip through the cracks.
>>
>> The query string I used for looking at the 2.1.1 open issues is:
>>
>> ((affectedVersion = 2.1.1 AND cf[12310320] is Empty) OR fixVersion =
>> 2.1.1 OR cf[12310320] = "2.1.1") AND project = spark AND resolution =
>> Unresolved ORDER BY priority DESC
>>
>> None of the open issues appear to be a regression from 2.1.0, but those
>> seem more likely to show up during the RC process (thanks in advance to
>> everyone testing their workloads :)) & generally none of them seem to be
>>
>> (Note: the cfs are for Target Version/s field)
>>
>> Critical Issues:
>>  SQL:
>>   SPARK-19690  - Join
>> a streaming DataFrame with a batch DataFrame may not work - PR
>> https://github.com/apache/spark/pull/17052 (review in progress by
>> zsxwing, currently failing Jenkins)*
>>
>> Major Issues:
>>  SQL:
>>   SPARK-19035  - rand()
>> function in case when cause failed - no outstanding PR (consensus on JIRA
>> seems to be leaning towards it being a real issue but not necessarily
>> everyone agrees just yet - maybe we should slip this?)*
>>  Deploy:
>>   SPARK-19522 
>>  - --executor-memory flag doesn't work in local-cluster mode -
>> https://github.com/apache/spark/pull/16975 (review in progress by
>> vanzin, but PR currently stalled waiting on response) *
>>  Core:
>>   SPARK-20025  - Driver
>> fail over will not work, if SPARK_LOCAL* env is set. -
>> https://github.com/apache/spark/pull/17357 (waiting on review) *
>>  PySpark:
>>  SPARK-19955  -
>> Update run-tests to support conda [ Part of Dropping 2.6 support -- which
>> we shouldn't do in a minor release -- but also fixes pip installability
>> tests to run in Jenkins ]-  PR failing Jenkins (I need to poke this some
>> more, but seems like 2.7 support works but some other issues. Maybe slip to
>> 2.2?)
>>
>> Minor issues:
>>  Tests:
>>   SPARK-19612  - Tests
>> failing with timeout - No PR per-se but it seems unrelated to the 2.1.1
>> release. It's not targetted for 2.1.1 but listed as affecting 2.1.1 - I'd
>> consider explicitly targeting this for 2.2?
>>  PySpark:
>>   SPARK-19570  - Allow
>> to disable hive in pyspark shell - https://github.com/apache/
>> spark/pull/16906 PR exists but its difficult to add automated tests for
>> this (although if SPARK-19955
>>  gets in would make
>> testing this easier) - no reviewers yet. Possible re-target?*
>>  Structured Streaming:
>>   SPARK-19613  - Flaky
>> test: StateStoreRDDSuite.versioning and immutability - It's not targetted
>> for 2.1.1 but listed as affecting 2.1.1 - I'd consider explicitly targeting
>> this for 2.2?
>>  ML:
>>   SPARK-19759 
>>  - ALSModel.predict on Dataframes : potential optimization by not using
>> blas - No PR consider re-targeting unless someone has a PR waiting in the
>> wi

Re: [Spark Streaming+Kafka][How-to]

2017-03-17 Thread Michael Armbrust
Another option that would avoid a shuffle would be to use assign and
coalesce, running two separate streams.

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("assign", """{t0: {"0": }, t1:{"0": x}}""")
  .load()
  .coalesce(1)
  .writeStream
  .foreach(... code to write to cassandra ...)

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("assign", """{t0: {"1": }, t1:{"1": x}}""")
  .load()
  .coalesce(1)
  .writeStream
  .foreach(... code to write to cassandra ...)

On Fri, Mar 17, 2017 at 7:35 AM, OUASSAIDI, Sami 
wrote:

> @Cody : Duly noted.
> @Michael Armbrust : A repartition is out of the question for our project, as
> it would be a fairly expensive operation. We tried looking into targeting a
> specific executor so as to avoid this extra cost and directly have
> well-partitioned data after consuming the kafka topics. Also, we are using
> Spark streaming to save to the cassandra DB and try to keep shuffle operations
> to a strict minimum (at best none). As of now we are not entirely pleased with
> our current performance; that's why I'm doing a kafka topic sharding POC,
> and getting the executor to handle the specified partitions is central.
>
> 2017-03-17 9:14 GMT+01:00 Michael Armbrust :
>
>> Sorry, typo.  Should be a repartition not a groupBy.
>>
>>
>>> spark.readStream
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers", "...")
>>>   .option("subscribe", "t0,t1")
>>>   .load()
>>>   .repartition($"partition")
>>>   .writeStream
>>>   .foreach(... code to write to cassandra ...)
>>>
>>
>
>
> --
> *Mind7 Consulting*
>
> Sami Ouassaid | Consultant Big Data | sami.ouassa...@mind7.com
> __
>
> 64 Rue Taitbout, 75009 Paris
>


Re: [Spark Streaming+Kafka][How-to]

2017-03-17 Thread Michael Armbrust
Sorry, typo.  Should be a repartition not a groupBy.


> spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "...")
>   .option("subscribe", "t0,t1")
>   .load()
>   .repartition($"partition")
>   .writeStream
>   .foreach(... code to write to cassandra ...)
>


[jira] [Commented] (SPARK-19982) JavaDatasetSuite.testJavaBeanEncoder sometimes fails with "Unable to generate an encoder for inner class"

2017-03-16 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15929275#comment-15929275
 ] 

Michael Armbrust commented on SPARK-19982:
--

I'm not sure if changing weak to strong references will change 
[anything|http://stackoverflow.com/questions/299659/what-is-the-difference-between-a-soft-reference-and-a-weak-reference-in-java].
  It seems like there must be another handle to {{this}} since the test harness 
is actively executing it.  So either way it shouldn't be available for garbage 
collection, or am I missing something?

> JavaDatasetSuite.testJavaBeanEncoder sometimes fails with "Unable to generate 
> an encoder for inner class"
> -
>
> Key: SPARK-19982
> URL: https://issues.apache.org/jira/browse/SPARK-19982
> Project: Spark
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 2.1.0
>Reporter: Jose Soltren
>  Labels: flaky-test
>
> JavaDatasetSuite.testJavaBeanEncoder fails sporadically with the error below:
> Unable to generate an encoder for inner class 
> `test.org.apache.spark.sql.JavaDatasetSuite$SimpleJavaBean` without access to 
> the scope that this class was defined in. Try moving this class out of its 
> parent class.
> From https://spark-tests.appspot.com/test-logs/35475788
> [~vanzin] looked into this back in October and reported:
> I ran this test in a loop (both alone and with the rest of the spark-sql 
> tests) and never got a failure. I even used the same JDK as Jenkins 
> (1.7.0_51).
> Also looked at the code and nothing seems wrong. The errors is when an entry 
> with the parent class name is missing from the map kept in OuterScopes.scala, 
> but the test populates that map in its first line. So it doesn't look like a 
> race nor some issue with weak references (the map uses weak values).
>   public void testJavaBeanEncoder() {
> OuterScopes.addOuterScope(this);



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: Streaming 2.1.0 - window vs. batch duration

2017-03-16 Thread Michael Armbrust
Have you considered trying event time aggregation in structured streaming
instead?
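
For reference, a minimal sketch of such an event-time aggregation, assuming an
existing SparkSession named spark as in the other snippets in this archive (the
topic, servers, and lateness bound are placeholders): counts per 60-second
tumbling window keyed on the Kafka record timestamp.

import org.apache.spark.sql.functions.{col, window}

val counts = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("subscribe", "events")
  .load()
  .withWatermark("timestamp", "1 minute")        // tolerate one minute of late data
  .groupBy(window(col("timestamp"), "60 seconds"))   // tumbling 60-second windows
  .count()

counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()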

On Thu, Mar 16, 2017 at 12:34 PM, Dominik Safaric 
wrote:

> Hi all,
>
> As I’ve implemented a streaming application pulling data from Kafka every
> 1 second (batch interval), I am observing some quite strange behaviour
> (I didn’t use Spark extensively in the past, but continuous operator based
> engines instead).
>
> Namely, the dstream.window(Seconds(60)) windowed stream, when written back
> to Kafka, contains more messages than were consumed (for debugging
> purposes I am using a small dataset of a million Kafka byte array deserialized
> messages). In particular, in total I’ve streamed exactly 1 million
> messages, whereas upon window expiry 60 million messages are written back
> to Kafka.
>
> I’ve read in the official docs that both the window and window slide
> duration must be multiples of the batch interval. Does this mean that, when
> consuming messages, the RDDs of a given batch interval *t* are ingested
> into the windowed stream 59 more times?
>
> If I would like to achieve this behaviour (batch interval equal to a
> second, window duration 60 seconds) - how might one achieve this?
>
> I would appreciate if anyone could correct me if I got the internals of
> Spark’s windowed operations wrong and elaborate a bit.
>
> Thanks,
> Dominik
>


Re: [Spark Streaming+Kafka][How-to]

2017-03-16 Thread Michael Armbrust
I think it should be straightforward to express this using structured
streaming.  You could ensure that data from a given partition ID is
processed serially by performing a group by on the partition column.

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("subscribe", "t0,t1")
  .load()
  .groupBy($"partition")
  .writeStream
  .foreach(... code to write to cassandra ...)


On Thu, Mar 16, 2017 at 8:10 AM, Cody Koeninger  wrote:

> Spark just really isn't a good fit for trying to pin particular
> computation to a particular executor, especially if you're relying on that
> for correctness.
>
> On Thu, Mar 16, 2017 at 7:16 AM, OUASSAIDI, Sami 
> wrote:
>
>>
>> Hi all,
>>
>> So I need to specify how an executor should consume data from a kafka
>> topic.
>>
>> Let's say I have 2 topics : t0 and t1 with two partitions each, and two
>> executors e0 and e1 (both can be on the same node so assign strategy does
>> not work since in the case of a multi executor node it works based on round
>> robin scheduling, whatever first available executor consumes the topic
>> partition )
>>
>> What I would like to do is make e0 consume partition 0 from both t0 and
>> t1 while e1 consumes partition 1 from t0 and t1. Is there no way around
>> it except messing with scheduling? If so, what's the best approach?
>>
>> The reason for doing so is that executors will write to a cassandra
>> database, and since we will be in a parallelized context one executor might
>> "collide" with another and therefore data will be lost; by assigning a
>> partition I want to force the executor to process the data sequentially.
>>
>> Thanks
>> Sami
>> --
>> *Mind7 Consulting*
>>
>> Sami Ouassaid | Consultant Big Data | sami.ouassa...@mind7.com
>> __
>>
>> 64 Rue Taitbout, 75009 Paris
>>
>
>


Spark 2.2 Code-freeze - 3/20

2017-03-15 Thread Michael Armbrust
Hey Everyone,

Just a quick announcement that I'm planning to cut the branch for Spark 2.2
this coming Monday (3/20).  Please try and get things merged before then
and also please begin retargeting of any issues that you don't think will
make the release.

Michael


Re: Should we consider a Spark 2.1.1 release?

2017-03-15 Thread Michael Armbrust
Hey Holden,

Thanks for bringing this up!  I think we usually cut patch releases when
there are enough fixes to justify it.  Sometimes just a few weeks after the
release.  I guess if we are at 3 months Spark 2.1.0 was a pretty good
release :)

That said, it is probably time. I was about to start thinking about 2.2 as
well (we are a little past the posted code-freeze deadline), so I'm happy
to push the buttons etc. (this is a very good description, if you are curious). I would
love help watching JIRA, posting the burn down on issues and shepherding in
any critical patches.  Feel free to ping me off-line if you like to
coordinate.

Unless there are any objections, how about we aim for an RC of 2.1.1 on
Monday and I'll also plan to cut branch-2.2 then?  (I'll send a separate
email on this as well).

Michael

On Mon, Mar 13, 2017 at 1:40 PM, Holden Karau  wrote:

> I'd be happy to do the work of coordinating a 2.1.1 release if that's a
> thing a committer can do (I think the release coordinator for the most
> recent Arrow release was a committer and the final publish step took a PMC
> member to upload but other than that I don't remember any issues).
>
> On Mon, Mar 13, 2017 at 1:05 PM Sean Owen  wrote:
>
>> It seems reasonable to me, in that other x.y.1 releases have followed ~2
>> months after the x.y.0 release and it's been about 3 months since 2.1.0.
>>
>> Related: creating releases is tough work, so I feel kind of bad voting
>> for someone else to do that much work. Would it make sense to deputize
>> another release manager to help get out just the maintenance releases? this
>> may in turn mean maintenance branches last longer. Experienced hands can
>> continue to manage new minor and major releases as they require more
>> coordination.
>>
>> I know most of the release process is written down; I know it's also
>> still going to be work to make it 100% documented. Eventually it'll be
>> necessary to make sure it's entirely codified anyway.
>>
>> Not pushing for it myself, just noting I had heard this brought up in
>> side conversations before.
>>
>>
>> On Mon, Mar 13, 2017 at 7:07 PM Holden Karau 
>> wrote:
>>
>> Hi Spark Devs,
>>
>> Spark 2.1 has been out since end of December
>> 
>> and we've got quite a few fixes merged for 2.1.1
>> 
>> .
>>
>> On the Python side one of the things I'd like to see us get out into a
>> patch release is a packaging fix (now merged) before we upload to PyPI &
>> Conda, and we also have the normal batch of fixes like toLocalIterator for
>> large DataFrames in PySpark.
>>
>> I've chatted with Felix & Shivaram, who seem to think the R side is
>> looking close to being in good shape for a 2.1.1 release to submit to CRAN (if
>> I've misspoken, my apologies). The two outstanding issues that are being
>> tracked for R are SPARK-18817 and SPARK-19237.
>>
>> Looking at the other components quickly it seems like structured
>> streaming could also benefit from a patch release.
>>
>> What do others think - are there any issues people are actively targeting
>> for 2.1.1? Is this too early to be considering a patch release?
>>
>> Cheers,
>>
>> Holden
>> --
>> Cell : 425-233-8271 <(425)%20233-8271>
>> Twitter: https://twitter.com/holdenkarau
>>
>> --
> Cell : 425-233-8271 <(425)%20233-8271>
> Twitter: https://twitter.com/holdenkarau
>


Re: Structured Streaming - Can I start using it?

2017-03-13 Thread Michael Armbrust
I think it's very, very unlikely that it will get withdrawn.  The primary
reason that the APIs are still marked experimental is that we like to have
several releases before committing to interface stability (in particular,
the interfaces to write custom sources and sinks are likely to evolve).
Also, there are currently quite a few limitations in the types of queries
that we can run (e.g. multiple aggregations are disallowed, we don't
support stream-stream joins yet).  In these cases, though, we explicitly say
it's not supported when you try to start your stream.

For the use cases that are supported in 2.1 though (streaming ETL, event
time aggregation, etc) I'll say that we have been using it in production
for several months and we have customers doing the same.

On Mon, Mar 13, 2017 at 11:21 AM, Gaurav1809 
wrote:

> I read in spark documentation that Structured Streaming is still ALPHA in
> Spark 2.1 and the APIs are still experimental. Shall I use it to rewrite my
> existing spark streaming code? It looks like it is not yet production ready.
> What happens if the Structured Streaming project gets withdrawn?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Structured-Streaming-Can-I-
> start-using-it-tp28488.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.2.0

2017-03-13 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15922715#comment-15922715
 ] 

Michael Armbrust commented on SPARK-18057:
--

So to summarize, it'll be unfortunate if Kafka breaks binary compatibility and 
people are relying on the libraries that are bundled with Spark.  I don't think 
that should stop us from upgrading though, especially since we do provide 
binary compatible APIs for reading/writing from kafka and API protocol 
compatibility is no longer an issue.  If enough users complain, we should 
consider shading our dependency on Kafka.

Given that, as soon as there is a release that fixes [KAFKA-4879], I think we 
should upgrade (assuming no other regressions).  We should probably do the same 
for DStreams, unless there are objections there based on the Kafka library 
binary compatibility problems (I'm not sure if its more likely for applications 
there to interact directly with the kafka library).

> Update structured streaming kafka from 10.0.1 to 10.2.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.2.0

2017-03-10 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15905755#comment-15905755
 ] 

Michael Armbrust commented on SPARK-18057:
--

It seems like we can upgrade the existing Kafka10 artifacts without causing any 
compatibility issues (since 0.10.2.0 is compatible with 0.10.0.0+), so I don't 
think there is any need to make new artifacts or do any refactoring.  I think 
we can just upgrade?

> Update structured streaming kafka from 10.0.1 to 10.2.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19888) Seeing offsets not resetting even when reset policy is configured explicitly

2017-03-10 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-19888:
-
Component/s: (was: Spark Core)
 DStreams

> Seeing offsets not resetting even when reset policy is configured explicitly
> 
>
> Key: SPARK-19888
> URL: https://issues.apache.org/jira/browse/SPARK-19888
> Project: Spark
>  Issue Type: Bug
>  Components: DStreams
>Affects Versions: 2.1.0
>Reporter: Justin Miller
>
> I was told to post this in a Spark ticket from KAFKA-4396:
> I've been seeing a curious error with kafka 0.10 (spark 2.11), these may be 
> two separate errors, I'm not sure. What's puzzling is that I'm setting 
> auto.offset.reset to latest and it's still throwing an 
> OffsetOutOfRangeException, behavior that's contrary to the code. Please help! 
> :)
> {code}
> val kafkaParams = Map[String, Object](
>   "group.id" -> consumerGroup,
>   "bootstrap.servers" -> bootstrapServers,
>   "key.deserializer" -> classOf[ByteArrayDeserializer],
>   "value.deserializer" -> classOf[MessageRowDeserializer],
>   "auto.offset.reset" -> "latest",
>   "enable.auto.commit" -> (false: java.lang.Boolean),
>   "max.poll.records" -> persisterConfig.maxPollRecords,
>   "request.timeout.ms" -> persisterConfig.requestTimeoutMs,
>   "session.timeout.ms" -> persisterConfig.sessionTimeoutMs,
>   "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs,
>   "connections.max.idle.ms"-> persisterConfig.connectionsMaxIdleMs
> )
> {code}
> {code}
> 16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory 
> on xyz (size: 146.3 KB, free: 8.4 GB)
> 16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID 
> 38837, xyz): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: 
> Offsets out of range with no configured reset policy for partitions: 
> {topic=231884473}
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
> at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
> at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
> at 
> org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
> at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
> at 
> org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
> at 
> org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397)
> at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 16/11/09 23:10:29 INFO TaskSetManager: Finished task 10.0 in stage 154.0 (TID 
> 39388) in 12043 ms on xyz (1/16)
> 16/11/09 23:10:31 INFO TaskSetManager: Finished task 0.0 in stage 154.

Re: How to gracefully handle Kafka OffsetOutOfRangeException

2017-03-10 Thread Michael Armbrust
If you have a reproduction you should open a JIRA.  It would be great if
there is a fix.  I'm just saying I know a similar issue does not exist in
structured streaming.

On Fri, Mar 10, 2017 at 7:46 AM, Justin Miller <
justin.mil...@protectwise.com> wrote:

> Hi Michael,
>
> I'm experiencing a similar issue. Will this not be fixed in Spark
> Streaming?
>
> Best,
> Justin
>
> On Mar 10, 2017, at 8:34 AM, Michael Armbrust 
> wrote:
>
> One option here would be to try Structured Streaming.  We've added an
> option "failOnDataLoss" that will cause Spark to just skip a head when this
> exception is encountered (its off by default though so you don't silently
> miss data).
>
> On Fri, Mar 18, 2016 at 4:16 AM, Ramkumar Venkataraman <
> ram.the.m...@gmail.com> wrote:
>
>> I am using Spark streaming and reading data from Kafka using
>> KafkaUtils.createDirectStream. I have the "auto.offset.reset" set to
>> smallest.
>>
>> But in some Kafka partitions, I get kafka.common.OffsetOutOfRangeException
>> and my spark job crashes.
>>
>> I want to understand if there is a graceful way to handle this failure and
>> not kill the job. I want to keep ignoring these exceptions, as some other
>> partitions are fine and I am okay with data loss.
>>
>> Is there any way to handle this and not have my spark job crash? I have no
>> option of increasing the kafka retention period.
>>
>> I tried to have the DStream returned by createDirectStream() wrapped in a
>> Try construct, but since the exception happens in the executor, the Try
>> construct didn't take effect. Do you have any ideas of how to handle this?
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/How-to-gracefully-handle-Kafka-OffsetO
>> utOfRangeException-tp26534.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>


Re: How to gracefully handle Kafka OffsetOutOfRangeException

2017-03-10 Thread Michael Armbrust
One option here would be to try Structured Streaming.  We've added an
option "failOnDataLoss" that will cause Spark to just skip a head when this
exception is encountered (its off by default though so you don't silently
miss data).
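
For reference, a minimal sketch of what that looks like on the Kafka source in
Structured Streaming (the broker address and topic name below are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("kafka-skip-data-loss").getOrCreate()

// failOnDataLoss defaults to "true", so lost offsets fail the query unless you
// explicitly opt in to skipping ahead.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  // placeholder
  .option("subscribe", "events")                      // placeholder topic
  .option("failOnDataLoss", "false")
  .load()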

On Fri, Mar 18, 2016 at 4:16 AM, Ramkumar Venkataraman <
ram.the.m...@gmail.com> wrote:

> I am using Spark streaming and reading data from Kafka using
> KafkaUtils.createDirectStream. I have the "auto.offset.reset" set to
> smallest.
>
> But in some Kafka partitions, I get kafka.common.OffsetOutOfRangeException
> and my spark job crashes.
>
> I want to understand if there is a graceful way to handle this failure and
> not kill the job. I want to keep ignoring these exceptions, as some other
> partitions are fine and I am okay with data loss.
>
> Is there any way to handle this and not have my spark job crash? I have no
> option of increasing the kafka retention period.
>
> I tried to have the DStream returned by createDirectStream() wrapped in a
> Try construct, but since the exception happens in the executor, the Try
> construct didn't take effect. Do you have any ideas of how to handle this?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/How-to-gracefully-handle-Kafka-
> OffsetOutOfRangeException-tp26534.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


[jira] [Updated] (SPARK-18055) Dataset.flatMap can't work with types from customized jar

2017-03-07 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18055:
-
Target Version/s: 2.2.0

> Dataset.flatMap can't work with types from customized jar
> -
>
> Key: SPARK-18055
> URL: https://issues.apache.org/jira/browse/SPARK-18055
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1
>Reporter: Davies Liu
>Assignee: Michael Armbrust
> Attachments: test-jar_2.11-1.0.jar
>
>
> Try to apply flatMap() on a Dataset column which is of type
> com.A.B
> Here's a schema of a dataset:
> {code}
> root
>  |-- id: string (nullable = true)
>  |-- outputs: array (nullable = true)
>  ||-- element: string
> {code}
> flatMap works on RDD
> {code}
>  ds.rdd.flatMap(_.outputs)
> {code}
> flatMap doesn't work on the Dataset and gives the following error
> {code}
> ds.flatMap(_.outputs)
> {code}
> The exception:
> {code}
> scala.ScalaReflectionException: class com.A.B in JavaMirror … not found
> at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:123)
> at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:22)
> at 
> line189424fbb8cd47b3b62dc41e417841c159.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$typecreator3$1.apply(:51)
> at 
> scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
> at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
> at 
> org.apache.spark.sql.SQLImplicits$$typecreator9$1.apply(SQLImplicits.scala:125)
> at 
> scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
> at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
> at 
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:49)
> at 
> org.apache.spark.sql.SQLImplicits.newProductSeqEncoder(SQLImplicits.scala:125)
> {code}
> Spoke to Michael Armbrust and he confirmed it as a Dataset bug.
> There is a workaround using explode()
> {code}
> ds.select(explode(col("outputs")))
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Assigned] (SPARK-18055) Dataset.flatMap can't work with types from customized jar

2017-03-07 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust reassigned SPARK-18055:


Assignee: Michael Armbrust

> Dataset.flatMap can't work with types from customized jar
> -
>
> Key: SPARK-18055
> URL: https://issues.apache.org/jira/browse/SPARK-18055
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.1
>Reporter: Davies Liu
>Assignee: Michael Armbrust
> Attachments: test-jar_2.11-1.0.jar
>
>
> Try to apply flatMap() on a Dataset column which is of type
> com.A.B
> Here's a schema of a dataset:
> {code}
> root
>  |-- id: string (nullable = true)
>  |-- outputs: array (nullable = true)
>  ||-- element: string
> {code}
> flatMap works on RDD
> {code}
>  ds.rdd.flatMap(_.outputs)
> {code}
> flatMap doesn't work on the Dataset and gives the following error
> {code}
> ds.flatMap(_.outputs)
> {code}
> The exception:
> {code}
> scala.ScalaReflectionException: class com.A.B in JavaMirror … not found
> at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:123)
> at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:22)
> at 
> line189424fbb8cd47b3b62dc41e417841c159.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$typecreator3$1.apply(:51)
> at 
> scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
> at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
> at 
> org.apache.spark.sql.SQLImplicits$$typecreator9$1.apply(SQLImplicits.scala:125)
> at 
> scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:232)
> at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:232)
> at 
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:49)
> at 
> org.apache.spark.sql.SQLImplicits.newProductSeqEncoder(SQLImplicits.scala:125)
> {code}
> Spoke to Michael Armbrust and he confirmed it as a Dataset bug.
> There is a workaround using explode()
> {code}
> ds.select(explode(col("outputs")))
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: How to unit test spark streaming?

2017-03-07 Thread Michael Armbrust
>
> Basically you abstract your transformations to take in a dataframe and
> return one, then you assert on the returned df
>

+1 to this suggestion.  This is why we wanted streaming and batch
dataframes to share the same API.
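
A small sketch of that pattern (the transformation and column names here are
invented purely for illustration):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Keep the logic as a plain DataFrame => DataFrame function so the same code
// runs against a batch DataFrame in a test and a streaming DataFrame in prod.
object Transforms {
  def withErrorFlag(df: DataFrame): DataFrame =
    df.withColumn("is_error", col("status") >= 500)
}

// In a test:
//   import spark.implicits._
//   val output = Transforms.withErrorFlag(Seq(200, 503).toDF("status"))
//   assert(output.filter(col("is_error")).count() == 1)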


[jira] [Updated] (SPARK-19813) maxFilesPerTrigger combo latestFirst may miss old files in combination with maxFileAge in FileStreamSource

2017-03-03 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-19813:
-
Target Version/s: 2.2.0

> maxFilesPerTrigger combo latestFirst may miss old files in combination with 
> maxFileAge in FileStreamSource
> --
>
> Key: SPARK-19813
> URL: https://issues.apache.org/jira/browse/SPARK-19813
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Burak Yavuz
>Assignee: Burak Yavuz
>
> There is a file stream source option called maxFileAge which limits how old 
> the files can be, relative to the latest file that has been seen. This is used 
> to limit the files that need to be remembered as "processed". Files older 
> than the latest processed files are ignored. This value is by default 7 days.
> This causes a problem when both 
>  - latestFirst = true
>  - maxFilesPerTrigger > total files to be processed.
> Here is what happens in all combinations:
>  1) latestFirst = false - Since files are processed in order, there won't be 
> any unprocessed file older than the latest processed file. All files will be 
> processed.
>  2) latestFirst = true AND maxFilesPerTrigger is not set - The maxFileAge 
> thresholding mechanism takes one batch to initialize. If maxFilesPerTrigger is 
> not set, then all old files get processed in the first batch, and so no file is 
> left behind.
>  3) latestFirst = true AND maxFilesPerTrigger is set to X - The first batch 
> processes the latest X files. That sets the threshold to (latest file - maxFileAge), 
> so files older than this threshold will never be considered for processing. 
> The bug is with case 3.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19690) Join a streaming DataFrame with a batch DataFrame may not work

2017-03-03 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-19690:
-
Priority: Critical  (was: Major)

> Join a streaming DataFrame with a batch DataFrame may not work
> --
>
> Key: SPARK-19690
> URL: https://issues.apache.org/jira/browse/SPARK-19690
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.0.3, 2.1.0, 2.1.1
>Reporter: Shixiong Zhu
>Priority: Critical
>
> When joining a streaming DataFrame with a batch DataFrame, if the batch 
> DataFrame has an aggregation, it will be converted to a streaming physical 
> aggregation. Then the query will crash.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18258) Sinks need access to offset representation

2017-03-03 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18258:
-
Target Version/s:   (was: 2.2.0)

> Sinks need access to offset representation
> --
>
> Key: SPARK-18258
> URL: https://issues.apache.org/jira/browse/SPARK-18258
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>
> Transactional "exactly-once" semantics for output require storing an offset 
> identifier in the same transaction as results.
> The Sink.addBatch method currently only has access to batchId and data, not 
> the actual offset representation.
> I want to store the actual offsets, so that they are recoverable as long as 
> the results are and I'm not locked in to a particular streaming engine.
> I could see this being accomplished by adding parameters to Sink.addBatch for 
> the starting and ending offsets (either the offsets themselves, or the 
> SPARK-17829 string/json representation).  That would be an API change, but if 
> there's another way to map batch ids to offset representations without 
> changing the Sink api that would work as well.  
> I'm assuming we don't need the same level of access to offsets throughout a 
> job as e.g. the Kafka dstream gives, because Sinks are the main place that 
> should need them.
> After SPARK-17829 is complete and offsets have a .json method, an api for 
> this ticket might look like
> {code}
> trait Sink {
>   def addBatch(batchId: Long, data: DataFrame, start: OffsetSeq, end: 
> OffsetSeq): Unit
> {code}
> where start and end were provided by StreamExecution.runBatch using 
> committedOffsets and availableOffsets.  
> I'm not 100% certain that the offsets in the seq could always be mapped back 
> to the correct source when restarting complicated multi-source jobs, but I 
> think it'd be sufficient.  Passing the string/json representation of the seq 
> instead of the seq itself would probably be sufficient as well, but the 
> convention of rendering a None as "-" in the json is maybe a little 
> idiosyncratic to parse, and the constant defining that is private.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: Why Spark cannot get the derived field of case class in Dataset?

2017-02-28 Thread Michael Armbrust
We only serialize things that are in the constructor.  You would have
access to it in the typed API (df.map(_.day)).  I'd suggest making a
factory method that fills these in and puts them in the constructor if you
need to get to it from other dataframe operations.
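
For example, a sketch of that factory-method shape (reusing the timestamp from
the quoted example below; the names are otherwise made up):

import java.text.SimpleDateFormat

// Put the derived value in the constructor so the encoder serializes it and it
// shows up as a real column; compute it once in a factory method.
case class Test(time: Long, day: String)

object Test {
  private val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
  def fromTime(time: Long): Test = Test(time, dateFormat.format(time))
}

// val ds = Seq(Test.fromTime(1487185076410L)).toDS()  // [time: bigint, day: string]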

On Tue, Feb 28, 2017 at 12:03 PM, Yong Zhang  wrote:

> In the following example, the "day" value is in the case class, but I
> cannot get that in the Spark dataset, which I would like to use at runtime?
> Any idea? Do I have to force it to be present in the case class
> constructor? I like to derive it out automatically and used in the dataset
> or dataframe.
>
>
> Thanks
>
>
> scala> spark.version
> res12: String = 2.1.0
>
> scala> import java.text.SimpleDateFormat
> import java.text.SimpleDateFormat
>
> scala> val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
> dateFormat: java.text.SimpleDateFormat = java.text.SimpleDateFormat@f67a0200
>
> scala> case class Test(time: Long) {
>      |   val day = dateFormat.format(time)
>      | }
> defined class Test
>
> scala> val t = Test(1487185076410L)
> t: Test = Test(1487185076410)
>
> scala> t.time
> res13: Long = 1487185076410
>
> scala> t.day
> res14: String = 2017-02-15
>
> scala> val ds = Seq(t).toDS()
> ds: org.apache.spark.sql.Dataset[Test] = [time: bigint]
>
> scala> ds.show
> +-------------+
> |         time|
> +-------------+
> |1487185076410|
> +-------------+
>
>
>


[jira] [Commented] (SPARK-19715) Option to Strip Paths in FileSource

2017-02-24 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15883642#comment-15883642
 ] 

Michael Armbrust commented on SPARK-19715:
--

This isn't a hypothetical.  A user of structured streaming upgraded to {{s3a}} 
and was surprised to see duplicate computation in the results.  Their files are 
named with a combination of upload time and a GUID, so I don't think there is 
any risk for this use case.  That said, I would not make this option the 
default.

> Option to Strip Paths in FileSource
> ---
>
> Key: SPARK-19715
> URL: https://issues.apache.org/jira/browse/SPARK-19715
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Michael Armbrust
>
> Today, we compare the whole path when deciding if a file is new in the 
> FileSource for structured streaming.  However, this can cause false 
> negatives in the case where the path has changed in a cosmetic way (i.e. 
> changing s3n to s3a).  We should add an option {{fileNameOnly}} that causes 
> the new file check to be based only on the filename (but still store the 
> whole path in the log).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19721) Good error message for version mismatch in log files

2017-02-23 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-19721:


 Summary: Good error message for version mismatch in log files
 Key: SPARK-19721
 URL: https://issues.apache.org/jira/browse/SPARK-19721
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.1.0
Reporter: Michael Armbrust
Priority: Blocker


There are several places where we write out version identifiers in various logs 
for structured streaming (usually {{v1}}).  However, in the places where we 
check for this, we throw a confusing error message.  Instead, we should do the 
following:
 - Find all of the places where we do this kind of check.
 - for {{vN}} where {{n>1}} say "UnsupportedLogFormat: The file {{path}} was 
produced by a newer version of Spark and cannot be read by this version.  
Please upgrade"
 - for anything else throw an error saying the file is malformed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19715) Option to Strip Paths in FileSource

2017-02-23 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15881731#comment-15881731
 ] 

Michael Armbrust commented on SPARK-19715:
--

[~lwlin] another file source features you might want to work on.

> Option to Strip Paths in FileSource
> ---
>
> Key: SPARK-19715
> URL: https://issues.apache.org/jira/browse/SPARK-19715
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.1.0
>Reporter: Michael Armbrust
>
> Today, we compare the whole path when deciding if a file is new in the 
> FileSource for structured streaming.  However, this can cause false 
> negatives in the case where the path has changed in a cosmetic way (i.e. 
> changing s3n to s3a).  We should add an option {{fileNameOnly}} that causes 
> the new file check to be based only on the filename (but still store the 
> whole path in the log).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.2.0

2017-02-23 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18057:
-
Summary: Update structured streaming kafka from 10.0.1 to 10.2.0  (was: 
Update structured streaming kafka from 10.0.1 to 10.1.0)

> Update structured streaming kafka from 10.0.1 to 10.2.0
> ---
>
> Key: SPARK-18057
> URL: https://issues.apache.org/jira/browse/SPARK-18057
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Reporter: Cody Koeninger
>
> There are a couple of relevant KIPs here, 
> https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19715) Option to Strip Paths in FileSource

2017-02-23 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-19715:


 Summary: Option to Strip Paths in FileSource
 Key: SPARK-19715
 URL: https://issues.apache.org/jira/browse/SPARK-19715
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming
Affects Versions: 2.1.0
Reporter: Michael Armbrust


Today, we compare the whole path when deciding if a file is new in the 
FileSource for structured streaming.  However, this can cause false negatives 
in the case where the path has changed in a cosmetic way (i.e. changing s3n to 
s3a).  We should add an option {{fileNameOnly}} that causes the new file check 
to be based only on the filename (but still store the whole path in the log).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19637) add to_json APIs to SQL

2017-02-17 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15872470#comment-15872470
 ] 

Michael Armbrust commented on SPARK-19637:
--

From JSON is harder because the second argument is a StructType.  We could 
consider accepting a string in the DDL format for declaring a table's schema 
(i.e. {{a: Int, b: struct...}}).
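
For comparison, a sketch of what the existing Scala API already looks like (the
DataFrame contents here are purely illustrative):

{code}
import org.apache.spark.sql.functions.{col, struct, to_json, from_json}
import org.apache.spark.sql.types._
import spark.implicits._

// Illustrative data with schema a: Int, b: struct<c: string>
val df = Seq((1, "x")).toDF("a", "c").select(col("a"), struct(col("c")).as("b"))

// to_json just takes a struct column; from_json needs the schema as a StructType.
val schema = new StructType()
  .add("a", IntegerType)
  .add("b", new StructType().add("c", StringType))

val asJson    = df.select(to_json(struct(col("a"), col("b"))).as("json"))
val roundTrip = asJson.select(from_json(col("json"), schema).as("parsed"))
{code}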

> add to_json APIs to SQL
> ---
>
> Key: SPARK-19637
> URL: https://issues.apache.org/jira/browse/SPARK-19637
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Burak Yavuz
>
> The method "to_json" is a useful method in turning a struct into a json 
> string. It currently doesn't work in SQL, but adding it should be trivial.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: Pretty print a dataframe...

2017-02-16 Thread Michael Armbrust
The toString method of Dataset.queryExecution includes the various plans.
I usually just log that directly.
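
Something like this, for example (the logging framework is whatever you already
use; slf4j here is just an assumption):

import org.slf4j.LoggerFactory
import org.apache.spark.sql.Dataset

val log = LoggerFactory.getLogger("query-plans")

// queryExecution.toString contains the parsed, analyzed, optimized and
// physical plans, so you can log it directly without touching sessionState.
def logPlans(ds: Dataset[_]): Unit = log.info(ds.queryExecution.toString)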

On Thu, Feb 16, 2017 at 8:26 AM, Muthu Jayakumar  wrote:

> Hello there,
>
> I am trying to write to log-line a dataframe/dataset queryExecution and/or
> its logical plan. The current code...
>
> def explain(extended: Boolean): Unit = {
>   val explain = ExplainCommand(queryExecution.logical, extended = extended)
>   
> sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect().foreach
>  {
> // scalastyle:off println
> r => println(r.getString(0))
> // scalastyle:on println
>   }
> }
>
> sessionState is not accessible if I were to write my own explain(log:
> LoggingAdapter).
>
> Please advice,
> Muthu
>


Re: Structured Streaming Spark Summit Demo - Databricks people

2017-02-16 Thread Michael Armbrust
Thanks for your interest in Apache Spark Structured Streaming!

There is nothing secret in that demo, though I did make some configuration
changes in order to get the timing right (gotta have some dramatic effect
:) ).  Also I think the visualizations based on metrics output by the
StreamingQueryListener are still being rolled out, but should be available
everywhere soon.

First, I set two options to make sure that files were read one at a time,
thus allowing us to see incremental results.

spark.readStream
  .option("maxFilesPerTrigger", "1")
  .option("latestFirst", "true")
...

There is more detail on how these options work in this post.

Regarding the continually updating result of a streaming query using
display(df) for streaming DataFrames (i.e. one created with spark.readStream),
that has worked in Databricks since Spark 2.1.  The longer form example we
published requires you to rerun the count to see it change at the end of the
notebook because that is not a streaming query. Instead it is a batch query
over data that has been written out by another stream.  I'd like to add the
ability to run a streaming query from data that has been written out by the
FileSink (tracked as SPARK-19633).

In the demo, I started two different streaming queries:
 - one that reads from json / kafka => writes to parquet
 - one that reads from json / kafka => writes to a memory sink and pushes the
   latest answer to the JS running in a browser using the StreamingQueryListener.
This is packaged up nicely in display(), but there is nothing stopping you
from building something similar with vanilla Apache Spark.
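
For example, the memory-sink half of that looks roughly like this (aggDF stands
in for whatever streaming aggregation you are running; "counts" is an arbitrary
name):

// Write the running aggregate to the in-memory sink under a query name, then
// anything (a notebook cell, a small service, ...) can re-query that table.
val query = aggDF.writeStream
  .format("memory")
  .queryName("counts")
  .outputMode("complete")
  .start()

// spark.sql("select * from counts").show()   // latest answer at any point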

Michael


On Wed, Feb 15, 2017 at 11:34 AM, Sam Elamin 
wrote:

> Hey folks
>
> This one is mainly aimed at the databricks folks, I have been trying to
> replicate the cloudtrail demo Michael did at Spark Summit. The code for it
> can be found here.
>
> My question is how did you get the results to be displayed and updated
> continuously in real time
>
> I am also using databricks to duplicate it but I noticed the code link
> mentions
>
>  "If you count the number of rows in the table, you should find the value
> increasing over time. Run the following every few minutes."
> This leads me to believe that the version of Databricks that Michael was
> using for the demo is still not released, or at least the functionality to
> display those changes in real time isn't
>
> Is this the case? or am I completely wrong?
>
> Can I display the results of a structured streaming query in realtime
> using the databricks "display" function?
>
>
> Regards
> Sam
>


[jira] [Created] (SPARK-19633) FileSource read from FileSink

2017-02-16 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-19633:


 Summary: FileSource read from FileSink
 Key: SPARK-19633
 URL: https://issues.apache.org/jira/browse/SPARK-19633
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming
Affects Versions: 2.1.0
Reporter: Michael Armbrust
Priority: Critical


Right now, you can't start a streaming query from a location that is being 
written to by the file sink.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: Case class with POJO - encoder issues

2017-02-13 Thread Michael Armbrust
You are right, you need that PR.  I pinged the author, but otherwise it
would be great if someone could carry it over the finish line.

On Sat, Feb 11, 2017 at 4:19 PM, Jason White 
wrote:

> I'd like to create a Dataset using some classes from Geotools to do some
> geospatial analysis. In particular, I'm trying to use Spark to distribute
> the work based on ID and label fields that I extract from the polygon data.
>
> My simplified case class looks like this:
> implicit val geometryEncoder: Encoder[Geometry] = Encoders.kryo[Geometry]
> case class IndexedGeometry(label: String, tract: Geometry)
>
> When I try to create a dataset using this case class, it gives me this error
> message:
> Exception in thread "main" java.lang.UnsupportedOperationException: No
> Encoder found for com.vividsolutions.jts.geom.Geometry
> - field (class: "com.vividsolutions.jts.geom.Geometry", name: "tract")
> - root class: "org.me.HelloWorld.IndexedGeometry"
>
> If I add another encoder for my case class...:
> implicit val indexedGeometryEncoder: Encoder[IndexedGeometry] =
> Encoders.kryo[IndexedGeometry]
>
> ...it works, but now the entire dataset has a single field, "value", and
> it's a binary blob.
>
> Is there a way to do what I'm trying to do?
> I believe this PR is related, but it's been idle since December:
> https://github.com/apache/spark/pull/15918
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Case-class-with-POJO-encoder-issues-tp28381.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


[jira] [Commented] (SPARK-19553) Add GroupedData.countApprox()

2017-02-13 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15864326#comment-15864326
 ] 

Michael Armbrust commented on SPARK-19553:
--

It seems like there are a couple of distinct feature requests here:
 1) A different implementation of countApprox that is based on scheduling 
timeouts instead of sketching to find distinct items.
 2) A short hand on GroupedData that could be used to call the aforementioned 
function.
 3) Better docs for the existing approx count distinct function. [Perhaps 
copied from 
here|https://github.com/apache/spark/blob/7a7ce272fe9a703f58b0180a9d2001ecb5c4b8db/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L235]

I'm skeptical of the utility of #1, but could be convinced by performance on 
real-world tasks (you can call {{df.rdd.countApprox()}} today).  #3 seems like 
we should obviously do it.
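
For concreteness, a sketch of what is available today (df is any DataFrame;
column names are made up):

{code}
import org.apache.spark.sql.functions.approx_count_distinct

// #3: approximate distinct counts per group already work:
df.groupBy("col1").agg(approx_count_distinct("col2")).show()

// #1: a timeout/confidence based approximate count currently exists only at the RDD level:
val approxTotal = df.rdd.countApprox(timeout = 300, confidence = 0.95)
{code}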

> Add GroupedData.countApprox()
> -
>
> Key: SPARK-19553
> URL: https://issues.apache.org/jira/browse/SPARK-19553
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Nicholas Chammas
>Priority: Minor
>
> We already have a 
> [{{pyspark.sql.functions.approx_count_distinct()}}|http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.approx_count_distinct]
>  that can be applied to grouped data, but it seems odd that you can't just 
> get regular approximate count for grouped data.
> I imagine the API would mirror that for 
> [{{RDD.countApprox()}}|http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.countApprox],
>  but I'm not sure:
> {code}
> (df
> .groupBy('col1')
> .countApprox(timeout=300, confidence=0.95)
> .show())
> {code}
> Or, if we want to mirror the {{approx_count_distinct()}} function, we can do 
> that too. I'd want to understand why that function doesn't take a timeout or 
> confidence parameter, though. Also, what does {{rsd}} mean? It's not 
> documented.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-19477) [SQL] Datasets created from a Dataframe with extra columns retain the extra columns

2017-02-10 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-19477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15861918#comment-15861918
 ] 

Michael Armbrust commented on SPARK-19477:
--

If a lot of people are confused by this being lazy we can change it (didn't we 
already change it in 1.6 -> 2.0 in the other direction?).  It would have to be 
configurable though, since removing columns could be a breaking change.

> [SQL] Datasets created from a Dataframe with extra columns retain the extra 
> columns
> ---
>
> Key: SPARK-19477
> URL: https://issues.apache.org/jira/browse/SPARK-19477
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Don Drake
>
> In 1.6, when you created a Dataset from a Dataframe that had extra columns, 
> the columns not in the case class were dropped from the Dataset.
> For example in 1.6, the column c4 is gone:
> {code}
> scala> case class F(f1: String, f2: String, f3:String)
> defined class F
> scala> import sqlContext.implicits._
> import sqlContext.implicits._
> scala> val df = Seq(("a","b","c","x"), ("d", "e", "f","y"), ("h", "i", 
> "j","z")).toDF("f1", "f2", "f3", "c4")
> df: org.apache.spark.sql.DataFrame = [f1: string, f2: string, f3: string, c4: 
> string]
> scala> val ds = df.as[F]
> ds: org.apache.spark.sql.Dataset[F] = [f1: string, f2: string, f3: string]
> scala> ds.show
> +---+---+---+
> | f1| f2| f3|
> +---+---+---+
> |  a|  b|  c|
> |  d|  e|  f|
> |  h|  i|  j|
> {code}
> This seems to have changed in Spark 2.0 and also 2.1:
> Spark 2.1.0:
> {code}
> scala> case class F(f1: String, f2: String, f3:String)
> defined class F
> scala> import spark.implicits._
> import spark.implicits._
> scala> val df = Seq(("a","b","c","x"), ("d", "e", "f","y"), ("h", "i", 
> "j","z")).toDF("f1", "f2", "f3", "c4")
> df: org.apache.spark.sql.DataFrame = [f1: string, f2: string ... 2 more 
> fields]
> scala> val ds = df.as[F]
> ds: org.apache.spark.sql.Dataset[F] = [f1: string, f2: string ... 2 more 
> fields]
> scala> ds.show
> +---+---+---+---+
> | f1| f2| f3| c4|
> +---+---+---+---+
> |  a|  b|  c|  x|
> |  d|  e|  f|  y|
> |  h|  i|  j|  z|
> +---+---+---+---+
> scala> import org.apache.spark.sql.Encoders
> import org.apache.spark.sql.Encoders
> scala> val fEncoder = Encoders.product[F]
> fEncoder: org.apache.spark.sql.Encoder[F] = class[f1[0]: string, f2[0]: 
> string, f3[0]: string]
> scala> fEncoder.schema == ds.schema
> res2: Boolean = false
> scala> ds.schema
> res3: org.apache.spark.sql.types.StructType = 
> StructType(StructField(f1,StringType,true), StructField(f2,StringType,true), 
> StructField(f3,StringType,true), StructField(c4,StringType,true))
> scala> fEncoder.schema
> res4: org.apache.spark.sql.types.StructType = 
> StructType(StructField(f1,StringType,true), StructField(f2,StringType,true), 
> StructField(f3,StringType,true))
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: benefits of code gen

2017-02-10 Thread Michael Armbrust
Function1 is specialized, but nullSafeEval is Any => Any, so that's still
going to box in the non-codegened execution path.

On Fri, Feb 10, 2017 at 1:32 PM, Koert Kuipers  wrote:

> based on that i take it that math functions would be primary beneficiaries
> since they work on primitives.
>
> so if i take UnaryMathExpression as an example, would i not get the same
> benefit if i change it to this?
>
> abstract class UnaryMathExpression(val f: Double => Double, name: String)
>   extends UnaryExpression with Serializable with ImplicitCastInputTypes {
>
>   override def inputTypes: Seq[AbstractDataType] = Seq(DoubleType)
>   override def dataType: DataType = DoubleType
>   override def nullable: Boolean = true
>   override def toString: String = s"$name($child)"
>   override def prettyName: String = name
>
>   protected override def nullSafeEval(input: Any): Any = {
> f(input.asInstanceOf[Double])
>   }
>
>   // name of function in java.lang.Math
>   def funcName: String = name.toLowerCase
>
>   def function(d: Double): Double = f(d)
>
>   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
> val self = ctx.addReferenceObj(name, this, getClass.getName)
> defineCodeGen(ctx, ev, c => s"$self.function($c)")
>   }
> }
>
> admittedly in this case the benefit in terms of removing complex codegen
> is not there (the codegen was only one line), but if i can remove codegen
> here i could also remove it in lots of other places where it does get very
> unwieldy simply by replacing it with calls to methods.
>
> Function1 is specialized, so i think (or hope) that my version does no
> extra boxes/unboxing.
>
> On Fri, Feb 10, 2017 at 2:24 PM, Reynold Xin  wrote:
>
>> With complex types it doesn't work as well, but for primitive types the
>> biggest benefit of whole stage codegen is that we don't even need to put
>> the intermediate data into rows or columns anymore. They are just variables
>> (stored in CPU registers).
>>
>> On Fri, Feb 10, 2017 at 8:22 PM, Koert Kuipers  wrote:
>>
>>> so i have been looking for a while now at all the catalyst expressions,
>>> and all the relative complex codegen going on.
>>>
>>> so first off i get the benefit of codegen to turn a bunch of chained
>>> iterators transformations into a single codegen stage for spark. that makes
>>> sense to me, because it avoids a bunch of overhead.
>>>
>>> but what i am not so sure about is what the benefit is of converting the
>>> actual stuff that happens inside the iterator transformations into codegen.
>>>
>>> say if we have an expression that has 2 children and creates a struct
>>> for them. why would this be faster in codegen by re-creating the code to do
>>> this in a string (which is complex and error prone) compared to simply have
>>> the codegen call the normal method for this in my class?
>>>
>>> i see so much trivial code be re-created in codegen. stuff like this:
>>>
>>>   private[this] def castToDateCode(
>>>   from: DataType,
>>>   ctx: CodegenContext): CastFunction = from match {
>>> case StringType =>
>>>   val intOpt = ctx.freshName("intOpt")
>>>   (c, evPrim, evNull) => s"""
>>> scala.Option $intOpt =
>>>   org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToDate($c);
>>> if ($intOpt.isDefined()) {
>>>   $evPrim = ((Integer) $intOpt.get()).intValue();
>>> } else {
>>>   $evNull = true;
>>> }
>>>"""
>>>
>>> is this really faster than simply calling an equivalent functions from
>>> the codegen, and keeping the codegen logic restricted to the "unrolling" of
>>> chained iterators?
>>>
>>>
>>
>


Re: Un-exploding / denormalizing Spark SQL help

2017-02-07 Thread Michael Armbrust
I think the fastest way is likely to use a combination of conditionals
(when / otherwise), first (ignoring nulls), while grouping by the id.  This
should get the answer with only a single shuffle.

Here is an example.
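
A minimal sketch of that approach on the table from the question (data is the
input DataFrame and the column names match the example; the linked notebook is
not reproduced here):

import org.apache.spark.sql.functions.{col, first, when}

// One aggregate per (column, priority) slot: when() picks out the row for that
// priority, and first(..., ignoreNulls = true) keeps the single non-null match.
def slot(name: String, p: Int) =
  first(when(col("priority") === p, col(name)), ignoreNulls = true).as(s"$name$p")

val output = data.groupBy("id", "name", "extra").agg(
  slot("data", 1), slot("priority", 1),
  slot("data", 2), slot("priority", 2),
  slot("data", 3), slot("priority", 3))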

On Tue, Feb 7, 2017 at 5:07 PM, Jacek Laskowski  wrote:

> Hi Everett,
>
> That's pretty much what I'd do. Can't think of a way to beat your
> solution. Why do you "feel vaguely uneasy about it"?
>
> I'd also check out the execution plan (with explain) to see how it's
> gonna work at runtime. I may have seen groupBy + join be better than
> window (there were more exchanges in play for windows I reckon).
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Tue, Feb 7, 2017 at 10:54 PM, Everett Anderson 
> wrote:
> >
> >
> > On Tue, Feb 7, 2017 at 12:50 PM, Jacek Laskowski 
> wrote:
> >>
> >> Hi,
> >>
> >> Could groupBy and withColumn or UDAF work perhaps? I think window could
> >> help here too.
> >
> >
> > This seems to work, but I do feel vaguely uneasy about it. :)
> >
> > // First add a 'rank' column which is priority order just in case
> priorities
> > aren't
> > // from 1 with no gaps.
> > val temp1 = data.withColumn("rank", functions.dense_rank()
> >.over(Window.partitionBy("id", "name").orderBy("priority")))
> >
> > +---+----+-----+------+--------+----+
> > | id|name|extra|  data|priority|rank|
> > +---+----+-----+------+--------+----+
> > |  1|Fred|    8|value1|       1|   1|
> > |  1|Fred|    8|value8|       2|   2|
> > |  1|Fred|    8|value5|       3|   3|
> > |  2| Amy|    9|value3|       1|   1|
> > |  2| Amy|    9|value5|       2|   2|
> > +---+----+-----+------+--------+----+
> >
> > // Now move all the columns we want to denormalize into a struct column
> to
> > keep them together.
> > val temp2 = temp1.withColumn("temp_struct", struct(temp1("extra"),
> > temp1("data"), temp1("priority")))
> >   .drop("extra", "data", "priority")
> >
> > +---+----+----+------------+
> > | id|name|rank| temp_struct|
> > +---+----+----+------------+
> > |  1|Fred|   1|[8,value1,1]|
> > |  1|Fred|   2|[8,value8,2]|
> > |  1|Fred|   3|[8,value5,3]|
> > |  2| Amy|   1|[9,value3,1]|
> > |  2| Amy|   2|[9,value5,2]|
> > +---+----+----+------------+
> >
> > // groupBy, again, but now pivot the rank column. We need an aggregate
> > function after pivot,
> > // so use first -- there will only ever be one element.
> > val temp3 = temp2.groupBy("id", "name")
> >   .pivot("rank", Seq("1", "2", "3"))
> >   .agg(functions.first("temp_struct"))
> >
> > +---+----+------------+------------+------------+
> > | id|name|           1|           2|           3|
> > +---+----+------------+------------+------------+
> > |  1|Fred|[8,value1,1]|[8,value8,2]|[8,value5,3]|
> > |  2| Amy|[9,value3,1]|[9,value5,2]|        null|
> > +---+----+------------+------------+------------+
> >
> > // Now just moving things out of the structs and clean up.
> > val output = temp3.withColumn("extra1", temp3("1").getField("extra"))
> >  .withColumn("data1", temp3("1").getField("data"))
> >  .withColumn("priority1", temp3("1").getField("priority"))
> >  .withColumn("extra2", temp3("2").getField("extra"))
> >  .withColumn("data2", temp3("2").getField("data"))
> >  .withColumn("priority2", temp3("2").getField("priority"))
> >  .withColumn("extra3", temp3("3").getField("extra"))
> >  .withColumn("data3", temp3("3").getField("data"))
> >  .withColumn("priority3", temp3("3").getField("priority"))
> >  .drop("1", "2", "3")
> >
> > +---+----+------+------+---------+------+------+---------+------+------+---------+
> > | id|name|extra1| data1|priority1|extra2| data2|priority2|extra3| data3|priority3|
> > +---+----+------+------+---------+------+------+---------+------+------+---------+
> > |  1|Fred|     8|value1|        1|     8|value8|        2|     8|value5|        3|
> > |  2| Amy|     9|value3|        1|     9|value5|        2|  null|  null|     null|
> > +---+----+------+------+---------+------+------+---------+------+------+---------+
> >
> >
> >
> >
> >
> >
> >
> >>
> >>
> >> Jacek
> >>
> >> On 7 Feb 2017 8:02 p.m., "Everett Anderson" 
> >> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I'm trying to un-explode or denormalize a table like
> >>>
> >>> +---+----+-----+------+--------+
> >>> |id |name|extra|data  |priority|
> >>> +---+----+-----+------+--------+
> >>> |1  |Fred|8    |value1|1       |
> >>> |1  |Fred|8    |value8|2       |
> >>> |1  |Fred|8    |value5|3       |
> >>> |2  |Amy |9    |value3|1       |
> >>> |2  |Amy |9    |value5|2       |
> >>> +---+----+-----+------+--------+
> >>>
> >>> into something that looks like
> >>>
> >>>
> >>> +---++--+--+-+--+

Re: Structured Streaming. Dropping Duplicates

2017-02-07 Thread Michael Armbrust
The JSON log is only used by the file sink (which it doesn't seem like you
are using).  Though, I'm not sure exactly what is going on inside of
setupGoogle or how tableReferenceSource is used.

Typically you would run df.writeStream.option("path", "/my/path")... and
then the transaction log would go into /my/path/_spark_metadata.

There is no requirement that a sink uses this kind of update log.  This
is just how we get better transactional semantics than HDFS is providing.
If your sink supports transactions natively you should just use those
instead.  We pass a unique ID to the sink method addBatch so that you can
make sure you don't commit the same transaction more than once.
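
For the file-sink case mentioned above, that looks roughly like this (paths are
placeholders and df is the streaming DataFrame from your example):

// Output files land under "path"; the transaction log that records which files
// belong to committed batches goes to /my/path/_spark_metadata.
val query = df.writeStream
  .format("parquet")
  .option("path", "/my/path")                      // placeholder
  .option("checkpointLocation", "/my/checkpoints") // placeholder
  .start()

// A batch read of the same location with spark.read will consult the
// _spark_metadata log and ignore files from incomplete batches.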

On Tue, Feb 7, 2017 at 3:29 PM, Sam Elamin  wrote:

> Hi Micheal
>
> If thats the case for the below example, where should i be reading these
> json log files first? im assuming sometime between df and query?
>
>
> val df = spark
> .readStream
> .option("tableReferenceSource",tableName)
> .load()
> setUpGoogle(spark.sqlContext)
>
> val query = df
>   .writeStream
>   .option("tableReferenceSink",tableName2)
>   .option("checkpointLocation","checkpoint")
>   .start()
>
>
> On Tue, Feb 7, 2017 at 7:24 PM, Michael Armbrust 
> wrote:
>
>> Read the JSON log of files that is in `/your/path/_spark_metadata` and
>> only read files that are present in that log (ignore anything else).
>>
>> On Tue, Feb 7, 2017 at 1:16 PM, Sam Elamin 
>> wrote:
>>
>>> Ah I see ok so probably it's the retry that's causing it
>>>
>>> So when you say I'll have to take this into account, how do I best do
>>> that? My sink will have to know what was that extra file. And i was under
>>> the impression spark would automagically know this because of the
>>> checkpoint directory set when you created the writestream
>>>
>>> If that's not the case then how would I go about ensuring no duplicates?
>>>
>>>
>>> Thanks again for the awesome support!
>>>
>>> Regards
>>> Sam
>>> On Tue, 7 Feb 2017 at 18:05, Michael Armbrust 
>>> wrote:
>>>
>>>> Sorry, I think I was a little unclear.  There are two things at play
>>>> here.
>>>>
>>>>  - Exactly-once semantics with file output: spark writes out extra
>>>> metadata on which files are valid to ensure that failures don't cause us to
>>>> "double count" any of the input.  Spark 2.0+ detects this info
>>>> automatically when you use dataframe reader (spark.read...). There may be
>>>> extra files, but they will be ignored. If you are consuming the output with
>>>> another system you'll have to take this into account.
>>>>  - Retries: right now we always retry the last batch when restarting.
>>>> This is safe/correct because of the above, but we could also optimize this
>>>> away by tracking more information about batch progress.
>>>>
>>>> On Tue, Feb 7, 2017 at 12:25 PM, Sam Elamin 
>>>> wrote:
>>>>
>>>> Hmm ok I understand that but the job is running for a good few mins
>>>> before I kill it so there should not be any jobs left because I can see in
>>>> the log that its now polling for new changes, the latest offset is the
>>>> right one
>>>>
>>>> After I kill it and relaunch it picks up that same file?
>>>>
>>>>
>>>> Sorry if I misunderstood you
>>>>
>>>> On Tue, Feb 7, 2017 at 5:20 PM, Michael Armbrust <
>>>> mich...@databricks.com> wrote:
>>>>
>>>> It is always possible that there will be extra jobs from failed
>>>> batches. However, for the file sink, only one set of files will make it
>>>> into _spark_metadata directory log.  This is how we get atomic commits even
>>>> when there are files in more than one directory.  When reading the files
>>>> with Spark, we'll detect this directory and use it instead of listStatus to
>>>> find the list of valid files.
>>>>
>>>> On Tue, Feb 7, 2017 at 9:05 AM, Sam Elamin 
>>>> wrote:
>>>>
>>>> On another note, when it comes to checkpointing on structured streaming
>>>>
>>>> I noticed if I have  a stream running off s3 and I kill the process.
>>>> The next time the process starts running it duplicates the last record
>>>> inserted. Is that normal?
>>>>

Re: Structured Streaming. Dropping Duplicates

2017-02-07 Thread Michael Armbrust
Read the JSON log of files that is in `/your/path/_spark_metadata` and only
read files that are present in that log (ignore anything else).
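
If the consumer is not Spark, a rough sketch of that filtering is below. It
assumes each log entry is one JSON object per line with a "path" field, and
that each log file starts with a version header line such as "v1" (worth
verifying against what your Spark version actually writes); extractPath is a
crude stand-in for real JSON parsing.

import java.io.File
import scala.io.Source

// Crude extraction of the "path" field from one JSON line.
def extractPath(jsonLine: String): Option[String] =
  "\"path\"\\s*:\\s*\"([^\"]+)\"".r.findFirstMatchIn(jsonLine).map(_.group(1))

// Collect every committed file path recorded in the _spark_metadata log.
def committedFiles(metadataDir: String): Set[String] =
  new File(metadataDir).listFiles().toList
    .filterNot(_.getName.startsWith("."))
    .flatMap { logFile =>
      Source.fromFile(logFile).getLines()
        .filterNot(_.startsWith("v"))   // skip the version header line
        .flatMap(extractPath)
    }
    .toSet

// Anything under /your/path that is not in committedFiles("/your/path/_spark_metadata")
// should be ignored by the downstream consumer.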

On Tue, Feb 7, 2017 at 1:16 PM, Sam Elamin  wrote:

> Ah I see ok so probably it's the retry that's causing it
>
> So when you say I'll have to take this into account, how do I best do
> that? My sink will have to know what was that extra file. And i was under
> the impression spark would automagically know this because of the
> checkpoint directory set when you created the writestream
>
> If that's not the case then how would I go about ensuring no duplicates?
>
>
> Thanks again for the awesome support!
>
> Regards
> Sam
> On Tue, 7 Feb 2017 at 18:05, Michael Armbrust 
> wrote:
>
>> Sorry, I think I was a little unclear.  There are two things at play here.
>>
>>  - Exactly-once semantics with file output: spark writes out extra
>> metadata on which files are valid to ensure that failures don't cause us to
>> "double count" any of the input.  Spark 2.0+ detects this info
>> automatically when you use dataframe reader (spark.read...). There may be
>> extra files, but they will be ignored. If you are consuming the output with
>> another system you'll have to take this into account.
>>  - Retries: right now we always retry the last batch when restarting.
>> This is safe/correct because of the above, but we could also optimize this
>> away by tracking more information about batch progress.
>>
>> On Tue, Feb 7, 2017 at 12:25 PM, Sam Elamin 
>> wrote:
>>
>> Hmm ok I understand that but the job is running for a good few mins
>> before I kill it so there should not be any jobs left because I can see in
>> the log that its now polling for new changes, the latest offset is the
>> right one
>>
>> After I kill it and relaunch it picks up that same file?
>>
>>
>> Sorry if I misunderstood you
>>
>> On Tue, Feb 7, 2017 at 5:20 PM, Michael Armbrust 
>> wrote:
>>
>> It is always possible that there will be extra jobs from failed batches.
>> However, for the file sink, only one set of files will make it into
>> _spark_metadata directory log.  This is how we get atomic commits even when
>> there are files in more than one directory.  When reading the files with
>> Spark, we'll detect this directory and use it instead of listStatus to find
>> the list of valid files.
>>
>> On Tue, Feb 7, 2017 at 9:05 AM, Sam Elamin 
>> wrote:
>>
>> On another note, when it comes to checkpointing on structured streaming
>>
>> I noticed if I have  a stream running off s3 and I kill the process. The
>> next time the process starts running it duplicates the last record
>> inserted. Is that normal?
>>
>>
>>
>>
>> So say I have streaming enabled on one folder "test" which only has two
>> files "update1" and "update 2", then I kill the spark job using Ctrl+C.
>> When I rerun the stream it picks up "update 2" again
>>
>> Is this normal? isnt ctrl+c a failure?
>>
>> I would expect checkpointing to know that update 2 was already processed
>>
>> Regards
>> Sam
>>
>> On Tue, Feb 7, 2017 at 4:58 PM, Sam Elamin 
>> wrote:
>>
>> Thanks Micheal!
>>
>>
>>
>> On Tue, Feb 7, 2017 at 4:49 PM, Michael Armbrust 
>> wrote:
>>
>> Here's a JIRA: https://issues.apache.org/jira/browse/SPARK-19497
>>
>> We should add this soon.
>>
>> On Tue, Feb 7, 2017 at 8:35 AM, Sam Elamin 
>> wrote:
>>
>> Hi All
>>
>> When trying to read a stream off S3 and I try and drop duplicates I get
>> the following error:
>>
>> Exception in thread "main" org.apache.spark.sql.AnalysisException:
>> Append output mode not supported when there are streaming aggregations on
>> streaming DataFrames/DataSets;;
>>
>>
>> Whats strange if I use the batch "spark.read.json", it works
>>
>> Can I assume you cant drop duplicates in structured streaming
>>
>> Regards
>> Sam
>>
>>
>>
>>
>>
>>
>>
>>


Re: Structured Streaming. Dropping Duplicates

2017-02-07 Thread Michael Armbrust
Sorry, I think I was a little unclear.  There are two things at play here.

 - Exactly-once semantics with file output: spark writes out extra metadata
on which files are valid to ensure that failures don't cause us to "double
count" any of the input.  Spark 2.0+ detects this info automatically when
you use dataframe reader (spark.read...). There may be extra files, but
they will be ignored. If you are consuming the output with another system
you'll have to take this into account.
 - Retries: right now we always retry the last batch when restarting.  This
is safe/correct because of the above, but we could also optimize this away
by tracking more information about batch progress.

On Tue, Feb 7, 2017 at 12:25 PM, Sam Elamin  wrote:

> Hmm ok I understand that but the job is running for a good few mins before
> I kill it so there should not be any jobs left because I can see in the log
> that its now polling for new changes, the latest offset is the right one
>
> After I kill it and relaunch it picks up that same file?
>
>
> Sorry if I misunderstood you
>
> On Tue, Feb 7, 2017 at 5:20 PM, Michael Armbrust 
> wrote:
>
>> It is always possible that there will be extra jobs from failed batches.
>> However, for the file sink, only one set of files will make it into
>> _spark_metadata directory log.  This is how we get atomic commits even when
>> there are files in more than one directory.  When reading the files with
>> Spark, we'll detect this directory and use it instead of listStatus to find
>> the list of valid files.
>>
>> On Tue, Feb 7, 2017 at 9:05 AM, Sam Elamin 
>> wrote:
>>
>>> On another note, when it comes to checkpointing on structured streaming
>>>
>>> I noticed if I have  a stream running off s3 and I kill the process. The
>>> next time the process starts running it duplicates the last record
>>> inserted. Is that normal?
>>>
>>>
>>>
>>>
>>> So say I have streaming enabled on one folder "test" which only has two
>>> files "update1" and "update 2", then I kill the spark job using Ctrl+C.
>>> When I rerun the stream it picks up "update 2" again
>>>
>>> Is this normal? isnt ctrl+c a failure?
>>>
>>> I would expect checkpointing to know that update 2 was already processed
>>>
>>> Regards
>>> Sam
>>>
>>> On Tue, Feb 7, 2017 at 4:58 PM, Sam Elamin 
>>> wrote:
>>>
>>>> Thanks Micheal!
>>>>
>>>>
>>>>
>>>> On Tue, Feb 7, 2017 at 4:49 PM, Michael Armbrust <
>>>> mich...@databricks.com> wrote:
>>>>
>>>>> Here's a JIRA: https://issues.apache.org/jira/browse/SPARK-19497
>>>>>
>>>>> We should add this soon.
>>>>>
>>>>> On Tue, Feb 7, 2017 at 8:35 AM, Sam Elamin 
>>>>> wrote:
>>>>>
>>>>>> Hi All
>>>>>>
>>>>>> When trying to read a stream off S3 and I try and drop duplicates I
>>>>>> get the following error:
>>>>>>
>>>>>> Exception in thread "main" org.apache.spark.sql.AnalysisException:
>>>>>> Append output mode not supported when there are streaming aggregations on
>>>>>> streaming DataFrames/DataSets;;
>>>>>>
>>>>>>
>>>>>> What's strange is that if I use the batch "spark.read.json", it works.
>>>>>>
>>>>>> Can I assume you can't drop duplicates in structured streaming?
>>>>>>
>>>>>> Regards
>>>>>> Sam
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Structured Streaming. Dropping Duplicates

2017-02-07 Thread Michael Armbrust
It is always possible that there will be extra jobs from failed batches.
However, for the file sink, only one set of files will make it into
_spark_metadata directory log.  This is how we get atomic commits even when
there are files in more than one directory.  When reading the files with
Spark, we'll detect this directory and use it instead of listStatus to find
the list of valid files.

On Tue, Feb 7, 2017 at 9:05 AM, Sam Elamin  wrote:

> On another note, when it comes to checkpointing on structured streaming
>
> I noticed that if I have a stream running off S3 and I kill the process, the
> next time the process starts running it duplicates the last record
> inserted. Is that normal?
>
>
>
>
> So say I have streaming enabled on one folder "test" which only has two
> files "update1" and "update 2", then I kill the spark job using Ctrl+C.
> When I rerun the stream it picks up "update 2" again
>
> Is this normal? Isn't Ctrl+C a failure?
>
> I would expect checkpointing to know that update 2 was already processed
>
> Regards
> Sam
>
> On Tue, Feb 7, 2017 at 4:58 PM, Sam Elamin 
> wrote:
>
>> Thanks Michael!
>>
>>
>>
>> On Tue, Feb 7, 2017 at 4:49 PM, Michael Armbrust 
>> wrote:
>>
>>> Here's a JIRA: https://issues.apache.org/jira/browse/SPARK-19497
>>>
>>> We should add this soon.
>>>
>>> On Tue, Feb 7, 2017 at 8:35 AM, Sam Elamin 
>>> wrote:
>>>
>>>> Hi All
>>>>
>>>> When trying to read a stream off S3 and I try and drop duplicates I get
>>>> the following error:
>>>>
>>>> Exception in thread "main" org.apache.spark.sql.AnalysisException:
>>>> Append output mode not supported when there are streaming aggregations on
>>>> streaming DataFrames/DataSets;;
>>>>
>>>>
>>>> What's strange is that if I use the batch "spark.read.json", it works.
>>>>
>>>> Can I assume you can't drop duplicates in structured streaming?
>>>>
>>>> Regards
>>>> Sam
>>>>
>>>
>>>
>>
>


Re: Structured Streaming. Dropping Duplicates

2017-02-07 Thread Michael Armbrust
Here's a JIRA: https://issues.apache.org/jira/browse/SPARK-19497

We should add this soon.
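
For reference, a rough sketch of the kind of usage this feature is aiming for
(stream, the column names, and the watermark delay are assumptions, not the
final API):

// The watermark bounds how long per-key deduplication state must be kept.
val deduped = stream
  .withWatermark("eventTime", "10 minutes")
  .dropDuplicates("eventId", "eventTime")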

On Tue, Feb 7, 2017 at 8:35 AM, Sam Elamin  wrote:

> Hi All
>
> When trying to read a stream off S3 and I try and drop duplicates I get
> the following error:
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Append
> output mode not supported when there are streaming aggregations on
> streaming DataFrames/DataSets;;
>
>
> What's strange is that if I use the batch "spark.read.json", it works.
>
> Can I assume you can't drop duplicates in structured streaming?
>
> Regards
> Sam
>


[jira] [Created] (SPARK-19497) dropDuplicates with watermark

2017-02-07 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-19497:


 Summary: dropDuplicates with watermark
 Key: SPARK-19497
 URL: https://issues.apache.org/jira/browse/SPARK-19497
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming
Affects Versions: 2.1.0
Reporter: Michael Armbrust
Priority: Critical






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-19478) JDBC Sink

2017-02-06 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-19478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-19478:
-
Issue Type: New Feature  (was: Bug)

> JDBC Sink
> -
>
> Key: SPARK-19478
> URL: https://issues.apache.org/jira/browse/SPARK-19478
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.0.0
>Reporter: Michael Armbrust
>
> A sink that transactionally commits data into a database using JDBC.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-19478) JDBC Sink

2017-02-06 Thread Michael Armbrust (JIRA)
Michael Armbrust created SPARK-19478:


 Summary: JDBC Sink
 Key: SPARK-19478
 URL: https://issues.apache.org/jira/browse/SPARK-19478
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.0.0
Reporter: Michael Armbrust


A sink that transactionally commits data into a database using JDBC.
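
Until such a sink exists, a common stopgap is a ForeachWriter that writes over
JDBC. A rough sketch (the JDBC URL, table, and column layout are assumptions,
and unlike the proposed sink it is not atomic across a whole micro-batch):

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}

class JdbcWriter(url: String) extends ForeachWriter[Row] {
  private var conn: Connection = _
  private var stmt: PreparedStatement = _

  // Called once per partition per trigger; returning true means "process this data".
  override def open(partitionId: Long, version: Long): Boolean = {
    conn = DriverManager.getConnection(url)
    stmt = conn.prepareStatement("INSERT INTO events (id, value) VALUES (?, ?)")
    true
  }

  // Called for every row in the partition.
  override def process(row: Row): Unit = {
    stmt.setLong(1, row.getLong(0))
    stmt.setString(2, row.getString(1))
    stmt.executeUpdate()
  }

  // Always called at the end, with whatever error (if any) stopped processing.
  override def close(errorOrNull: Throwable): Unit = {
    if (stmt != null) stmt.close()
    if (conn != null) conn.close()
  }
}

// Usage: df.writeStream.foreach(new JdbcWriter("jdbc:postgresql://host/db")).start()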



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: specifing schema on dataframe

2017-02-05 Thread Michael Armbrust
If you already have the expected schema, and you know that all numbers will
always be formatted as strings in the input JSON, you could probably derive
this list automatically.

Wouldn't it be simpler to just regex replace the numbers to remove the
> quotes?


I think this is likely to be a slower and less robust solution.  You would
have to make sure that you got all the corner cases right (i.e. escaping
and what not).
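
A sketch of deriving that column list from the expected schema (the set of
numeric types to look for is an assumption):

import org.apache.spark.sql.types.{DoubleType, LongType, StructType}

// Collect the names of fields that should end up numeric, so the
// string-to-number casts don't have to be maintained by hand per job.
def numericColumns(expected: StructType): Seq[String] =
  expected.fields.toSeq.collect {
    case f if f.dataType == LongType || f.dataType == DoubleType => f.name
  }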

On Sun, Feb 5, 2017 at 3:13 PM, Sam Elamin  wrote:

> I see so for the connector I need to pass in an array/list of numerical
> columns?
>
> Wouldn't it be simpler to just regex replace the numbers to remove the
> quotes?
>
>
> Regards
> Sam
>
> On Sun, Feb 5, 2017 at 11:11 PM, Michael Armbrust 
> wrote:
>
>> Specifying the schema when parsing JSON will only let you pick between
>> similar datatypes (i.e. should this be a short, long, float, double, etc.).  It
>> will not let you perform conversions like string <-> number.  This has to
>> be done with explicit casts after the data has been loaded.
>>
>> I think you can make a solution that uses select or withColumn generic.
>> Just load the dataframe with a "parse schema" that treats numbers as
>> strings.  Then construct a list of columns that should be numbers and apply
>> the necessary conversions.
>>
>> import org.apache.spark.sql.functions.col
>> var df = spark.read.schema(parseSchema).json("...")
>> numericColumns.foreach { columnName =>
>>   df = df.withColumn(columnName, col(columnName).cast("long"))
>> }
>>
>>
>>
>> On Sun, Feb 5, 2017 at 2:09 PM, Sam Elamin 
>> wrote:
>>
>>> Thanks Michael
>>>
>>> I've been spending the past few days researching this
>>>
>>> The problem is the generated JSON has double quotes on fields that are
>>> numbers because the producing datastore doesn't want to lose precision.
>>>
>>> I can change the data type, true, but that would be specific to a job
>>> rather than a generic streaming job. I'm writing a structured streaming
>>> connector and I have the schema the generated dataframe should match.
>>>
>>> Unfortunately using withColumn won't help me here since the solution
>>> needs to be generic
>>>
>>> To summarise assume I have the following json
>>>
>>> [{
>>> "customerid": "535137",
>>> "foo": "bar"
>>> }]
>>>
>>>
>>> and I know the schema should be:
>>> StructType(Array(StructField("customerid",LongType,true),Str
>>> uctField("foo",StringType,true)))
>>>
>>> What's the best way of solving this?
>>>
>>> My current approach is to iterate over the JSON and identify which
>>> fields are numbers and which aren't, then recreate the JSON.
>>>
>>> But to be honest that doesn't seem like the cleanest approach, so happy
>>> for advice on this
>>>
>>> Regards
>>> Sam
>>>
>>> On Sun, 5 Feb 2017 at 22:00, Michael Armbrust 
>>> wrote:
>>>
>>>> -dev
>>>>
>>>> You can use withColumn to change the type after the data has been
>>>> loaded
>>>> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1572067047091340/2840265927289860/latest.html>
>>>> .
>>>>
>>>> On Sat, Feb 4, 2017 at 6:22 AM, Sam Elamin 
>>>> wrote:
>>>>
>>>> Hi Direceu
>>>>
>>>> Thanks, you're right! That did work.
>>>>
>>>>
>>>> But now I'm facing an even bigger problem since I don't have access to
>>>> change the underlying data, I just want to apply a schema over something
>>>> that was written via the sparkContext.newAPIHadoopRDD
>>>>
>>>> Basically I am reading in an RDD[JsonObject] and would like to convert
>>>> it into a dataframe which I pass the schema into
>>>>
>>>> What's the best way to do this?
>>>>
>>>> I doubt removing all the quotes in the JSON is the best solution, is it?
>>>>
>>>> Regards
>>>> Sam
>>>>
>>>> On Sat, Feb 4, 2017 at 2:13 PM, Dirceu Semighini Filho <
>>>> dirceu.semigh...@gmail.com> wrote:
>>>>
>>>> Hi Sam
>>>> Remove the " from the number and it will work
>>>>
>>>> On Feb 4, 2017 at 11:46 

Re: specifing schema on dataframe

2017-02-05 Thread Michael Armbrust
Specifying the schema when parsing JSON will only let you pick between
similar datatypes (i.e. should this be a short, long, float, double, etc.).  It
will not let you perform conversions like string <-> number.  This has to
be done with explicit casts after the data has been loaded.

I think you can make a solution that uses select or withColumn generic.
Just load the dataframe with a "parse schema" that treats numbers as
strings.  Then construct a list of columns that should be numbers and apply
the necessary conversions.

import org.apache.spark.sql.functions.col
var df = spark.read.schema(parseSchema).json("...")
numericColumns.foreach { columnName =>
  df = df.withColumn(columnName, col(columnName).cast("long"))
}
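
A more self-contained version of the same idea (the schema, input path, and
numeric column list below are made up for illustration):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// The "parse schema" reads everything as strings so nothing is corrupted...
val parseSchema = StructType(Seq(
  StructField("customerid", StringType, nullable = true),
  StructField("foo", StringType, nullable = true)))

// ...and the columns that should really be numeric are cast afterwards.
val numericColumns = Seq("customerid")

var df = spark.read.schema(parseSchema).json("/path/to/input.json")
numericColumns.foreach { columnName =>
  df = df.withColumn(columnName, col(columnName).cast(LongType))
}
df.printSchema()  // customerid is now a long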



On Sun, Feb 5, 2017 at 2:09 PM, Sam Elamin  wrote:

> Thanks Michael
>
> I've been spending the past few days researching this
>
> The problem is the generated JSON has double quotes on fields that are
> numbers because the producing datastore doesn't want to lose precision.
>
> I can change the data type, true, but that would be specific to a job
> rather than a generic streaming job. I'm writing a structured streaming
> connector and I have the schema the generated dataframe should match.
>
> Unfortunately using withColumn won't help me here since the solution needs
> to be generic
>
> To summarise assume I have the following json
>
> [{
> "customerid": "535137",
> "foo": "bar"
> }]
>
>
> and I know the schema should be:
> StructType(Array(StructField("customerid",LongType,true),
> StructField("foo",StringType,true)))
>
> What's the best way of solving this?
>
> My current approach is to iterate over the JSON and identify which fields
> are numbers and which aren't, then recreate the JSON.
>
> But to be honest that doesn't seem like the cleanest approach, so happy for
> advice on this
>
> Regards
> Sam
>
> On Sun, 5 Feb 2017 at 22:00, Michael Armbrust 
> wrote:
>
>> -dev
>>
>> You can use withColumn to change the type after the data has been loaded
>> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1572067047091340/2840265927289860/latest.html>
>> .
>>
>> On Sat, Feb 4, 2017 at 6:22 AM, Sam Elamin 
>> wrote:
>>
>> Hi Direceu
>>
>> Thanks, you're right! That did work.
>>
>>
>> But now I'm facing an even bigger problem since I don't have access to
>> change the underlying data, I just want to apply a schema over something
>> that was written via the sparkContext.newAPIHadoopRDD
>>
>> Basically I am reading in an RDD[JsonObject] and would like to convert it
>> into a dataframe which I pass the schema into
>>
>> What's the best way to do this?
>>
>> I doubt removing all the quotes in the JSON is the best solution, is it?
>>
>> Regards
>> Sam
>>
>> On Sat, Feb 4, 2017 at 2:13 PM, Dirceu Semighini Filho <
>> dirceu.semigh...@gmail.com> wrote:
>>
>> Hi Sam
>> Remove the " from the number and it will work
>>
>> On Feb 4, 2017 at 11:46 AM, "Sam Elamin" 
>> wrote:
>>
>> Hi All
>>
>> I would like to specify a schema when reading from a json but when trying
>> to map a number to a Double it fails, I tried FloatType and IntType with no
>> joy!
>>
>>
>> When inferring the schema customer id is set to String, and I would like
>> to cast it as Double
>>
>> so df1 is corrupted while df2 shows
>>
>>
>> Also FYI I need this to be generic as I would like to apply it to any
>> json, I specified the below schema as an example of the issue I am facing
>>
>> import org.apache.spark.sql.types.{BinaryType, StringType, StructField, 
>> DoubleType,FloatType, StructType, LongType,DecimalType}
>> val testSchema = StructType(Array(StructField("customerid",DoubleType)))
>> val df1 = 
>> spark.read.schema(testSchema).json(sc.parallelize(Array("""{"customerid":"535137"}""")))
>> val df2 = 
>> spark.read.json(sc.parallelize(Array("""{"customerid":"535137"}""")))
>> df1.show(1)
>> df2.show(1)
>>
>>
>> Any help would be appreciated, I am sure I am missing something obvious
>> but for the life of me I cant tell what it is!
>>
>>
>> Kind Regards
>> Sam
>>
>>
>>
>>


Re: specifing schema on dataframe

2017-02-05 Thread Michael Armbrust
-dev

You can use withColumn to change the type after the data has been loaded
<https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/1572067047091340/2840265927289860/latest.html>
.

On Sat, Feb 4, 2017 at 6:22 AM, Sam Elamin  wrote:

> Hi Direceu
>
> Thanks, you're right! That did work.
>
>
> But now I'm facing an even bigger problem since I don't have access to
> change the underlying data, I just want to apply a schema over something
> that was written via the sparkContext.newAPIHadoopRDD
>
> Basically I am reading in an RDD[JsonObject] and would like to convert it
> into a dataframe which I pass the schema into
>
> What's the best way to do this?
>
> I doubt removing all the quotes in the JSON is the best solution, is it?
>
> Regards
> Sam
>
> On Sat, Feb 4, 2017 at 2:13 PM, Dirceu Semighini Filho <
> dirceu.semigh...@gmail.com> wrote:
>
>> Hi Sam
>> Remove the " from the number and it will work
>>
>> On Feb 4, 2017 at 11:46 AM, "Sam Elamin" 
>> wrote:
>>
>>> Hi All
>>>
>>> I would like to specify a schema when reading from a json but when
>>> trying to map a number to a Double it fails, I tried FloatType and IntType
>>> with no joy!
>>>
>>>
>>> When inferring the schema customer id is set to String, and I would like
>>> to cast it as Double
>>>
>>> so df1 is corrupted while df2 shows
>>>
>>>
>>> Also FYI I need this to be generic as I would like to apply it to any
>>> json, I specified the below schema as an example of the issue I am facing
>>>
>>> import org.apache.spark.sql.types.{BinaryType, StringType, StructField, 
>>> DoubleType,FloatType, StructType, LongType,DecimalType}
>>> val testSchema = StructType(Array(StructField("customerid",DoubleType)))
>>> val df1 = 
>>> spark.read.schema(testSchema).json(sc.parallelize(Array("""{"customerid":"535137"}""")))
>>> val df2 = 
>>> spark.read.json(sc.parallelize(Array("""{"customerid":"535137"}""")))
>>> df1.show(1)
>>> df2.show(1)
>>>
>>>
>>> Any help would be appreciated, I am sure I am missing something obvious
>>> but for the life of me I cant tell what it is!
>>>
>>>
>>> Kind Regards
>>> Sam
>>>
>>
>



Re: frustration with field names in Dataset

2017-02-02 Thread Michael Armbrust
That might be reasonable.  At least I can't think of any problems with
doing that.

On Thu, Feb 2, 2017 at 7:39 AM, Koert Kuipers  wrote:

> since a dataset is a typed object you ideally don't have to think about
> field names.
>
> however there are operations on Dataset that require you to provide a
> Column, like for example joinWith (and joinWith returns a strongly typed
> Dataset, not DataFrame). once you have to provide a Column you are back to
> thinking in field names, and worrying about duplicate field names, which is
> something that can easily happen in a Dataset without you realizing it.
>
> so under the hood Dataset has unique identifiers for every column, as in
> dataset.queryExecution.logical.output, but these are expressions
> (attributes) that i cannot turn back into columns since the constructors
> for this are private in spark.
>
> so how about having Dataset.apply(i: Int): Column to allow me to pick
> columns by position without having to worry about (duplicate) field names?
> then i could do something like:
>
> dataset.joinWith(otherDataset, dataset(0) === otherDataset(0), joinType)
>


Re: Parameterized types and Datasets - Spark 2.1.0

2017-02-01 Thread Michael Armbrust
This is the error, you are missing an import:

<console>:13: error: not found: type Encoder
   abstract class RawTable[A : Encoder](inDir: String) {

Works for me in a REPL.
<https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/204687029790319/2840265927289860/latest.html>

On Wed, Feb 1, 2017 at 3:34 PM, Don Drake  wrote:

> Thanks for the reply.   I did give that syntax a try [A : Encoder]
> yesterday, but I kept getting this exception in a spark-shell and Zeppelin
> browser.
>
> scala> import org.apache.spark.sql.Encoder
> import org.apache.spark.sql.Encoder
>
> scala>
>
> scala> case class RawTemp(f1: String, f2: String, temp: Long, created_at:
> java.sql.Timestamp, data_filename: String)
> defined class RawTemp
>
> scala>
>
> scala> import spark.implicits._
> import spark.implicits._
>
> scala>
>
> scala> abstract class RawTable[A : Encoder](inDir: String) {
>  | import spark.implicits._
>  | def load() = {
>  | import spark.implicits._
>  | spark.read
>  | .option("header", "true")
>  | .option("mode", "FAILFAST")
>  | .option("escape", "\"")
>  | .option("nullValue", "")
>  | .option("indferSchema", "true")
>  | .csv(inDir)
>  | .as[A]
>  | }
>  | }
> <console>:13: error: not found: type Encoder
>abstract class RawTable[A : Encoder](inDir: String) {
>^
> <console>:24: error: Unable to find encoder for type stored in a Dataset.
> Primitive types (Int, String, etc) and Product types (case classes) are
> supported by importing spark.implicits._  Support for serializing other
> types will be added in future releases.
>.as[A]
>
>
> I gave it a try today in a Scala application and it seems to work.  Is
> this a known issue in a spark-shell?
>
> In my Scala application, this is being defined in a separate file, etc.
> without direct access to a Spark session.
>
> I had to add the following code snippet so the import spark.implicits._
> would take effect:
>
> // ugly hack to get around Encoder can't be found compile time errors
>
> private object myImplicits extends SQLImplicits {
>
>   protected override def _sqlContext: SQLContext = MySparkSingleton.
> getCurrentSession().sqlContext
>
> }
>
> import myImplicits._
>
> I found that in about the hundredth SO post I searched for this problem.
> Is this the best way to let implicits do its thing?
>
> Thanks.
>
> -Don
>
>
>
> On Wed, Feb 1, 2017 at 3:16 PM, Michael Armbrust 
> wrote:
>
>> You need to enforce that an Encoder is available for the type A using a 
>> context
>> bound <http://docs.scala-lang.org/tutorials/FAQ/context-bounds>.
>>
>> import org.apache.spark.sql.Encoder
>> abstract class RawTable[A : Encoder](inDir: String) {
>>   ...
>> }
>>
>> On Tue, Jan 31, 2017 at 8:12 PM, Don Drake  wrote:
>>
>>> I have a set of CSV that I need to perform ETL on, with the plan to
>>> re-use a lot of code between each file in a parent abstract class.
>>>
>>> I tried creating the following simple abstract class that will have a
>>> parameterized type of a case class that represents the schema being read in.
>>>
>>> This won't compile, it just complains about not being able to find an
>>> encoder, but I'm importing the implicits and don't believe this error.
>>>
>>>
>>> scala> import spark.implicits._
>>> import spark.implicits._
>>>
>>> scala>
>>>
>>> scala> case class RawTemp(f1: String, f2: String, temp: Long,
>>> created_at: java.sql.Timestamp, data_filename: String)
>>> defined class RawTemp
>>>
>>> scala>
>>>
>>> scala> abstract class RawTable[A](inDir: String) {
>>>  | def load() = {
>>>  | spark.read
>>>  | .option("header", "true")
>>>  | .option("mode", "FAILFAST")
>>>  | .option("escape", "\"")
>>>  | .option("nullValue", "")
>>>  | .option("indferSchema", "true")
>>>  | .csv(inDir)
>>>  | .as[A]
>>>

Re: Parameterized types and Datasets - Spark 2.1.0

2017-02-01 Thread Michael Armbrust
You need to enforce that an Encoder is available for the type A using a context
bound <http://docs.scala-lang.org/tutorials/FAQ/context-bounds>.

import org.apache.spark.sql.Encoder
abstract class RawTable[A : Encoder](inDir: String) {
  ...
}
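
A slightly fuller sketch of the same pattern outside the shell (the paths,
options, and case class are made up; the SparkSession is passed in explicitly
so the only implicit needed is the Encoder from the context bound):

import org.apache.spark.sql.{Dataset, Encoder, SparkSession}

abstract class RawTable[A : Encoder](spark: SparkSession, inDir: String) {
  // The context bound supplies the implicit Encoder[A] that .as[A] needs.
  def load(): Dataset[A] =
    spark.read
      .option("header", "true")
      .option("mode", "FAILFAST")
      .csv(inDir)
      .as[A]
}

case class RawTemp(f1: String, f2: String, temp: Long)

// With a SparkSession named spark in scope:
//   import spark.implicits._          // provides Encoder[RawTemp]
//   val temps = new RawTable[RawTemp](spark, "/tmp/t.csv") {}.load()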

On Tue, Jan 31, 2017 at 8:12 PM, Don Drake  wrote:

> I have a set of CSV that I need to perform ETL on, with the plan to re-use
> a lot of code between each file in a parent abstract class.
>
> I tried creating the following simple abstract class that will have a
> parameterized type of a case class that represents the schema being read in.
>
> This won't compile, it just complains about not being able to find an
> encoder, but I'm importing the implicits and don't believe this error.
>
>
> scala> import spark.implicits._
> import spark.implicits._
>
> scala>
>
> scala> case class RawTemp(f1: String, f2: String, temp: Long, created_at:
> java.sql.Timestamp, data_filename: String)
> defined class RawTemp
>
> scala>
>
> scala> abstract class RawTable[A](inDir: String) {
>  | def load() = {
>  | spark.read
>  | .option("header", "true")
>  | .option("mode", "FAILFAST")
>  | .option("escape", "\"")
>  | .option("nullValue", "")
>  | .option("indferSchema", "true")
>  | .csv(inDir)
>  | .as[A]
>  | }
>  | }
> <console>:27: error: Unable to find encoder for type stored in a Dataset.
> Primitive types (Int, String, etc) and Product types (case classes) are
> supported by importing spark.implicits._  Support for serializing other
> types will be added in future releases.
>.as[A]
>
> scala> class TempTable extends RawTable[RawTemp]("/user/drake/t.csv")
> <console>:13: error: not found: type RawTable
>class TempTable extends RawTable[RawTemp]("/user/drake/t.csv")
>   ^
>
> What's odd is that this output looks okay:
>
> scala> val RTEncoder = Encoders.product[RawTemp]
> RTEncoder: org.apache.spark.sql.Encoder[RawTemp] = class[f1[0]: string,
> f2[0]: string, temp[0]: bigint, created_at[0]: timestamp, data_filename[0]:
> string]
>
> scala> RTEncoder.schema
> res4: org.apache.spark.sql.types.StructType = 
> StructType(StructField(f1,StringType,true),
> StructField(f2,StringType,true), StructField(temp,LongType,false),
> StructField(created_at,TimestampType,true), StructField(data_filename,
> StringType,true))
>
> scala> RTEncoder.clsTag
> res5: scala.reflect.ClassTag[RawTemp] = RawTemp
>
> Any ideas?
>
> --
> Donald Drake
> Drake Consulting
> http://www.drakeconsulting.com/
> https://twitter.com/dondrake 
> 800-733-2143
>


Re: using withWatermark on Dataset

2017-02-01 Thread Michael Armbrust
Can you give the full stack trace?  Also which version of Spark are you
running?

On Wed, Feb 1, 2017 at 10:38 AM, Jerry Lam  wrote:

> Hi everyone,
>
> Does anyone know how to use withWatermark on Dataset?
>
> I have tried the following but hit this exception:
>
> dataset org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
> cannot be cast to "MyType"
>
> The code looks like the following:
>
> dataset
> .withWatermark("timestamp", "5 seconds")
> .groupBy("timestamp", "customer_id")
> .agg(MyAggregator)
> .writeStream
>
> Where dataset has MyType for each row.
> Where MyType is:
> case class MyType(customer_id: Long, timestamp: Timestamp, product_id: Long)
>
> MyAggregator which takes MyType as the input type did some maths on the
> product_id and outputs a set of product_ids.
>
> Best Regards,
>
> Jerry
>
>
>
>
>
>
>
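
Without the stack trace this is only a guess, but that cast error usually means
a typed Aggregator was applied through the untyped groupBy, which hands it Rows
rather than MyType. A sketch of the typed route (assuming MyAggregator is an
org.apache.spark.sql.expressions.Aggregator[MyType, _, _]; whether the rest of
the streaming plan supports it is a separate question):

import spark.implicits._   // assumes a SparkSession named spark, for the key encoder

// groupByKey keeps each element typed as MyType, so the aggregator's input
// type lines up; toColumn turns it into a TypedColumn over MyType.
val aggregated = dataset
  .withWatermark("timestamp", "5 seconds")
  .groupByKey(m => (m.customer_id, m.timestamp))
  .agg(MyAggregator.toColumn)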


[jira] (SPARK-16454) Consider adding a per-batch transform for structured streaming

2017-01-30 Thread Michael Armbrust (JIRA)
Michael Armbrust commented on SPARK-16454:
-

So are you looking for def transform(func: RDD[T] => RDD[T]): DataFrame?

--
This message was sent by Atlassian JIRA
(v6.3.15#6346-sha1:dbc023d)



Re: kafka structured streaming source refuses to read

2017-01-30 Thread Michael Armbrust
Thanks for following up!  I've linked the relevant tickets to
SPARK-18057  and I
targeted it for Spark 2.2.

On Sat, Jan 28, 2017 at 10:15 AM, Koert Kuipers  wrote:

> there was also already an existing spark ticket for this:
> SPARK-18779 
>
> On Sat, Jan 28, 2017 at 1:13 PM, Koert Kuipers  wrote:
>
>> it seems the bug is:
>> https://issues.apache.org/jira/browse/KAFKA-4547
>>
>> i would advise everyone not to use kafka-clients 0.10.0.2, 0.10.1.0 or
>> 0.10.1.1
>>
>> On Fri, Jan 27, 2017 at 3:56 PM, Koert Kuipers  wrote:
>>
>>> in case anyone else runs into this:
>>>
>>> the issue is that i was using kafka-clients 0.10.1.1
>>>
>>> it works when i use kafka-clients 0.10.0.1 with spark structured
>>> streaming
>>>
>>> my kafka server is 0.10.1.1
>>>
>>> On Fri, Jan 27, 2017 at 1:24 PM, Koert Kuipers 
>>> wrote:
>>>
 i checked my topic. it has 5 partitions but all the data is written to
 a single partition: wikipedia-2
 i turned on debug logging and i see this:

 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Partitions assigned to
 consumer: [wikipedia-0, wikipedia-4, wikipedia-3, wikipedia-2,
 wikipedia-1]. Seeking to the end.
 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
 partition wikipedia-0
 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
 partition wikipedia-4
 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
 partition wikipedia-3
 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
 partition wikipedia-2
 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
 partition wikipedia-1
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
 partition wikipedia-0 to latest offset.
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
 offset=0} for partition wikipedia-0
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
 partition wikipedia-0 to earliest offset.
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
 offset=0} for partition wikipedia-0
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
 partition wikipedia-4 to latest offset.
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
 offset=0} for partition wikipedia-4
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
 partition wikipedia-4 to earliest offset.
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
 offset=0} for partition wikipedia-4
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
 partition wikipedia-3 to latest offset.
 2017-01-27 13:02:50 DEBUG internals.AbstractCoordinator: Received
 successful heartbeat response for group spark-kafka-source-fac4f749-fd
 56-4a32-82c7-e687aadf520b-1923704552-driver-0
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
 offset=0} for partition wikipedia-3
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
 partition wikipedia-3 to earliest offset.
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
 offset=0} for partition wikipedia-3
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
 partition wikipedia-2 to latest offset.
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
 offset=152908} for partition wikipedia-2
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
 partition wikipedia-2 to earliest offset.
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
 offset=0} for partition wikipedia-2
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
 partition wikipedia-1 to latest offset.
 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
 offset=0} for partition wikipedia-1
 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Got latest offsets for
 partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0,
 wikipedia-3 -> 0, wikipedia-0 -> 0)

 what is confusing to me is this:
 Resetting offset for partition wikipedia-2 to latest offset.
 Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
 Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 ->
 0, wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)

 why does it find latest offset 152908 for wikipedia-2 but then sets
 latest offset to 0 for that partition? or am i misunderstanding?

 On Fri, Jan 27, 2017 at 1:19 PM, Koert Kuipers 
 wrote:

> code:
>   val query = spark.readStream
> .format("kafka")
> .option("kafka.bootstrap.servers", "somenode:9092")
> .option("subscribe", "wikipedia")
>   

[jira] (SPARK-18057) Update structured streaming kafka from 10.0.1 to 10.1.0

2017-01-30 Thread Michael Armbrust (JIRA)
Michael Armbrust updated SPARK-18057:
-
Target Version/s: 2.2.0

> Update structured streaming kafka from 10.0.1 to 10.1.0
> 

--
This message was sent by Atlassian JIRA
(v6.3.15#6346-sha1:dbc023d)



Re: kafka structured streaming source refuses to read

2017-01-27 Thread Michael Armbrust
Yeah, Kafka server/client compatibility can be pretty confusing and does
not give good errors in the case of mismatches.  This should be addressed
in the next release of Kafka (they are adding an API to query the server's
capabilities).

On Fri, Jan 27, 2017 at 12:56 PM, Koert Kuipers  wrote:

> in case anyone else runs into this:
>
> the issue is that i was using kafka-clients 0.10.1.1
>
> it works when i use kafka-clients 0.10.0.1 with spark structured streaming
>
> my kafka server is 0.10.1.1
>
> On Fri, Jan 27, 2017 at 1:24 PM, Koert Kuipers  wrote:
>
>> i checked my topic. it has 5 partitions but all the data is written to a
>> single partition: wikipedia-2
>> i turned on debug logging and i see this:
>>
>> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Partitions assigned to
>> consumer: [wikipedia-0, wikipedia-4, wikipedia-3, wikipedia-2,
>> wikipedia-1]. Seeking to the end.
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-0
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-4
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-3
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-2
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of
>> partition wikipedia-1
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-0 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-0
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-0 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-0
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-4 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-4
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-4 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-4
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-3 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.AbstractCoordinator: Received
>> successful heartbeat response for group spark-kafka-source-fac4f749-fd
>> 56-4a32-82c7-e687aadf520b-1923704552-driver-0
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-3
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-3 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-3
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-2 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=152908} for partition wikipedia-2
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-2 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-2
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for
>> partition wikipedia-1 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1,
>> offset=0} for partition wikipedia-1
>> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Got latest offsets for
>> partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0,
>> wikipedia-3 -> 0, wikipedia-0 -> 0)
>>
>> what is confusing to me is this:
>> Resetting offset for partition wikipedia-2 to latest offset.
>> Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
>> Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 ->
>> 0, wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)
>>
>> why does it find latest offset 152908 for wikipedia-2 but then sets
>> latest offset to 0 for that partition? or am i misunderstanding?
>>
>> On Fri, Jan 27, 2017 at 1:19 PM, Koert Kuipers  wrote:
>>
>>> code:
>>>   val query = spark.readStream
>>> .format("kafka")
>>> .option("kafka.bootstrap.servers", "somenode:9092")
>>> .option("subscribe", "wikipedia")
>>> .load
>>> .select(col("value") cast StringType)
>>> .writeStream
>>> .format("console")
>>> .outputMode(OutputMode.Append)
>>> .start()
>>>
>>>   while (true) {
>>> Thread.sleep(1)
>>> println(query.lastProgress)
>>>   }
>>> }
>>>
>>> On Fri, Jan 27, 2017 at 5:34 AM, Alonso Isidoro Roman <
>>> alons...@gmail.com> wrote:
>>>
 lets see the code...

 Alonso Isidoro Roman
 [image: https://]about.me/alonso.isidoro.roman

 

Re: printSchema showing incorrect datatype?

2017-01-25 Thread Michael Armbrust
Encoders are just an object-based view on a Dataset.  Until you actually
materialize an object, they are not used and thus will not change the
schema of the dataframe.

On Tue, Jan 24, 2017 at 8:28 AM, Koert Kuipers  wrote:

> scala> val x = Seq("a", "b").toDF("x")
> x: org.apache.spark.sql.DataFrame = [x: string]
>
> scala> x.as[Array[Byte]].printSchema
> root
>  |-- x: string (nullable = true)
>
> scala> x.as[Array[Byte]].map(x => x).printSchema
> root
>  |-- value: binary (nullable = true)
>
> why does the first schema show string instead of binary?
>


Re: Setting startingOffsets to earliest in structured streaming never catches up

2017-01-23 Thread Michael Armbrust
+1 to Ryan's suggestion of setting maxOffsetsPerTrigger.  This way you can
at least see how quickly it is making progress towards catching up.
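
A sketch of that suggestion (the broker, topic, and per-trigger cap are
assumptions):

// Cap how many offsets each micro-batch consumes, so progress toward the head
// of the topic shows up batch by batch instead of as one giant batch.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", "100000")
  .load()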

On Sun, Jan 22, 2017 at 7:02 PM, Timothy Chan  wrote:

> I'm using version 2.02.
>
> The difference I see between using latest and earliest is a series of jobs
> that take less than a second vs. one job that goes on for over 24 hours.
>
> On Sun, Jan 22, 2017 at 6:54 PM Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Which Spark version are you using? If you are using 2.1.0, could you use
>> the monitoring APIs (http://spark.apache.org/docs/
>> latest/structured-streaming-programming-guide.html#
>> monitoring-streaming-queries) to check the input rate and the processing
>> rate? One possible issue is that the Kafka source launched a pretty large
>> batch and it took too long to finish it. If so, you can use
>> "maxOffsetsPerTrigger" option to limit the data size in a batch in order to
>> observe the progress.
>>
>> On Sun, Jan 22, 2017 at 10:22 AM, Timothy Chan 
>> wrote:
>>
>> I'm running my structured streaming jobs in EMR. We were thinking a worst
>> case scenario recovery situation would be to spin up another cluster and
>> set startingOffsets to earliest (our Kafka cluster has a retention policy
>> of 7 days).
>>
>> My observation is that the job never catches up to latest. This is not
>> acceptable. I've set the number of partitions for the topic to 6. I've
>> tried using a cluster of 4 in EMR.
>>
>> The producer rate for this topic is 4 events/second. Does anyone have any
>> suggestions on what I can do to have my consumer catch up to latest faster?
>>
>>
>>


Re: [SQL][SPARK-14160] Maximum interval for o.a.s.sql.functions.window

2017-01-18 Thread Michael Armbrust
+1, we should just fix the error to explain why months aren't allowed and
suggest that you manually specify some number of days.
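
For example, the workaround today is to spell the interval out in days (using
30 days here as an assumed stand-in for "1 month"):

import spark.implicits._
import org.apache.spark.sql.functions.window

// Months are rejected because they are not a fixed duration, but a fixed
// number of days is accepted.
val monthly = Seq("2017-01-01").toDF("date")
  .groupBy(window($"date", "30 days"))
  .count()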

On Wed, Jan 18, 2017 at 9:52 AM, Maciej Szymkiewicz 
wrote:

> Thanks for the response Burak,
>
> As any sane person I try to steer away from the objects which have both
> calendar and unsafe in their fully qualified names but if there is no
> bigger picture I missed here I would go with 1 as well. And of course fix
> the error message. I understand this has been introduced with structured
> streaming in mind, but it is an useful feature in general, not only in high
> precision scale. To be honest I would love to see some generalized version
> which could be used (I mean without hacking) with arbitrary numeric
> sequence. It could address at least some scenarios in which people try to
> use window functions without PARTITION BY clause and fail miserably.
>
> Regarding ambiguity... Sticking with days doesn't really resolve the
> problem, does it? If one were to nitpick it doesn't look like this
> implementation even touches all the subtleties of DST or leap second.
>
>
>
>
> On 01/18/2017 05:52 PM, Burak Yavuz wrote:
>
> Hi Maciej,
>
> I believe it would be useful to either fix the documentation or fix the
> implementation. I'll leave it to the community to comment on. The code
> right now disallows intervals provided in months and years, because they
> are not a "consistently" fixed amount of time. A month can be 28, 29, 30,
> or 31 days. A year is 12 months for sure, but is it 360 days (sometimes
> used in finance), 365 days or 366 days?
>
> Therefore we could either:
>   1) Allow windowing when intervals are given in days and less, even
> though it could be 365 days, and fix the documentation.
>   2) Explicitly disallow it as there may be a lot of data for a given
> window, but partial aggregations should help with that.
>
> My thoughts are to go with 1. What do you think?
>
> Best,
> Burak
>
> On Wed, Jan 18, 2017 at 10:18 AM, Maciej Szymkiewicz <
> mszymkiew...@gmail.com> wrote:
>
>> Hi,
>>
>> Can I ask for some clarifications regarding intended behavior of window /
>> TimeWindow?
>>
>> PySpark documentation states that "Windows in the order of months are not
>> supported". This is further confirmed by the checks in
>> TimeWindow.getIntervalInMicroseconds (https://git.io/vMP5l).
>>
>> Surprisingly enough we can pass interval much larger than a month by
>> expressing interval in days or another unit of a higher precision. So this
>> fails:
>>
>> Seq("2017-01-01").toDF("date").groupBy(window($"date", "1 month"))
>>
>> while following is accepted:
>>
>> Seq("2017-01-01").toDF("date").groupBy(window($"date", "999 days"))
>>
>> with results which look sensible at first glance.
>>
>> Is it a matter of a faulty validation logic (months will be assigned only
>> if there is a match against years or months https://git.io/vMPdi) or
>> expected behavior and I simply misunderstood the intentions?
>>
>> --
>> Best,
>> Maciej
>>
>>
>
>


[jira] [Updated] (SPARK-18682) Batch Source for Kafka

2017-01-13 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-18682:
-
Assignee: Tyson Condie

> Batch Source for Kafka
> --
>
> Key: SPARK-18682
> URL: https://issues.apache.org/jira/browse/SPARK-18682
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL, Structured Streaming
>    Reporter: Michael Armbrust
>Assignee: Tyson Condie
>
> Today, you can start a stream that reads from kafka.  However, given kafka's 
> configurable retention period, it seems like sometimes you might just want to 
> read all of the data that is available now.  As such we should add a version 
> that works with {{spark.read}} as well.
> The options should be the same as the streaming kafka source, with the 
> following differences:
>  - {{startingOffsets}} should default to earliest, and should not allow 
> {{latest}} (which would always be empty).
>  - {{endingOffsets}} should also be allowed and should default to {{latest}}. 
> the same assign json format as {{startingOffsets}} should also be accepted.
> It would be really good, if things like {{.limit\(n\)}} were enough to 
> prevent all the data from being read (this might just work).
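
A sketch of the proposed batch usage (the broker and topic are assumptions):

// Read everything currently available in the topic as a one-off batch job
// rather than as a continuous stream.
val batch = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()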



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Closed] (SPARK-18475) Be able to provide higher parallelization for StructuredStreaming Kafka Source

2017-01-13 Thread Michael Armbrust (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust closed SPARK-18475.

Resolution: Won't Fix

> Be able to provide higher parallelization for StructuredStreaming Kafka Source
> --
>
> Key: SPARK-18475
> URL: https://issues.apache.org/jira/browse/SPARK-18475
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.0.2, 2.1.0
>Reporter: Burak Yavuz
>
> Right now the StructuredStreaming Kafka Source creates as many Spark tasks as 
> there are TopicPartitions that we're going to read from Kafka.
> This doesn't work well when we have data skew, and there is no reason why we 
> shouldn't be able to increase parallelism further, i.e. have multiple Spark 
> tasks reading from the same Kafka TopicPartition.
> What this will mean is that we won't be able to use the "CachedKafkaConsumer" 
> for what it is defined for (being cached) in this use case, but the extra 
> overhead is worth handling data skew and increasing parallelism especially in 
> ETL use cases.
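
A partial workaround sketch (the parallelism number is an assumption): each
Kafka TopicPartition is still read by a single task, but repartitioning right
after the source lets the expensive downstream ETL run with more tasks.

val widened = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()
  .repartition(64)   // spread downstream work across more tasks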



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org


