Hi Bartosz,

This is not a question about whether the data source supports a fixed or a
user-defined schema, but about which schema to use when a streaming batch is
requested in Source.getBatch.
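As a side note, the fixed vs. user-defined distinction from the thread below can be sketched in plain Scala. This is only an illustration, not Spark's actual API: `SchemaField`, `kafkaFixedSchema`, and `assertSchema` are hypothetical names I made up here; the field names and types mirror the columns the linked KafkaSource exposes.

```scala
// A minimal, Spark-free sketch of the two schema categories discussed below.
// SchemaField is a stand-in, not Spark's StructField.
final case class SchemaField(name: String, dataType: String)

// Fixed schema: every Kafka source exposes the same columns, regardless of
// the business logic that later extracts data from key/value/headers.
val kafkaFixedSchema: Seq[SchemaField] = Seq(
  SchemaField("key", "binary"),
  SchemaField("value", "binary"),
  SchemaField("topic", "string"),
  SchemaField("partition", "int"),
  SchemaField("offset", "long"),
  SchemaField("timestamp", "timestamp"),
  SchemaField("timestampType", "int")
)

// User-defined schema: the store imposes no such rule, so the user declares
// the fields and the source validates them against the store's metadata,
// failing fast before any data is read.
def assertSchema(
    userSchema: Seq[SchemaField],
    storeMetadata: Map[String, String]): Either[String, Seq[SchemaField]] = {
  val mismatched =
    userSchema.filterNot(f => storeMetadata.get(f.name).contains(f.dataType))
  if (mismatched.isEmpty) Right(userSchema)
  else Left(s"Schema mismatch for: ${mismatched.map(_.name).mkString(", ")}")
}
```

The point of the `Either` here is the fail-fast behavior Bartosz mentions: the mismatch surfaces at analysis time, before any rows are read.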
Pozdrawiam,
Jacek Laskowski
----
https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski

On Wed, Mar 31, 2021 at 7:44 PM Bartosz Konieczny <bartkoniec...@gmail.com> wrote:

> Hi Jacek,
>
> An interesting question! I don't know the exact answer and will be happy
> to learn along the way :) Below you can find my understanding of these two
> things; I hope it helps a little.
>
> For me, we can distinguish two different source categories. The first of
> them is a source with a fixed schema. A good example is Apache Kafka,
> which exposes the topic name, key, and value, and you can't change that;
> it's always the same, whether you run the reader in Company A or in
> Company B. What changes is the logic for extracting data from the key,
> value, or headers. But that's business-specific, not data store-specific.
> You can find the schema implementation here: Kafka
> <https://github.com/apache/spark/blob/v3.1.1/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala#L122>
>
> The second type is a source with a user-defined schema, like an RDBMS
> table or a schemaless NoSQL store. Here, predicting the schema is not only
> business-specific but also data store-specific; you can use any name for a
> primary key column, and there is no rule like "key" or "value" in Kafka.
> To avoid runtime errors (i.e., to favor a fail-fast approach before the
> data is read), Spark can use the metadata to assert (analyze) the schema
> specified by the user, confirming it or failing fast before reading the
> data.
>
> Best,
> Bartosz.
>
>
> On Mon, Mar 29, 2021 at 1:07 PM Jacek Laskowski <ja...@japila.pl> wrote:
>
>> Hi,
>>
>> I've been developing a data source with a source and sink for Spark
>> Structured Streaming.
>>
>> I've got a question about Source.getBatch [1]:
>>
>>   def getBatch(start: Option[Offset], end: Offset): DataFrame
>>
>> getBatch returns a streaming DataFrame between the offsets, so the idiom
>> (?) is to have code like the following:
>>
>>   val relation = new MyRelation(...)(sparkSession)
>>   val plan = LogicalRelation(relation, isStreaming = true)
>>   new Dataset[Row](sparkSession, plan, RowEncoder(schema))
>>
>> Note the use of schema [2], which is another part of the Source
>> abstraction:
>>
>>   def schema: StructType
>>
>> This is the "source" of my question. Is the above OK in a streaming
>> sink / Source.getBatch?
>>
>> Since there are no interim operators that could change the attributes
>> (schema), I think it's OK.
>>
>> I've seen the following code, and it made me wonder whether it's better
>> than the solution above:
>>
>>   val relation = new MyRelation(...)(sparkSession)
>>   val plan = LogicalRelation(relation, isStreaming = true)
>>
>>   // When would we have to execute the plan?
>>   val qe = sparkSession.sessionState.executePlan(plan)
>>   new Dataset[Row](sparkSession, plan, RowEncoder(qe.analyzed.schema))
>>
>> When would or do we have to use qe.analyzed.schema vs. simply schema?
>> Could qe.analyzed.schema help avoid some edge cases, and is it the
>> preferred approach?
>>
>> Thank you for any help you can offer. Much appreciated.
>>
>> [1]
>> https://github.com/apache/spark/blob/053dd858d38e6107bc71e0aa3a4954291b74f8c8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala#L61
>> [2]
>> https://github.com/apache/spark/blob/053dd858d38e6107bc71e0aa3a4954291b74f8c8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala#L35
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> ----
>> https://about.me/JacekLaskowski
>> "The Internals Of" Online Books <https://books.japila.pl/>
>> Follow me on https://twitter.com/jaceklaskowski
>>
>
>
> --
> Bartosz Konieczny
> data engineer
> https://www.waitingforcode.com
> https://github.com/bartosz25/
> https://twitter.com/waitingforcode