Hi Bartosz,

This is not a question about whether the data source supports fixed or
user-defined schema but what schema to use when requested for a streaming
batch in Source.getBatch.

Pozdrawiam,
Jacek Laskowski
----
https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

<https://twitter.com/jaceklaskowski>


On Wed, Mar 31, 2021 at 7:44 PM Bartosz Konieczny <bartkoniec...@gmail.com>
wrote:

> Hi Jacek,
>
> An interesting question! I don't know the exact answer and will be happy
> to learn by the way :) Below you can find my understanding for these 2
> things, hoping it helps a little.
>
> For me, we can distinguish 2 different source categories. The first of
> them is a source with some fixed schema. A good example is Apache Kafka
> which exposes the topic name, key, value and you can't change that; it's
> always the same, whenever you run the reader in Company A or in Company B.
> What changes is the data extraction logic from the key, value or headers.
> But it's business-specific, not data store-specific. You can find the
> schema implementation here: Kafka
> <https://github.com/apache/spark/blob/v3.1.1/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala#L122>
>
> The second type is a source with user-defined schema, like a RDBMS table
> or a NoSQL schemaless store. Here, predicting the schema will not only be
> business-specific, but also data store-specific; you can set any name for a
> Primary Key column, there is no such rule like "key" or "value" in Kafka.
> To avoid runtime errors (= favor fail-fast approach before the data is
> read), Spark can use the metadata to assert (analyze) the schema specified
> by the user to confirm it or fail fast before reading the data.
>
> Best,
> Bartosz.
>
>
>
> On Mon, Mar 29, 2021 at 1:07 PM Jacek Laskowski <ja...@japila.pl> wrote:
>
>> Hi,
>>
>> I've been developing a data source with a source and sink for Spark
>> Structured Streaming.
>>
>> I've got a question about Source.getBatch [1]:
>>
>>     def getBatch(start: Option[Offset], end: Offset): DataFrame
>>
>> getBatch returns a streaming DataFrame between the offsets so the idiom
>> (?) is to have a code as follows:
>>
>>     val relation = new MyRelation(...)(sparkSession)
>>     val plan = LogicalRelation(relation, isStreaming = true)
>>     new Dataset[Row](sparkSession, plan, RowEncoder(schema))
>>
>> Note the use of schema [2] that is another part of the Source abstraction:
>>
>>     def schema: StructType
>>
>> This is the "source" of my question. Is the above OK in a streaming sink
>> / Source.getBatch?
>>
>> Since there are no interim operators that could change attributes
>> (schema) I think it's OK.
>>
>> I've seen the following code and that made me wonder whether it's better
>> or not compared to the solution above:
>>
>>     val relation = new MyRelation(...)(sparkSession)
>>     val plan = LogicalRelation(relation, isStreaming = true)
>>
>>     // When would we have to execute plan?
>>     val qe = sparkSession.sessionState.executePlan(plan)
>>     new Dataset[Row](sparkSession, plan, RowEncoder(qe.analyzed.schema))
>>
>> When would or do we have to use qe.analyzed.schema vs simply schema?
>> Could this qe.analyzed.schema help avoid some edge cases and is a preferred
>> approach?
>>
>> Thank you for any help you can offer. Much appreciated.
>>
>> [1]
>> https://github.com/apache/spark/blob/053dd858d38e6107bc71e0aa3a4954291b74f8c8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala#L61
>> [2]
>> https://github.com/apache/spark/blob/053dd858d38e6107bc71e0aa3a4954291b74f8c8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala#L35
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> ----
>> https://about.me/JacekLaskowski
>> "The Internals Of" Online Books <https://books.japila.pl/>
>> Follow me on https://twitter.com/jaceklaskowski
>>
>> <https://twitter.com/jaceklaskowski>
>>
>
>
> --
> Bartosz Konieczny
> data engineer
> https://www.waitingforcode.com
> https://github.com/bartosz25/
> https://twitter.com/waitingforcode
>
>

Reply via email to