Is there a Kafka sink for Spark Structured Streaming

2017-05-19 Thread kanth909
Is there a Kafka sink for Spark Structured Streaming ?

Sent from my iPhone

Re: Is there a Kafka sink for Spark Structured Streaming

2017-05-19 Thread kant kodali
Thanks!

On Fri, May 19, 2017 at 4:50 PM, Tathagata Das 
wrote:

> Should release by the end of this month.
>
> On Fri, May 19, 2017 at 4:07 PM, kant kodali  wrote:
>
>> Hi Patrick,
>>
>> I am using 2.1.1 and I tried the above code you sent and I get
>>
>> "java.lang.UnsupportedOperationException: Data source kafka does not
>> support streamed writing"
>>
>> so yeah this probably works only from Spark 2.2 onwards. I am not sure
>> when it officially releases.
>>
>> Thanks!
>>
>> On Fri, May 19, 2017 at 8:39 AM,  wrote:
>>
>>> Hi!
>>>
>>> Is this possible in Spark 2.1.1?
>>>
>>> Sent from my iPhone
>>>
>>> On May 19, 2017, at 5:55 AM, Patrick McGloin 
>>> wrote:
>>>
>>> # Write key-value data from a DataFrame to a Kafka topic specified in an option
>>> query = df \
>>>   .selectExpr("CAST(userId AS STRING) AS key", "to_json(struct(*)) AS value") \
>>>   .writeStream \
>>>   .format("kafka") \
>>>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
>>>   .option("topic", "topic1") \
>>>   .option("checkpointLocation", "/path/to/HDFS/dir") \
>>>   .start()
>>>
>>> Described here:
>>>
>>> https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
>>>
>>>
>>>
>>> On 19 May 2017 at 10:45,  wrote:
>>>
 Is there a Kafka sink for Spark Structured Streaming ?

 Sent from my iPhone

>>>
>>>
>>
>


GraphFrames 0.5.0 - critical bug fix + other improvements

2017-05-19 Thread Joseph Bradley
Hi Spark community,

I'd like to announce a new release of GraphFrames, a Spark Package for
DataFrame-based graphs!

*We strongly encourage all users to use this latest release for the bug fix
described below.*

*Critical bug fix*
This release fixes a bug in indexing vertices. This may have affected your
results if:
* your graph uses non-Integer IDs, and
* you use ConnectedComponents or other algorithms that are wrappers
around GraphX.
The bug occurs when the input DataFrame is non-deterministic. E.g., running
an algorithm on a DataFrame just loaded from disk should be fine in
previous releases, but running that algorithm on a DataFrame produced using
shuffling, unions, and other operators can cause incorrect results. This
issue is fixed in this release.

*New features*
* Python API for aggregateMessages for building custom graph algorithms
* Scala API for parallel personalized PageRank, wrapping the GraphX
implementation. This is only available when using GraphFrames with Spark
2.1+.

Support for Spark 1.6, 2.0, and 2.1
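
For readers wondering what the new Python aggregateMessages API looks like in
practice, here is a minimal sketch. It is not taken from the release notes; it
assumes GraphFrames 0.5.0 is on the classpath (e.g. via --packages
graphframes:graphframes:0.5.0-spark2.1-s_2.11), and the toy vertex/edge data
and column names are made up for illustration.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from graphframes import GraphFrame
from graphframes.lib import AggregateMessages as AM

spark = SparkSession.builder.appName("aggregate-messages-demo").getOrCreate()

# Toy graph: three people connected in a cycle.
vertices = spark.createDataFrame([("a", 34), ("b", 36), ("c", 30)], ["id", "age"])
edges = spark.createDataFrame([("a", "b"), ("b", "c"), ("c", "a")], ["src", "dst"])
g = GraphFrame(vertices, edges)

# For each vertex, sum the ages of its neighbours: every edge sends the
# destination's age to its source and the source's age to its destination,
# and the incoming messages are aggregated per vertex.
neighbour_ages = g.aggregateMessages(
    F.sum(AM.msg).alias("summedNeighbourAges"),
    sendToSrc=AM.dst["age"],
    sendToDst=AM.src["age"])
neighbour_ages.show()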

*Special thanks to Felix Cheung for his work as a new committer for
GraphFrames!*

*Full release notes*:
https://github.com/graphframes/graphframes/releases/tag/release-0.5.0
*Docs*: http://graphframes.github.io/
*Spark Package*: https://spark-packages.org/package/graphframes/graphframes
*Source*: https://github.com/graphframes/graphframes

Thanks to all contributors and to the community for feedback!
Joseph

-- 

Joseph Bradley

Software Engineer - Machine Learning

Databricks, Inc.



Spark Streaming: Custom Receiver OOM consistently

2017-05-19 Thread Manish Malhotra
Hello,

I have implemented a Java-based custom receiver, which consumes from a
messaging system, say JMS.
Once a message is received, I call store(object) ... I'm storing a Spark Row object.

It runs for around 8 hrs and then goes OOM, and the OOM is happening on the
receiver nodes.
I also tried running multiple receivers to distribute the load, but I face
the same issue.

We are probably doing something fundamentally wrong in telling the custom
receiver/Spark to release memory,
but I'm not able to crack that, at least till now.

any help is appreciated !!

Regards,
Manish


SparkSQL not able to read a empty table location

2017-05-19 Thread Bajpai, Amit X. -ND
Hi,

I have a Hive external table whose S3 location has no files (but the S3
location directory does exist). When I try to use Spark SQL to count the
number of records in the table, it throws an error saying "File s3n://data/xyz
does not exist. null/0".

select * from tablex limit 10

Can someone let me know how we can fix this issue?

Thanks
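
One hedged workaround, pending a proper fix, is to check whether the location
actually contains files before issuing the query. The sketch below uses the
Hadoop FileSystem API through PySpark's internal _jsc/_jvm accessors (not
public API); the path and table name are simply the ones from the question,
and a SparkSession named spark with S3 credentials configured is assumed.

location = "s3n://data/xyz"

# Hadoop FileSystem handle for the table's location (via internal accessors).
hadoop_conf = spark._jsc.hadoopConfiguration()
path = spark._jvm.org.apache.hadoop.fs.Path(location)
fs = path.getFileSystem(hadoop_conf)

# Only run the query if the location exists and actually contains files.
if fs.exists(path) and len(fs.listStatus(path)) > 0:
    spark.sql("select * from tablex limit 10").show()
else:
    print("Table location %s is empty; skipping the query" % location)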


Re: [Spark Streamiing] Streaming job failing consistently after 1h

2017-05-19 Thread Manish Malhotra
I'm also facing the same problem.

I have implemented a Java-based custom receiver, which consumes from a
messaging system, say JMS.
Once a message is received, I call store(object) ... I'm storing a Spark Row object.

It runs for around 8 hrs and then goes OOM, and the OOM is happening on the
receiver nodes.
I also tried running multiple receivers to distribute the load, but I face
the same issue.

We are probably doing something fundamentally wrong in telling the custom
receiver/Spark to release memory,
but I'm not able to crack that, at least till now.

Any help is appreciated, Spark group!!

Regards,
Manish



On Sun, Mar 5, 2017 at 6:37 PM, Charles O. Bajomo <
charles.baj...@pretechconsulting.co.uk> wrote:

> Hello all,
>
> I have a strange behaviour I can't understand. I have a streaming job
> using a custom java receiver that pull data from a jms queue that I process
> and then write to HDFS as parquet and avro files. For some reason my job
> keeps failing after 1hr and 30 minutes. When It fails I get an error saying
> the "container is running beyond physical memory limits. Current Usage
> 4.5GB of 4.5GB physical memory used. 6.4GB of 9.4GB virtual memory used. ".
> To be honest I don't understand the error. What are the memory limits
> shown in the error referring to? I allocated 10 executors with 6 cores each
> and 4G of executor and driver memory. I set the overhead memory to 2.8G, so
> the values don't add up.
>
> Does anyone have any idea what the error is referring to? I have increased
> the memory and it didn't help; it appears it just bought me more time.
>
> Thanks.
>


Re: Is there a Kafka sink for Spark Structured Streaming

2017-05-19 Thread Patrick McGloin
# Write key-value data from a DataFrame to a Kafka topic specified in an option
query = df \
  .selectExpr("CAST(userId AS STRING) AS key", "to_json(struct(*)) AS value") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .option("checkpointLocation", "/path/to/HDFS/dir") \
  .start()

Described here:

https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html



On 19 May 2017 at 10:45,  wrote:

> Is there a Kafka sink for Spark Structured Streaming ?
>
> Sent from my iPhone
>


java.lang.OutOfMemoryError

2017-05-19 Thread Kürşat Kurt
Hi;



I am trying multiclass text classification with the RandomForest classifier on
my local computer (16 GB RAM, 4 physical cores).

When I run with the parameters below, I get a
"java.lang.OutOfMemoryError: GC overhead limit exceeded" error.

 

spark-submit --driver-memory 1G --driver-memory 2G --executor-memory 1G
--total-executor-cores 8 project-assembly-1.0.

 

My training data has 400,000 lines and the feature (unique word) count is ~50,000.
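
For what it's worth, a few levers that often matter for this kind of OOM on a
single machine, shown as a hedged PySpark sketch rather than a diagnosis: cap
the feature space and bound the per-node aggregation memory. The column names
and parameter values below are assumptions, not values tuned to this dataset.

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import HashingTF, Tokenizer

tokenizer = Tokenizer(inputCol="text", outputCol="words")
# Cap the feature space (~32k hashed features instead of ~50k raw terms).
tf = HashingTF(inputCol="words", outputCol="features", numFeatures=1 << 15)
rf = RandomForestClassifier(labelCol="label", featuresCol="features",
                            numTrees=50, maxDepth=10, maxBins=32,
                            maxMemoryInMB=512)  # per-node aggregation budget
pipeline = Pipeline(stages=[tokenizer, tf, rf])
# model = pipeline.fit(trainDF)  # trainDF assumed to have "text" and "label" columns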

 



Re: Is there a Kafka sink for Spark Structured Streaming

2017-05-19 Thread kanth909
Hi!

Is this possible in Spark 2.1.1?

Sent from my iPhone

> On May 19, 2017, at 5:55 AM, Patrick McGloin  
> wrote:
> 
> # Write key-value data from a DataFrame to a Kafka topic specified in an option
> query = df \
>   .selectExpr("CAST(userId AS STRING) AS key", "to_json(struct(*)) AS value") \
>   .writeStream \
>   .format("kafka") \
>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
>   .option("topic", "topic1") \
>   .option("checkpointLocation", "/path/to/HDFS/dir") \
>   .start()
> Described here:
> https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
> 
> 
>> On 19 May 2017 at 10:45,  wrote:
>> Is there a Kafka sink for Spark Structured Streaming ?
>> 
>> Sent from my iPhone
> 


Reading PDF/text/word file efficiently with Spark

2017-05-19 Thread tesmai4
Hi,

I am doing NLP (Natural Language Processing) processing on my data. The data
is in the form of files that can be of type PDF/Text/Word/HTML. These files
are stored in a directory structure on my local disk, even nested directories.
My standalone Java-based NLP parser can read input files, extract text from
them, and do the NLP processing on the extracted text.

I am converting my Java-based NLP parser to execute on my Spark cluster. I
know that Spark can read multiple text files from a directory and convert them
into RDDs for further processing. My input data is not only in text files, but
in a multitude of different file formats. My question is: how can I
efficiently read the input files (PDF/Text/Word/HTML) in my Java-based Spark
program for processing these files in a Spark cluster?

Regards,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Reading-PDF-text-word-file-efficiently-with-Spark-tp28699.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Reading PDF/text/word file efficiently with Spark

2017-05-19 Thread tesm...@gmail.com
Hi,
I am doing NLP (Natural Language Processing) processing on my data. The data
is in the form of files that can be of type PDF/Text/Word/HTML. These files
are stored in a directory structure on my local disk, even nested directories.
My standalone Java-based NLP parser can read input files, extract text from
them, and do the NLP processing on the extracted text.

I am converting my Java-based NLP parser to execute on my Spark cluster. I
know that Spark can read multiple text files from a directory and convert them
into RDDs for further processing. My input data is not only in text files, but
in a multitude of different file formats. My question is: how can I
efficiently read the input files (PDF/Text/Word/HTML) in my Java-based Spark
program for processing these files in a Spark cluster?

Regards,
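
One common approach, sketched below under stated assumptions rather than as
the definitive answer: read every file as raw bytes with
SparkContext.binaryFiles (which accepts glob patterns, so nested directories
can be matched) and run the existing text-extraction logic per file.
extract_text() is a hypothetical stand-in for whatever parser (Apache Tika,
the existing Java parser, etc.) is already in use, and the glob pattern is an
assumption about the directory layout.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("doc-nlp").getOrCreate()
sc = spark.sparkContext

def extract_text(file_name, raw_bytes):
    # Placeholder: dispatch on the file extension here and call your
    # PDF/Word/HTML/text parser; the naive decode is just for illustration.
    return raw_bytes.decode("utf-8", errors="ignore")

# binaryFiles yields (path, raw bytes) pairs, one element per file.
docs = sc.binaryFiles("/data/documents/*/*")
texts = docs.map(lambda kv: (kv[0], extract_text(kv[0], kv[1])))
texts.take(1)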


Re: Refreshing a persisted RDD

2017-05-19 Thread Sudhir Menon
Part of the problem here is that the static dataframe is designed to be used
as a read-only abstraction in Spark, and updating it requires the user to drop
the dataframe holding the reference data and recreate it. In order for the
join to use the recreated dataframe, the query has to be restarted, which
causes the stream processing to glitch. Jayesh's use case is something we see
frequently when working with real customers who love Spark for a lot of
reasons but would like the ease of use that comes from being able to work with
mutable abstractions.

If you were to do this with SnappyData, since the product supports mutability
of tables (and hence dataframe mutability) out of the box, you would simply be
able to update the information in your "static" data frame. The code snippet
below lets you mutate the blacklist table (which is the only thing the user
needs to do) and keep going, and the join then automatically uses the updated
dataframe without any further action from the user.

-

  val spark: SparkSession = SparkSession
    .builder
    .appName("Demo")
    .master("local[*]")
    .getOrCreate

  val session = new SnappySession(spark.sparkContext)
  val snc = session.snappyContext

  case class Account(accountName: String) // For simplicity, account just has one column, accountName

  // Black list includes account numbers ranging from 15 to 25
  val rdd = snc.sparkContext.parallelize((15 to 25).map(i => Account(i.toString)))
  val dfBlackList = snc.createDataFrame(rdd) // This can be any DF, e.g. created from S3 using session.read.csv("pathToCSV")

  // create a SnappyData table
  snc.createTable("blacklist", "row", dfBlackList.schema, Map.empty[String, String])

  import org.apache.spark.sql.snappy._
  dfBlackList.write.putInto("blacklist") // populate the table 'blacklist'

  // This is an updatable table, so you can update/insert like a normal RDBMS table, for example:
  // val newColumnValues: Row = Row(26)
  // snc.update("blacklist", newColumnValues, "accountName")

  import spark.implicits._

  // Read the accounts from the Kafka source
  val dfAccount = session
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", topic)
    .option("startingOffsets", "earliest").load
    .selectExpr("CAST(value AS STRING) accountName").as[(String)]

  // Simply join your streaming DataFrame with the blacklist DataFrame.
  val query = dfAccount.join(dfBlackList, "accountName")
    .writeStream
    .outputMode("append")
    .format("snappy") // stored in in-memory SQL tables
    .option("checkpointLocation", "/tmp")
    .queryName("snappyResultTable") // You can save it to SnappyData's column table
    .trigger(ProcessingTime("1 seconds"))
    .start

  query.awaitTermination(timeoutMs = 15000)

  session.sql("select * from snappyResultTable").show

You can get the product release that works with this code snippet here


You can reach out to us with questions here


I generally avoid responding on this forum with product-specific answers, but
in this case it seems to offer a simpler pattern.


Suds

On Wed, May 3, 2017 at 4:30 PM, Tathagata Das  wrote:

> Yes, you will have to recreate the streaming Dataframe along with the
> static Dataframe, and restart the query. There isn't currently a feasible way
> to do this without a query restart. But restarting a query WITHOUT restarting
> the whole application + Spark cluster is reasonably fast. If your
> application can tolerate 10-second latencies, then stopping and restarting
> a query within the same Spark application is a reasonable solution.
>
> On Wed, May 3, 2017 at 4:13 PM, Lalwani, Jayesh <
> jayesh.lalw...@capitalone.com> wrote:
>
>> Thanks, TD for answering this question on the Spark mailing list.
>>
>>
>>
>> A follow-up. So, let’s say we are joining a cached dataframe with a
>> streaming dataframe, and we recreate the cached dataframe, do we have to
>> recreate the streaming dataframe too?
>>
>>
>>
>> One possible solution that we have is
>>
>>
>>
>> val dfBlackList = spark.read.csv(….) // batch dataframe… assume that this
>> dataframe has a single column named accountName
>> dfBlackList.createOrReplaceTempView("blacklist")
>> val dfAccount = spark.readStream.kafka(…..) // assume for brevity's sake
>> that we have parsed the kafka payload and have a data frame here with
>> multiple columns, one of them called accountName
>>
>> dfAccount.createOrReplaceTempView("account")
>>
>> val dfBlackListedAccount = spark.sql("select * from account inner join
>> blacklist on account.accountName = blacklist.accountName")
>>
>> df.writeStream(…..).start() // boom, started
>>
>>
>>
>> Now some time later while the query is running 
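
For reference, a minimal sketch of the restart-based approach described above
on stock Structured Streaming: the broker address, topic, paths, and column
names are placeholders, the blacklist CSV is assumed to have an accountName
header column, and the spark-sql-kafka-0-10 package is assumed to be on the
classpath.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("blacklist-join").getOrCreate()

def start_query():
    # Recreate the static side from its source so it picks up the latest data.
    blacklist = spark.read.csv("/path/to/blacklist", header=True)
    accounts = (spark.readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", "host1:port1")
                .option("subscribe", "accounts")
                .load()
                .selectExpr("CAST(value AS STRING) AS accountName"))
    joined = accounts.join(blacklist, "accountName")  # stream-static inner join
    return (joined.writeStream
            .format("parquet")
            .option("path", "/path/to/output")
            .option("checkpointLocation", "/path/to/checkpoint")
            .start())

query = start_query()
# ... later, after the reference data has changed:
query.stop()           # brief pause in processing
query = start_query()  # resumes from the checkpoint with the refreshed static side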

Bizarre UI Behavior after migration

2017-05-19 Thread Miles Crawford
Hey y'all,

Trying to migrate from Spark 1.6.1 to 2.1.0.

I use EMR, and launched a new cluster using EMR 5.5, which runs spark 2.1.0.

I updated my dependencies, and fixed a few API changes related to
accumulators, and presto! my application was running on the new cluster.

But the application UI shows crazy output:
https://www.dropbox.com/s/egtj1056qeudswj/sparkwut.png?dl=0

The applications seem to complete successfully, but I was wondering if
anyone has an idea of what might be going wrong?

Thanks,
-Miles


Re: Spark UI shows Jobs are processing, but the files are already written to S3

2017-05-19 Thread Miles Crawford
Could I be experiencing the same thing?

https://www.dropbox.com/s/egtj1056qeudswj/sparkwut.png?dl=0

On Wed, Nov 16, 2016 at 10:37 AM, Shreya Agarwal 
wrote:

> I think that is a bug. I have seen that a lot especially with long running
> jobs where Spark skips a lot of stages because it has pre-computed results.
> And some of these are never marked as completed, even though in reality
> they are. I figured this out because I was using the interactive shell
> (spark-shell) and the shell came up to a prompt indicating the job had
> finished even though there were a lot of Active jobs and tasks according to
> the UI. And my output is correct.
>
>
>
> Is there a JIRA item tracking this?
>
>
>
> *From:* Kuchekar [mailto:kuchekar.nil...@gmail.com]
> *Sent:* Wednesday, November 16, 2016 10:00 AM
> *To:* spark users 
> *Subject:* Spark UI shows Jobs are processing, but the files are already
> written to S3
>
>
>
> Hi,
>
>
>
> I am running a Spark job which saves the computed data (massive
> data) to S3. On the Spark UI I see that some jobs are active, but there is no
> activity in the logs. Also, on S3 all the data has been written (I verified each
> bucket --> it has a _SUCCESS file).
>
>
>
> Am I missing something?
>
>
>
> Thanks.
>
> Kuchekar, Nilesh
>


Shuffle read is very slow in spark

2017-05-19 Thread KhajaAsmath Mohammed
Hi ,

I am in a weird situation where the Spark job behaves abnormally: sometimes it
runs very fast and sometimes it takes a long time to complete. Not sure what
is going on; the cluster is free most of the time.

The image below shows that the shuffle read is still taking more than 3 hours
to write data back into the Hive table using
sqlContext.sql("INSERT OVERWRITE TABLE hivetable SELECT * FROM spark_temporary_table").

[image: Inline image 1]

Could anyone let me know how to resolve this and make the job run faster?

Thanks,
Asmath
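
Without the UI screenshot it is hard to say more, but one commonly checked
lever, offered as an assumption rather than a diagnosis, is the shuffle
parallelism of the insert. A minimal sketch (assumes a SparkSession named
spark with Hive support; table names are the ones from the question):

# Default is 200 shuffle partitions; size this to the data volume being inserted.
spark.conf.set("spark.sql.shuffle.partitions", "400")

spark.sql("""
  INSERT OVERWRITE TABLE hivetable
  SELECT * FROM spark_temporary_table
""")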


Re: Spark-SQL collect function

2017-05-19 Thread Aakash Basu
Well described​, thanks!

On 04-May-2017 4:07 AM, "JayeshLalwani" 
wrote:

> In any distributed application, you scale up by splitting execution up on
> multiple machines. The way Spark does this is by slicing the data into
> partitions and spreading them on multiple machines. Logically, an RDD is
> exactly that: data is split up and spread around on multiple machines. When
> you perform operations on an RDD, Spark tells all the machines to perform
> that operation on their own slice of data. So, for example, if you perform a
> filter operation (or, if you are using SQL, you do /Select * from tablename
> where col=colval/), Spark tells each machine to look for rows that match your
> filter criteria in their own slice of data. This operation results in
> another distributed dataset that contains the filtered records. Note that
> when you do a filter operation, Spark doesn't move data outside of the
> machines that they reside in. It keeps the filtered records in the same
> machine. This ability of Spark to keep data in place is what provides
> scalability. As long as your operations keep data in place, you can scale
> up
> infinitely. If you got 10x more records, you can add 10x more machines, and
> you will get the same performance
>
> However, the problem is that a lot of operations cannot be done by keeping
> data in place. For example, let's say you have 2 tables/dataframes. Spark
> will slice both up and spread them around the machines. Now let's say, you
> joined both tables. It may happen that the slice of data that resides in
> one
> machine has matching records in another machine. So, now, Spark has to
> bring
> data over from one machine to another. This is what Spark calls a
> /shuffle/Spark does this intelligently. However, whenever data leaves one
> machine and goes to other machines, you cannot scale infinitely. There will
> be a point at which you will overwhelm the network, and adding more
> machines
> isn't going to improve performance.
>
> So, the point is that you have to avoid shuffles as much as possible. You
> cannot eliminate shuffles altogether, but you can reduce them
>
> Now, /collect/ is the granddaddy of all shuffles. It causes Spark to bring
> all the data that it has distributed over the machines into a single
> machine. If you call collect on a large table, it's analogous to drinking
> from a firehose. You are going to drown. Calling collect on a small table is
> fine, because very little data will move
>
> Usually, it's recommended to run all your aggregations using Spark SQL, and
> when you get the data boiled down to a small enough size that can be
> presented to a human, you can call collect on it to fetch it and present it
> to the human user.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Spark-SQL-collect-function-tp28644p28647.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
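
A small PySpark illustration of the points above (toy data; a SparkSession
named spark is assumed): the filter stays distributed, the join forces a
shuffle, and only a small aggregated result is collected to the driver.

df = spark.range(0, 1000000)

filtered = df.filter(df["id"] % 100 == 0)    # narrow: no data leaves its machine
joined = filtered.join(df, "id")             # wide: requires a shuffle
summary = joined.groupBy((joined["id"] % 10).alias("bucket")).count()

small_result = summary.collect()  # safe: at most 10 rows come back to the driver
# df.collect()  # avoid: pulls the entire dataset onto a single driver JVM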


Re: Documentation on "Automatic file coalescing for native data sources"?

2017-05-19 Thread Aakash Basu
Hey all,

A reply on this would be great!

Thanks,
A.B.

On 17-May-2017 1:43 AM, "Daniel Siegmann" 
wrote:

> When using spark.read on a large number of small files, these are
> automatically coalesced into fewer partitions. The only documentation I can
> find on this is in the Spark 2.0.0 release notes, where it simply says (
> http://spark.apache.org/releases/spark-release-2-0-0.html):
>
> "Automatic file coalescing for native data sources"
>
> Can anyone point me to documentation explaining what triggers this
> feature, how it decides how many partitions to coalesce to, and what counts
> as a "native data source"? I couldn't find any mention of this feature in
> the SQL Programming Guide and Google was not helpful.
>
> --
> Daniel Siegmann
> Senior Software Engineer
> *SecurityScorecard Inc.*
> 214 W 29th Street, 5th Floor
> New York, NY 10001
>
>


RE: IOT in Spark

2017-05-19 Thread Lohith Samaga M
Hi Gaurav,
You can process IoT data using Spark. But where will you store the
raw/processed data - Cassandra, Hive, HBase?
You might want to look at a Hadoop cluster for data storage and
processing (Spark on YARN).
For processing streaming data, you might also explore Apache Storm and
Apache Flink.
I suggest doing a POC with each of them and then deciding on
what works best for you.

Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga
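
To make the Kafka-plus-Spark option concrete, here is a minimal Structured
Streaming sketch, not a recommendation of a particular stack: sensor events
arrive in a Kafka topic as JSON and are parsed and written out as Parquet. The
broker address, topic, schema, and paths are placeholders, and the
spark-sql-kafka-0-10 package is assumed to be available.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import DoubleType, LongType, StringType, StructField, StructType

spark = SparkSession.builder.appName("iot-ingest").getOrCreate()

# Assumed shape of a sensor event; adjust to the real payload.
schema = StructType([
    StructField("sensorId", StringType()),
    StructField("timestamp", LongType()),
    StructField("reading", DoubleType()),
])

events = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "host1:9092")
          .option("subscribe", "gate-sensors")
          .load()
          .select(from_json(col("value").cast("string"), schema).alias("e"))
          .select("e.*"))

query = (events.writeStream
         .format("parquet")
         .option("path", "/data/iot/raw")
         .option("checkpointLocation", "/data/iot/_checkpoints")
         .start())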





-Original Message-
From: Gaurav1809 [mailto:gauravhpan...@gmail.com] 
Sent: Friday, May 19, 2017 09.28
To: user@spark.apache.org
Subject: IOT in Spark

Hello gurus,

How exactly does it work in real-world scenarios when it comes to reading data from IoT
devices (say, for example, sensors at the in/out gates of a huge mall)? Can we do it in
Spark? Do we need to use any other tool/utility (Kafka???) to read data from
those sensors and then process it in Spark? Please share your thoughts on
this; it will give me a head start on my work. I am completely unaware of the
technology stack that can be used here, so any pointers will be very
helpful. Thanks.

-Gaurav



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/IOT-in-Spark-tp28698.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Is there a Kafka sink for Spark Structured Streaming

2017-05-19 Thread kant kodali
Hi Patrick,

I am using 2.1.1 and I tried the above code you sent and I get

"java.lang.UnsupportedOperationException: Data source kafka does not
support streamed writing"

so yeah this probably works only from Spark 2.2 onwards. I am not sure when
it officially releases.

Thanks!

On Fri, May 19, 2017 at 8:39 AM,  wrote:

> Hi!
>
> Is this possible in Spark 2.1.1?
>
> Sent from my iPhone
>
> On May 19, 2017, at 5:55 AM, Patrick McGloin 
> wrote:
>
> # Write key-value data from a DataFrame to a Kafka topic specified in an option
> query = df \
>   .selectExpr("CAST(userId AS STRING) AS key", "to_json(struct(*)) AS value") \
>   .writeStream \
>   .format("kafka") \
>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
>   .option("topic", "topic1") \
>   .option("checkpointLocation", "/path/to/HDFS/dir") \
>   .start()
>
> Described here:
>
> https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
>
>
>
> On 19 May 2017 at 10:45,  wrote:
>
>> Is there a Kafka sink for Spark Structured Streaming ?
>>
>> Sent from my iPhone
>>
>
>


Re: Documentation on "Automatic file coalescing for native data sources"?

2017-05-19 Thread ayan guha
I think, like all other read operations, it is driven by the input format used,
and I think some variation of a combine file input format is used by default.
I think you can test it by forcing a particular input format which gets one
file per split; then you should end up with the same number of partitions as
your data files.

On Sat, 20 May 2017 at 5:12 am, Aakash Basu 
wrote:

> Hey all,
>
> A reply on this would be great!
>
> Thanks,
> A.B.
>
> On 17-May-2017 1:43 AM, "Daniel Siegmann" 
> wrote:
>
>> When using spark.read on a large number of small files, these are
>> automatically coalesced into fewer partitions. The only documentation I can
>> find on this is in the Spark 2.0.0 release notes, where it simply says (
>> http://spark.apache.org/releases/spark-release-2-0-0.html):
>>
>> "Automatic file coalescing for native data sources"
>>
>> Can anyone point me to documentation explaining what triggers this
>> feature, how it decides how many partitions to coalesce to, and what counts
>> as a "native data source"? I couldn't find any mention of this feature in
>> the SQL Programming Guide and Google was not helpful.
>>
>> --
>> Daniel Siegmann
>> Senior Software Engineer
>> *SecurityScorecard Inc.*
>> 214 W 29th Street, 5th Floor
>> New York, NY 10001
>>
>> --
Best Regards,
Ayan Guha
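
For anyone who wants to experiment, the knobs generally believed to drive this
packing for FileFormat-based ("native") sources in Spark 2.x are
spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes; treat
the sketch below as an assumption to verify against your Spark version's
configuration docs rather than authoritative documentation. The path is a
placeholder.

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("small-files-demo")
         # Target number of bytes packed into a single partition when reading files.
         .config("spark.sql.files.maxPartitionBytes", 128 * 1024 * 1024)
         # Estimated cost of opening a file, added per file while packing;
         # raising it tends to produce fewer, larger partitions.
         .config("spark.sql.files.openCostInBytes", 4 * 1024 * 1024)
         .getOrCreate())

df = spark.read.json("/path/to/many/small/files")
print(df.rdd.getNumPartitions())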


Re: Is there a Kafka sink for Spark Structured Streaming

2017-05-19 Thread Tathagata Das
Should release by the end of this month.

On Fri, May 19, 2017 at 4:07 PM, kant kodali  wrote:

> Hi Patrick,
>
> I am using 2.1.1 and I tried the above code you sent and I get
>
> "java.lang.UnsupportedOperationException: Data source kafka does not
> support streamed writing"
>
> so yeah this probably works only from Spark 2.2 onwards. I am not sure
> when it officially releases.
>
> Thanks!
>
> On Fri, May 19, 2017 at 8:39 AM,  wrote:
>
>> Hi!
>>
>> Is this possible in Spark 2.1.1?
>>
>> Sent from my iPhone
>>
>> On May 19, 2017, at 5:55 AM, Patrick McGloin 
>> wrote:
>>
>> # Write key-value data from a DataFrame to a Kafka topic specified in an option
>> query = df \
>>   .selectExpr("CAST(userId AS STRING) AS key", "to_json(struct(*)) AS value") \
>>   .writeStream \
>>   .format("kafka") \
>>   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
>>   .option("topic", "topic1") \
>>   .option("checkpointLocation", "/path/to/HDFS/dir") \
>>   .start()
>>
>> Described here:
>>
>> https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
>>
>>
>>
>> On 19 May 2017 at 10:45,  wrote:
>>
>>> Is there a Kafka sink for Spark Structured Streaming ?
>>>
>>> Sent from my iPhone
>>>
>>
>>
>