What I meant was that with ForeachWriter the number of partitions cannot be 
varied per sink, whereas it can if you write to each sink using independent 
queries. Maybe this is obvious.

I am not sure about the performance difference you highlight. The commit 
happens once per micro-batch, when "close(null)" is invoked. You can batch your 
writes in "process" and/or in "close". I guess the writes can still be atomic, 
with the outcome decided by whether "close" returns successfully or throws an 
exception.
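
To illustrate the batching idea, here is a minimal sketch in plain Python of a 
ForeachWriter-shaped object (open/process/close) that buffers rows per sink and 
only writes them in close() when no error was reported. It is an assumption-laden 
sketch, not a tested Spark integration: the "sink" marker field and the 
per-sink callables are hypothetical, and real sink connections would normally 
be created in open().

```python
class RoutingWriter:
    """ForeachWriter-shaped object: buffers rows per sink, flushes in close()."""

    def __init__(self, sinks):
        # sinks: hypothetical mapping of marker value -> callable taking a list of rows
        self.sinks = sinks
        self.buffers = {}

    def open(self, partition_id, epoch_id):
        # Start every partition/epoch with empty buffers.
        # Returning True means "go ahead and process this partition".
        self.buffers = {name: [] for name in self.sinks}
        return True

    def process(self, row):
        # Route on a marker field added during transformation (assumed name: "sink").
        self.buffers[row["sink"]].append(row)

    def close(self, error):
        # error is None when the partition was processed without an exception.
        if error is not None:
            self.buffers = {}  # discard everything buffered for this partition
            return
        for name, rows in self.buffers.items():
            if rows:
                self.sinks[name](rows)  # one batched write per sink
```

Note that this only makes the write all-or-nothing per sink if the sink's 
batched write is itself atomic. If the write to a second sink fails after the 
first one succeeded, the first write has already happened.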

Thanks,
Arun

From:  chandan prakash <chandanbaran...@gmail.com>
Date:  Thursday, July 12, 2018 at 10:37 AM
To:  Arun Iyer <ar...@apache.org>
Cc:  Tathagata Das <tathagata.das1...@gmail.com>, "ymaha...@snappydata.io" 
<ymaha...@snappydata.io>, "priy...@asperasoft.com" <priy...@asperasoft.com>, 
"user @spark" <user@spark.apache.org>
Subject:  Re: [Structured Streaming] Avoiding multiple streaming queries

Thanks a lot, Arun, for your response.
I got your point that existing sink plugins like Kafka, etc. cannot be used.
However, I could not follow the part: "you cannot scale the partitions for the 
sinks independently".
Can you please rephrase that part?

Also, I guess that using ForeachWriter for multiple sinks will affect 
performance, because writes will happen to a sink on a per-record basis (after 
deciding which sink a record belongs to), whereas in the current implementation 
all data under an RDD partition gets committed to the sink atomically in one 
go. Please correct me if I am wrong here.



Regards,
Chandan

On Thu, Jul 12, 2018 at 10:53 PM Arun Mahadevan <ar...@apache.org> wrote:
Yes, ForeachWriter [1] could be an option if you want to write to different 
sinks. You can put in your custom logic to split the data into different sinks.

The drawback here is that you cannot plug in existing sinks like Kafka, you 
need to write the custom logic yourself, and you cannot scale the partitions 
for the sinks independently.

[1] 
https://spark.apache.org/docs/2.1.2/api/java/org/apache/spark/sql/ForeachWriter.html

From: chandan prakash <chandanbaran...@gmail.com>
Date: Thursday, July 12, 2018 at 2:38 AM
To: Tathagata Das <tathagata.das1...@gmail.com>, "ymaha...@snappydata.io" 
<ymaha...@snappydata.io>, "priy...@asperasoft.com" <priy...@asperasoft.com>, 
"user @spark" <user@spark.apache.org>
Subject: Re: [Structured Streaming] Avoiding multiple streaming queries

Hi,
Has any of you thought about writing a custom foreach sink writer which can 
decide which record should go to which sink (based on some marker in the 
record, which we can possibly annotate during transformation) and then write 
to the specific sink accordingly?
This will mean that:
1. every custom sink writer will have connections to as many sinks as there 
are types of sink where records can go.
2. every record will be read once in the single query but can be written to 
multiple sinks.

Do you guys see any drawback in this approach?
One drawback, of course, is that a sink is supposed to write the records as 
they are, but we are introducing some intelligence into the sink here.
Apart from that, do you see any other issues with this approach?

Regards,
Chandan


On Thu, Feb 15, 2018 at 7:41 AM Tathagata Das <tathagata.das1...@gmail.com> 
wrote:
Of course, you can write to multiple Kafka topics from a single query. If the 
DataFrame that you want to write has a column named "topic" (along with "key" 
and "value" columns), it will write the contents of a row to the topic named in 
that row. This works automatically. So the only thing you need to figure out is 
how to generate the value of that column.

This is documented - 
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka
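
To make the row shape concrete, here is a minimal plain-Python sketch (no 
Spark involved) that fans one record out into one row per destination topic, 
each with its own key. The topic names, field names and the to_kafka_rows 
helper are made up for illustration; in a real query the same projection would 
have to be expressed as DataFrame columns before writing to the Kafka sink.

```python
import json


def to_kafka_rows(record, routes):
    """Fan one record out into one Kafka-sink-shaped row per destination topic.

    routes: hypothetical mapping of topic name -> name of the field used as key.
    """
    # The same serialized payload is sent to every topic; only the key differs.
    value = json.dumps(record, sort_keys=True)
    return [
        {"topic": topic, "key": str(record[key_field]), "value": value}
        for topic, key_field in routes.items()
    ]
```

For example, routes of {"by_user": "user", "by_region": "region"} would yield 
two rows per record, one keyed on the "user" field and one keyed on "region".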

Or am I misunderstanding the problem?

TD




On Tue, Feb 13, 2018 at 10:45 AM, Yogesh Mahajan <ymaha...@snappydata.io> wrote:
I had a similar issue and I think that’s where the Structured Streaming design 
falls short.
It seems like Question #2 in your email is a viable workaround for you.

In my case, I have a custom Sink backed by an efficient in-memory column store 
suited for fast ingestion. 

I have a Kafka stream coming from one topic, and I need to classify the stream 
based on schema. 
For example, a Kafka topic can have messages with three different schemas, 
and I would like to ingest them into three different column tables (each with a 
different schema) using my custom Sink implementation.

Right now the only(?) option I have is to create three streaming queries 
reading the same topic and ingesting into the respective column tables using 
their Sink implementations. 
These three streaming queries create three underlying IncrementalExecutions and 
three KafkaSources, so three queries read the same data from the same Kafka 
topic. 
Even with CachedKafkaConsumers at the partition level, this is not an efficient 
way to handle a simple streaming use case.

One workaround to overcome this limitation is to have the same schema for all 
the messages in a Kafka partition. Unfortunately, this is not in our control, 
and customers cannot change it due to their dependencies on other subsystems.

Thanks,
http://www.snappydata.io/blog

On Mon, Feb 12, 2018 at 5:54 PM, Priyank Shrivastava <priy...@asperasoft.com> 
wrote:
I have a structured streaming query which sinks to Kafka.  This query has 
complex aggregation logic.



I would like to sink the output DF of this query to multiple Kafka topics each 
partitioned on a different ‘key’ column.  I don’t want to have multiple Kafka 
sinks for each of the different Kafka topics because that would mean running 
multiple streaming queries - one for each Kafka topic, especially since my 
aggregation logic is complex.



Questions:

1.  Is there a way to output the results of a structured streaming query to 
multiple Kafka topics each with a different key column but without having to 
execute multiple streaming queries? 



2.  If not,  would it be efficient to cascade the multiple queries such that 
the first query does the complex aggregation and writes output to Kafka and 
then the other queries just read the output of the first query and write their 
topics to Kafka thus avoiding doing the complex aggregation again?



Thanks in advance for any help.



Priyank







-- 
Chandan Prakash




