Spark creates one connection for each query. The behavior you observed is
because of how "nc -lk" works. If you use `netstat` to check the TCP
connections, you will see that there are two connections when starting two
queries. However, "nc" forwards the input to only one of them.
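
If you need both queries to see the same lines while testing, one option is
to replace "nc" with a small server that writes every input line to all
connected clients. A rough, untested sketch (plain Scala, no Spark
dependency, listening on the same port 9999 used in the example further
down):

import java.io.PrintWriter
import java.net.ServerSocket
import scala.collection.mutable.ArrayBuffer
import scala.io.StdIn

object BroadcastServer {
  def main(args: Array[String]): Unit = {
    val server  = new ServerSocket(9999)
    val clients = ArrayBuffer.empty[PrintWriter]

    // Accept connections in the background; each query's source opens its own.
    val acceptor = new Thread(new Runnable {
      override def run(): Unit = while (true) {
        val out = new PrintWriter(server.accept().getOutputStream, true) // autoflush
        clients.synchronized { clients += out }
      }
    })
    acceptor.setDaemon(true)
    acceptor.start()

    // Forward every stdin line to *all* connected clients, which is exactly
    // what "nc -lk" does not do.
    Iterator.continually(StdIn.readLine()).takeWhile(_ != null).foreach { line =>
      clients.synchronized { clients.foreach(_.println(line)) }
    }
  }
}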

On Fri, Aug 11, 2017 at 10:59 PM, Rick Moritz <rah...@gmail.com> wrote:

> Hi Gerard, hi List,
>
> I think what this would entail is for Source.commit to change its
> functionality. You would need to track all streams' offsets there.
> Especially in the socket source, you already have a cache (I haven't looked
> at Kafka's implementation too closely yet), so that shouldn't be the issue,
> if at start time all streams subscribed to a source are known.
> What I worry about is that this may need an API change, to pass a
> stream ID into commit. Since different streams can use different Triggers,
> you can get any number of unforeseeable results when multiple threads
> call commit.
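>
> Roughly, what I have in mind is the untested sketch below. Offsets are
> simplified to plain Longs, and the streamId parameter on commit is exactly
> the API change I mean; today's commit doesn't receive anything like it.
>
> import scala.collection.mutable
>
> // Data is only dropped once *every* registered stream has committed past it.
> class SharedSocketCache {
>   private val batches   = mutable.Map.empty[Long, Seq[String]] // offset -> lines
>   private val committed = mutable.Map.empty[String, Long]      // streamId -> last committed offset
>
>   def register(streamId: String): Unit = synchronized {
>     committed.getOrElseUpdate(streamId, -1L)
>   }
>
>   def addBatch(offset: Long, lines: Seq[String]): Unit = synchronized {
>     batches(offset) = lines
>   }
>
>   def getBatch(start: Long, end: Long): Seq[String] = synchronized {
>     batches.toSeq.sortBy(_._1)
>       .collect { case (o, lines) if o > start && o <= end => lines }
>       .flatten
>   }
>
>   // The hypothetical extra parameter: without knowing which stream commits,
>   // the source cannot tell when a batch is safe to evict.
>   def commit(streamId: String, offset: Long): Unit = synchronized {
>     committed(streamId) = offset
>     val safe = committed.values.min
>     batches.keys.filter(_ <= safe).toList.foreach(batches.remove)
>   }
> }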
>
> I'll look into that, since I am in the process of building a
> TwitterSource based on the socket source's general functionality, and due
> to the API restrictions there, it's even more important that multiple
> streams can share one source.
>
> What I did observe was that every query initialized a separate source.
> This won't work so well with the socket source, since the socket is already
> in use once you try to set up a second one. It also won't work so well with
> Twitter, since an API key is usually limited in how many simultaneous
> connections it allows (likely 2).
>
> An alternative to the socket source issue would be to open a new free
> socket, but then the user has to figure out where the source is listening.
>
> I second Gerard's request for additional information, and confirmation of
> my theories!
>
> Thanks,
>
> Rick
>
> On Fri, Aug 11, 2017 at 2:53 PM, Gerard Maas <gerard.m...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I've been investigating this SO question:
>> https://stackoverflow.com/questions/45618489/executing-separate-streaming-queries-in-spark-structured-streaming
>>
>> TL;DR: when using the Socket source, trying to create multiple queries
>> does not work properly: only the first query in the start order will
>> receive data.
>>
>> This minimal example reproduces the issue:
>>
>> // lit() needs this import when run outside the spark-shell
>> import org.apache.spark.sql.functions.lit
>>
>> val lines = spark
>>     .readStream
>>     .format("socket")
>>     .option("host", "localhost")
>>     .option("port", "9999")
>>     .option("includeTimestamp", true)
>>     .load()
>>
>> val q1 = lines.writeStream
>>   .outputMode("append")
>>   .format("console")
>>   .start()
>>
>> val q2 = lines.withColumn("foo", lit("foo")).writeStream
>>   .outputMode("append")
>>   .format("console")
>>   .start()
>>
>> Sample output (spark shell):
>>
>> -------------------------------------------
>> Batch: 0
>> -------------------------------------------
>> +-----+-------------------+
>> |value|          timestamp|
>> +-----+-------------------+
>> |  aaa|2017-08-11 23:37:59|
>> +-----+-------------------+
>>
>> -------------------------------------------
>> Batch: 1
>> -------------------------------------------
>> +-----+-------------------+
>> |value|          timestamp|
>> +-----+-------------------+
>> |  aaa|2017-08-11 23:38:00|
>> +-----+-------------------+
>>
>> q1.stop
>>
>> scala> -------------------------------------------
>> Batch: 0
>> -------------------------------------------
>> +-----+-------------------+---+
>> |value|          timestamp|foo|
>> +-----+-------------------+---+
>> |    b|2017-08-11 23:38:19|foo|
>> +-----+-------------------+---+
>>
>> -------------------------------------------
>> Batch: 1
>> -------------------------------------------
>> +-----+-------------------+---+
>> |value|          timestamp|foo|
>> +-----+-------------------+---+
>> |    b|2017-08-11 23:38:19|foo|
>> +-----+-------------------+---+
>>
>> This is certainly unexpected behavior. Even though the socket source is
>> marked "not for production", I wouldn't expect it to be so limited.
>>
>> Am I right to think that the first running query consumes all the data in
>> the source, and therefore all the other queries do not work (until the
>> previous ones are stopped)?
>>
>> Is this a general behavior, i.e. does each query started on a structured
>> streaming job fully consume the source? And can the Kafka source be used
>> with multiple queries because it can be replayed?
>>
>> As a workaround, would there be a way to cache the incoming data to
>> multiplex it? We cannot call `cache` on a streaming dataset, but is there
>> maybe a way to do that?
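>>
>> The only workaround I can come up with is to route the socket data through
>> a replayable store (e.g. a file sink) with a single query, and then run the
>> independent queries off the file source instead. Would that be the intended
>> pattern? An untested sketch (the paths are just placeholders):
>>
>> import org.apache.spark.sql.functions.lit
>>
>> val raw = spark.readStream
>>   .format("socket")
>>   .option("host", "localhost")
>>   .option("port", "9999")
>>   .option("includeTimestamp", true)
>>   .load()
>>
>> // A single query owns the socket and persists everything it receives.
>> val persist = raw.writeStream
>>   .format("json")
>>   .option("path", "/tmp/socket-data")
>>   .option("checkpointLocation", "/tmp/socket-data-checkpoint")
>>   .start()
>>
>> // The file source is replayable, so any number of queries can read from it.
>> val replayed = spark.readStream
>>   .schema(raw.schema)
>>   .json("/tmp/socket-data")
>>
>> val qa = replayed.writeStream
>>   .outputMode("append")
>>   .format("console")
>>   .start()
>>
>> val qb = replayed.withColumn("foo", lit("foo")).writeStream
>>   .outputMode("append")
>>   .format("console")
>>   .start()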
>>
>> Could I have more details on the execution model (I've read everything I
>> could find) and on the (near) future plans?
>>
>> thanks!
>>
>> -Gerard.
>>
>>
>
