Re: spark 2.0 readStream from a REST API

2016-08-02 Thread Ayoub Benali
Why is writeStream needed to consume the data?

When I tried it, I got this exception:

INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
> org.apache.spark.sql.AnalysisException: Complete output mode not supported
> when there are no streaming aggregations on streaming DataFrames/Datasets;
> at
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:173)
> at
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:65)
> at
> org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:236)
> at
> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:287)
> at .(:59)
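
The exception points at the output mode rather than at writeStream itself: in
Spark 2.0, "complete" output mode is only allowed when the streaming query
contains an aggregation. A rough sketch of the two usual ways out, assuming a
streaming DataFrame called stream with a hypothetical column "word" (names are
illustrative only):

// Option 1: keep outputMode("complete"), but aggregate first
val counts = stream.groupBy("word").count()
counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

// Option 2: no aggregation, so switch to "append" output mode
stream.writeStream
  .outputMode("append")
  .format("console")
  .start()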




2016-08-01 18:44 GMT+02:00 Amit Sela :

> I think you're missing:
>
> val query = wordCounts.writeStream
>
>   .outputMode("complete")
>   .format("console")
>   .start()
>
> Did it help?
>
> On Mon, Aug 1, 2016 at 2:44 PM Jacek Laskowski  wrote:
>
>> On Mon, Aug 1, 2016 at 11:01 AM, Ayoub Benali
>>  wrote:
>>
>> > the problem now is that when I consume the dataframe, for example with
>> > count, I get the stack trace below.
>>
>> Mind sharing the entire pipeline?
>>
>> > I followed the implementation of TextSocketSourceProvider to implement my
>> > data source, and the Text Socket source is used in the official
>> > documentation here.
>>
>> Right. Completely forgot about the provider. Thanks for reminding me
>> about it!
>>
>> Regards,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>>


Re: spark 2.0 readStream from a REST API

2016-08-02 Thread Ayoub Benali
Hello,

here is the code I am trying to run:


https://gist.github.com/ayoub-benali/a96163c711b4fce1bdddf16b911475f2

Thanks,
Ayoub.

2016-08-01 13:44 GMT+02:00 Jacek Laskowski :

> On Mon, Aug 1, 2016 at 11:01 AM, Ayoub Benali
>  wrote:
>
> > the problem now is that when I consume the dataframe, for example with
> > count, I get the stack trace below.
>
> Mind sharing the entire pipeline?
>
> > I followed the implementation of TextSocketSourceProvider to implement my
> > data source, and the Text Socket source is used in the official
> > documentation here.
>
> Right. Completely forgot about the provider. Thanks for reminding me about
> it!
>
> Regards,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>


Re: spark 2.0 readStream from a REST API

2016-08-01 Thread Ayoub Benali
Hello,

Using the full class name worked, thanks.

The problem now is that when I consume the DataFrame, for example with count,
I get the stack trace below.

I followed the implementation of TextSocketSourceProvider
<https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala#L128>
to implement my data source, and the Text Socket source is used in the official
documentation here
<https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#quick-example>.

Why does count work in the documentation example? Is there some other trait
that needs to be implemented?

Thanks,
Ayoub.


org.apache.spark.sql.AnalysisException: Queries with streaming sources must
> be executed with writeStream.start();
> at
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:173)
> at
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:33)
> at
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:31)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
> at
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:31)
> at
> org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:59)
> at
> org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:70)
> at
> org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:68)
> at
> org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:74)
> at
> org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:74)
> at
> org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:78)
> at
> org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:76)
> at
> org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
> at
> org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
> at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2541)
> at org.apache.spark.sql.Dataset.count(Dataset.scala:2216)
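
As for why count works in the documentation: in the quick example, count is the
streaming aggregation groupBy("value").count(), i.e. a transformation that
yields another streaming Dataset, whereas Dataset.count() here is an eager
batch action, which is exactly what the exception above rejects. A minimal
sketch (variable and column names taken from the quick example):

// In the programming guide, "count" is an aggregation on a streaming Dataset:
val wordCounts = words.groupBy("value").count()  // still a streaming DataFrame

// Eager actions such as myStream.count() raise the AnalysisException above;
// the streaming result has to be materialized through a sink instead:
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()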







2016-07-31 21:56 GMT+02:00 Michael Armbrust :

> You have to add a file in resources too (example
> <https://github.com/apache/spark/blob/master/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister>).
> Either that or give a full class name.
>
> On Sun, Jul 31, 2016 at 9:45 AM, Ayoub Benali  wrote:
>
>> Looks like the way to go in spark 2.0 is to implement
>> StreamSourceProvider
>> <https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L117>
>>  with DataSourceRegister
>> <https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L40>.
>> But now spark fails at loading the class when doing:
>>
>> spark.readStream.format("mysource").load()
>>
>> I get :
>>
>> java.lang.ClassNotFoundException: Failed to find data source: mysource.
>> Please find packages at http://spark-packages.org
>>
>> Is there something I need to do in order to "load" the Stream source
>> provider ?
>>
>> Thanks,
>> Ayoub
>>
>> 2016-07-31 17:19 GMT+02:00 Jacek Laskowski :
>>
>>> On Sun, Jul 31, 2016 at 12:53 PM, Ayoub Benali
>>>  wrote:
>>>
>>> > I started playing with the Structured Streaming API in spark 2.0 and I am
>>> > looking for a way to create streaming Dataset/Dataframe from a rest HTTP
>>> > endpoint but I am a bit stuck.
>>>
>>> What a great idea! Why did I myself not think about this?!?!
>>>
>>> > What would be the easiest way to hack around it? Do I need to implement
>>> > the Datasource API?
>>>
>>> Yes and perhaps Hadoop API too, but not sure which one exactly since I
>>> haven't even thought about it (not even once).

Re: spark 2.0 readStream from a REST API

2016-07-31 Thread Ayoub Benali
Looks like the way to go in Spark 2.0 is to implement StreamSourceProvider
<https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L117>
 with DataSourceRegister
<https://github.com/apache/spark/blob/9e2c763dbb5ac6fc5d2eb0759402504d4b9073a4/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L40>.
But now Spark fails at loading the class when doing:

spark.readStream.format("mysource").load()

I get:

java.lang.ClassNotFoundException: Failed to find data source: mysource.
Please find packages at http://spark-packages.org

Is there something I need to do in order to "load" the Stream source
provider?

Thanks,
Ayoub
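
Michael Armbrust's suggestion above boils down to either passing the fully
qualified class name to format(), or shipping a service-registration file so
the short name resolves. A rough sketch, assuming a hypothetical provider class
com.example.rest.MySourceProvider (package, class name, and short name are made
up for illustration):

// Option 1: skip registration and use the fully qualified class name
spark.readStream.format("com.example.rest.MySourceProvider").load()

// Option 2: register a short name. Add a resource file
//   src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
// containing the single line:
//   com.example.rest.MySourceProvider
// and expose the short name from the provider:

import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}

class MySourceProvider extends StreamSourceProvider with DataSourceRegister {
  override def shortName(): String = "mysource"
  // sourceSchema(...) and createSource(...) omitted here; see the linked
  // interfaces.scala and the socket source for the full signatures
}

// after which the original call resolves:
spark.readStream.format("mysource").load()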

2016-07-31 17:19 GMT+02:00 Jacek Laskowski :

> On Sun, Jul 31, 2016 at 12:53 PM, Ayoub Benali
>  wrote:
>
> > I started playing with the Structured Streaming API in spark 2.0 and I am
> > looking for a way to create streaming Dataset/Dataframe from a rest HTTP
> > endpoint but I am a bit stuck.
>
> What a great idea! Why did I myself not think about this?!?!
>
> > What would be the easiest way to hack around it? Do I need to implement the
> > Datasource API?
>
> Yes and perhaps Hadoop API too, but not sure which one exactly since I
> haven't even thought about it (not even once).
>
> > Are there examples on how to create a DataSource from a REST endpoint ?
>
> Never heard of one.
>
> I'm hosting a Spark/Scala meetup this week so I'll definitely propose
> it as a topic. Thanks a lot!
>
> Regards,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>


spark 2.0 readStream from a REST API

2016-07-31 Thread Ayoub Benali
Hello,

I started playing with the Structured Streaming API in Spark 2.0 and I am
looking for a way to create a streaming Dataset/DataFrame from a REST HTTP
endpoint, but I am a bit stuck.

"readStream" in SparkSession has a json method but this one is expecting a
path (s3, hdfs, etc) and I want to avoid having to save the data on s3 and
then read again.

What would be the easiest way to hack around it? Do I need to implement
the Datasource API?

Are there examples on how to create a DataSource from a REST endpoint?

Best,
Ayoub


Re: RDD[Future[T]] => Future[RDD[T]]

2015-07-26 Thread Ayoub Benali
It doesn't work because mapPartitions expects a function f: (Iterator[T]) ⇒
Iterator[U], while .sequence wraps the iterator in a Future.
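
A small sketch of the type mismatch and of the usual per-partition workaround
(which blocks once per partition, so it does not satisfy the non-blocking
requirement; the helper name, timeout parameter, and httpCall are illustrative
only):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// mapPartitions expects Iterator[Future[T]] => Iterator[U], but
//   rddOfFutures.mapPartitions(it => Future.sequence(it.toList))
// produces a Future[List[T]], not an Iterator, hence the mismatch described
// above.

// Common workaround: await once per partition instead of once per element.
// Note: Future is not serializable, so the futures have to be created and
// consumed within the same stage (no shuffle or caching in between).
def awaitPerPartition[T: ClassTag](rdd: RDD[Future[T]], timeout: Duration): RDD[T] =
  rdd.mapPartitions { it =>
    Await.result(Future.sequence(it.toList), timeout).iterator
  }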

2015-07-26 22:25 GMT+02:00 Ignacio Blasco :

> Maybe using mapPartitions and .sequence inside it?
> On 26/7/2015 10:22 PM, "Ayoub"  wrote:
>
>> Hello,
>>
>> I am trying to convert the result I get after doing some async IO:
>>
>> val rdd: RDD[T] = // some rdd
>>
>> val result: RDD[Future[T]] = rdd.map(httpCall)
>>
>> Is there a way to collect all the futures once they are completed, in a
>> *non-blocking* (i.e. without scala.concurrent.Await) and lazy way?
>>
>> If the RDD were a standard Scala collection, then calling
>> "scala.concurrent.Future.sequence" would have resolved the issue, but RDD
>> is not a TraversableOnce (which is required by the method).
>>
>> Is there a way to do this kind of transformation with an RDD[Future[T]]?
>>
>> Thanks,
>> Ayoub.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/RDD-Future-T-Future-RDD-T-tp24000.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>


Re: SQL JSON array operations

2015-01-15 Thread Ayoub Benali
You could try to use the Hive context, which brings HiveQL; it would allow you
to query nested structures using "LATERAL VIEW explode...".
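
A rough sketch of what that could look like against the JSON from the question,
assuming the Spark-1.2-era API (jsonFile, registerTempTable) in the spark shell:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
val users = hiveContext.jsonFile("data.json")
users.registerTempTable("users")

// explode(tags) turns the array into one row per (user, tag) pair,
// which a plain WHERE clause can then filter on.
val usersWithBar = hiveContext.sql(
  "SELECT DISTINCT user FROM users LATERAL VIEW explode(tags) t AS tag WHERE tag = 'bar'")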
On Jan 15, 2015 4:03 PM, "jvuillermet"  wrote:

> let's say my json file lines look like this
>
> {"user": "baz", "tags" : ["foo", "bar"] }
> 
>
> sqlContext.jsonFile("data.json")
> ...
> How could I query for users with the "bar" tag using SQL?
>
> sqlContext.sql("select user from users where tags ?contains? 'bar' ")
>
> I could simplify the request and use the returned RDD to filter on tags, but
> I'm exploring an app where users can write their own SQL queries.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/SQL-JSON-array-operations-tp21164.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Parquet compression codecs not applied

2015-01-10 Thread Ayoub Benali
It worked, thanks.

This doc page recommends using "spark.sql.parquet.compression.codec" to set
the compression codec, and I thought this setting would be forwarded to the
Hive context given that HiveContext extends SQLContext, but it was not.

I am wondering if this behavior is normal; if not, I could open an issue
with a potential fix so that "spark.sql.parquet.compression.codec" would be
translated to "parquet.compression" in the Hive context.

Or the documentation should be updated to mention that the compression
codec is set differently with HiveContext.

Ayoub.
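
For reference, the setting from Michael's reply (quoted below), applied through
the Hive context before the insert, is presumably what did the trick; the exact
invocation is a guess:

// mirrors the other SET statements in the original snippet
hiveContext.sql("SET parquet.compression=GZIP")
// the Hive-backed write path reads "parquet.compression";
// "spark.sql.parquet.compression.codec" is not forwarded to it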



2015-01-09 17:51 GMT+01:00 Michael Armbrust :

> This is a little confusing, but that code path is actually going through
> hive.  So the spark sql configuration does not help.
>
> Perhaps, try:
> set parquet.compression=GZIP;
>
> On Fri, Jan 9, 2015 at 2:41 AM, Ayoub  wrote:
>
>> Hello,
>>
>> I tried to save a table created via the hive context as a parquet file but
>> whatever compression codec (uncompressed, snappy, gzip or lzo) I set via
>> setConf like:
>>
>> setConf("spark.sql.parquet.compression.codec", "gzip")
>>
>> the size of the generated files is always the same, so it seems like the
>> Spark context ignores the compression codec that I set.
>>
>> Here is a code sample applied via the spark shell:
>>
>> import org.apache.spark.sql.hive.HiveContext
>> val hiveContext = new HiveContext(sc)
>>
>> hiveContext.sql("SET hive.exec.dynamic.partition = true")
>> hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
>> hiveContext.setConf("spark.sql.parquet.binaryAsString", "true") //
>> required
>> to make data compatible with impala
>> hiveContext.setConf("spark.sql.parquet.compression.codec", "gzip")
>>
>> hiveContext.sql("create external table if not exists foo (bar STRING, ts
>> INT) Partitioned by (year INT, month INT, day INT) STORED AS PARQUET
>> Location 'hdfs://path/data/foo'")
>>
>> hiveContext.sql("insert into table foo partition(year, month,day) select
>> *,
>> year(from_unixtime(ts)) as year, month(from_unixtime(ts)) as month,
>> day(from_unixtime(ts)) as day from raw_foo")
>>
>> I tried that with Spark 1.2 and a 1.3 snapshot against Hive 0.13, and I
>> also tried the same with Impala on the same cluster, which applied the
>> compression codecs correctly.
>>
>> Does anyone know what the problem could be?
>>
>> Thanks,
>> Ayoub.
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-compression-codecs-not-applied-tp21058.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>


Parquet compression codecs not applied

2015-01-08 Thread Ayoub Benali
Hello,

I tried to save a table created via the Hive context as a Parquet file, but
whatever compression codec (uncompressed, snappy, gzip, or lzo) I set via
setConf like:

setConf("spark.sql.parquet.compression.codec", "gzip")

the size of the generated files is always the same, so it seems like the
Spark context ignores the compression codec that I set.

Here is a code sample applied via the spark shell:

import org.apache.spark.sql.hive.HiveContext
val hiveContext = new HiveContext(sc)

hiveContext.sql("SET hive.exec.dynamic.partition = true")
hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
hiveContext.setConf("spark.sql.parquet.binaryAsString", "true") // required
to make data compatible with impala
hiveContext.setConf("spark.sql.parquet.compression.codec", "gzip")

hiveContext.sql("create external table if not exists foo (bar STRING, ts
INT) Partitioned by (year INT, month INT, day INT) STORED AS PARQUET
Location 'hdfs://path/data/foo'")

hiveContext.sql("insert into table foo partition(year, month,day) select *,
year(from_unixtime(ts)) as year, month(from_unixtime(ts)) as month,
day(from_unixtime(ts)) as day from raw_foo")

I tried that with Spark 1.2 and a 1.3 snapshot against Hive 0.13, and I also
tried the same with Impala on the same cluster, which applied the compression
codecs correctly.

Does anyone know what the problem could be?

Thanks,
Ayoub.