[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on a diff in the pull request: https://github.com/apache/spark/pull/22575#discussion_r239670323

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -631,6 +631,33 @@ object SQLConf {
     .intConf
     .createWithDefault(200)
+  val SQLSTREAM_WATERMARK_ENABLE = buildConf("spark.sqlstreaming.watermark.enable")
+    .doc("Whether use watermark in sqlstreaming.")
+    .booleanConf
+    .createWithDefault(false)
+
+  val SQLSTREAM_OUTPUTMODE = buildConf("spark.sqlstreaming.outputMode")
+    .doc("The output mode used in sqlstreaming")
+    .stringConf
+    .createWithDefault("append")
+
+  val SQLSTREAM_TRIGGER = buildConf("spark.sqlstreaming.trigger")
--- End diff --

> insert into kafka_sql_out select stream t1.value from (select cast(value as string), timestamp as time1 from kafka_sql_in1) as t1 inner join (select cast(value as string), timestamp as time2 from kafka_sql_in2) as t2 on time1 >= time2 and time1 <= time2 + interval 10 seconds where t1.value == t2.value

No, SQLStreaming does support stream-stream joins. The watermark configuration is set in the table properties. As for the trigger interval: do different sources in a stream-stream join really need different trigger configurations?

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
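The `spark.sqlstreaming.*` entries quoted in the diff can be made concrete with a minimal, stdlib-only Scala sketch of how a command might resolve them, using the defaults shown in the diff (watermark disabled, output mode `append`). The object and method names are hypothetical; this is not Spark's `SQLConf`, it does not model the per-table watermark properties mentioned in the reply, and since the trigger default is cut off in the diff, the trigger is left optional rather than guessed.

```scala
// Hypothetical model of the spark.sqlstreaming.* keys from the diff; NOT Spark's SQLConf.
object SqlStreamingConfSketch {
  val WatermarkEnableKey = "spark.sqlstreaming.watermark.enable"
  val OutputModeKey = "spark.sqlstreaming.outputMode"
  val TriggerKey = "spark.sqlstreaming.trigger"

  // The three output modes Structured Streaming accepts.
  val ValidOutputModes: Set[String] = Set("append", "complete", "update")

  final case class Settings(watermarkEnabled: Boolean, outputMode: String, trigger: Option[String])

  def resolve(conf: Map[String, String]): Settings = {
    // Defaults taken from the diff: watermark disabled, output mode "append".
    val watermark = conf.get(WatermarkEnableKey).map(_.trim.toBoolean).getOrElse(false)
    val mode = conf.getOrElse(OutputModeKey, "append").trim.toLowerCase
    require(ValidOutputModes.contains(mode), s"Unsupported output mode: $mode")
    // The trigger default is not visible in the quoted diff, so it stays unset here.
    Settings(watermark, mode, conf.get(TriggerKey))
  }
}
```

Validating the output mode at resolution time means a typo such as `upsert` fails before any query starts, rather than surfacing later inside the streaming query.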
[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on a diff in the pull request: https://github.com/apache/spark/pull/22575#discussion_r239113033

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -631,6 +631,33 @@ object SQLConf {
     .intConf
     .createWithDefault(200)
+  val SQLSTREAM_WATERMARK_ENABLE = buildConf("spark.sqlstreaming.watermark.enable")
+    .doc("Whether use watermark in sqlstreaming.")
+    .booleanConf
+    .createWithDefault(false)
+
+  val SQLSTREAM_OUTPUTMODE = buildConf("spark.sqlstreaming.outputMode")
+    .doc("The output mode used in sqlstreaming")
+    .stringConf
+    .createWithDefault("append")
+
+  val SQLSTREAM_TRIGGER = buildConf("spark.sqlstreaming.trigger")
--- End diff --

I don't think there is any problem with this. SQLStreaming uses a Command to run the streaming query, similar to InsertIntoHiveTable. In addition, an application can currently run only one streaming SQL query. Therefore, the batch SQL and streaming SQL solution works as expected.
[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on a diff in the pull request: https://github.com/apache/spark/pull/22575#discussion_r239109280

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/SQLStreamingSink.scala ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
+import org.apache.spark.sql.sources.v2.StreamingWriteSupportProvider
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.util.Utils
+
+/**
+ * The basic RunnableCommand for SQLStreaming, using Command.run to start a streaming query.
+ *
+ * @param sparkSession
+ * @param extraOptions
+ * @param partitionColumnNames
+ * @param child
+ */
+case class SQLStreamingSink(sparkSession: SparkSession,
+    table: CatalogTable,
+    child: LogicalPlan)
+  extends RunnableCommand {
+
+  private val sqlConf = sparkSession.sqlContext.conf
+
+  /**
+   * The given column name may not be equal to any of the existing column names if we were in
+   * case-insensitive context. Normalize the given column name to the real one so that we don't
+   * need to care about case sensitivity afterwards.
+   */
+  private def normalize(df: DataFrame, columnName: String, columnType: String): String = {
+    val validColumnNames = df.logicalPlan.output.map(_.name)
+    validColumnNames.find(sparkSession.sessionState.analyzer.resolver(_, columnName))
+      .getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
+        s"existing columns (${validColumnNames.mkString(", ")})"))
+  }
+
+  /**
+   * Parse spark.sqlstreaming.trigger.seconds to Trigger
+   */
+  private def parseTrigger(): Trigger = {
+    val trigger = Utils.timeStringAsMs(sqlConf.sqlStreamTrigger)
+    Trigger.ProcessingTime(trigger, TimeUnit.MICROSECONDS)
--- End diff --

Yeah, I will change it to milliseconds.
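The unit mismatch in `parseTrigger` is easy to quantify: `timeStringAsMs` returns milliseconds, but the value is then tagged as `TimeUnit.MICROSECONDS`. The stdlib-only sketch below shows the effect. Its `timeStringAsMs` is a hypothetical stand-in supporting only a few suffixes; it is not Spark's `Utils.timeStringAsMs`, which is internal to Spark and accepts more formats.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for a time-string parser; NOT Spark's Utils.timeStringAsMs.
// Supports "ms", "s", "min", "h", and bare numbers (treated as milliseconds).
object TriggerIntervalSketch {
  private val Pattern = """(\d+)\s*(ms|s|min|h)?""".r

  def timeStringAsMs(str: String): Long = str.trim.toLowerCase match {
    case Pattern(n, unit) =>
      val value = n.toLong
      unit match {
        case null | "ms" => value // no suffix or "ms": already milliseconds
        case "s"         => TimeUnit.SECONDS.toMillis(value)
        case "min"       => TimeUnit.MINUTES.toMillis(value)
        case "h"         => TimeUnit.HOURS.toMillis(value)
      }
    case other => throw new IllegalArgumentException(s"Invalid time string: $other")
  }

  // The interval actually obtained, in milliseconds, when `parsed` is tagged with `unit`.
  def effectiveMillis(parsed: Long, unit: TimeUnit): Long = unit.toMillis(parsed)
}
```

With the `MICROSECONDS` tag, a `10s` setting parses to 10000 and is then read as 10000 µs, so the trigger would fire roughly every 10 ms instead of every 10 seconds; tagging the value as `TimeUnit.MILLISECONDS`, as the reply proposes, keeps the intended interval.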
[GitHub] spark issue #21306: [SPARK-24252][SQL] Add catalog registration and table ca...
Github user stczwd commented on the issue: https://github.com/apache/spark/pull/21306

@mccheah, do you mean that the tables users create do not distinguish between stream and batch, and the distinction is made only when they are actually read?
[GitHub] spark issue #21306: [SPARK-24252][SQL] Add catalog registration and table ca...
Github user stczwd commented on the issue: https://github.com/apache/spark/pull/21306

> @stczwd, thanks for taking a look at this. What are the differences between batch and stream DDL that you think will come up?

1. A source needs to be defined for a stream table.
2. A stream table requires a special flag to indicate that it is a stream table.
3. Users and programs need to be aware of whether a table is a stream table.
4. What should we do if a user wants to change a stream table into a batch table, or convert a batch table into a stream table?
5. What does the stream table metadata you define look like? How does stream table metadata differ from batch table metadata?

I defined the stream table based on DataSource V1 (see [Support SQLStreaming in Spark](https://github.com/apache/spark/pull/22575)), but found that the problems above cannot be completely solved with the catalog API. How would you solve them in the new Catalog?
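Points 2 to 4 above can be illustrated with a minimal sketch of the V1-style approach used in the SQLStreaming PR, where a stream table is marked by an `isStreaming = 'true'` option. `TableMetaSketch` is a hypothetical, simplified stand-in, not Spark's `CatalogTable` or any new catalog API; it only shows that, with a flag in the table properties, "is this a stream table?" becomes a property lookup and conversion becomes flipping the flag.

```scala
// Hypothetical, simplified table metadata; NOT Spark's CatalogTable or a catalog API.
final case class TableMetaSketch(name: String, provider: String, properties: Map[String, String]) {
  // Point 2/3: the stream flag lives in the table properties ("isStreaming" -> "true").
  def isStreaming: Boolean =
    properties.get("isStreaming").exists(_.trim.equalsIgnoreCase("true"))

  // Point 4: converting between stream and batch then amounts to rewriting the flag.
  def withStreaming(flag: Boolean): TableMetaSketch =
    copy(properties = properties + ("isStreaming" -> flag.toString))
}
```

The weakness this exposes is the one raised in the comment: nothing in such metadata checks that the provider can actually act as a streaming source (point 1), so a catalog API would need to carry that information explicitly.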
[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on a diff in the pull request: https://github.com/apache/spark/pull/22575#discussion_r237721103

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StreamTableDDLCommandSuite.scala ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class StreamTableDDLCommandSuite extends SQLTestUtils with TestHiveSingleton {
+  private val catalog = spark.sessionState.catalog
+
+  test("CTAS: create data source stream table") {
+    withTempPath { dir =>
+      withTable("t") {
+        sql(
+          s"""CREATE TABLE t USING PARQUET
+             |OPTIONS (
+             |PATH = '${dir.toURI}',
+             |location = '${dir.toURI}',
+             |isStreaming = 'true')
+             |AS SELECT 1 AS a, 2 AS b, 3 AS c
+          """.stripMargin)
--- End diff --

Here, the child must be a streaming logical plan; if it is not, an exception is thrown.
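The check described in the reply can be sketched with a toy plan model. In the spirit of Spark's `LogicalPlan`, where a node is treated as streaming if any of its children is, the sketch below propagates `isStreaming` up the tree and rejects a CTAS into a stream table whose query is not streaming. All class and object names here are hypothetical and stdlib-only; this is not the PR's implementation.

```scala
// Hypothetical, simplified logical-plan model; NOT Spark's LogicalPlan.
sealed trait PlanSketch {
  def children: Seq[PlanSketch]
  // A node is streaming if any child is streaming.
  def isStreaming: Boolean = children.exists(_.isStreaming)
}

final case class StreamingRelationSketch(table: String) extends PlanSketch {
  def children: Seq[PlanSketch] = Nil
  override def isStreaming: Boolean = true
}

final case class BatchRelationSketch(table: String) extends PlanSketch {
  def children: Seq[PlanSketch] = Nil
}

final case class JoinSketch(left: PlanSketch, right: PlanSketch) extends PlanSketch {
  def children: Seq[PlanSketch] = Seq(left, right)
}

object StreamCtasCheck {
  // Reject a CTAS into a stream table when the child plan is not streaming.
  def validate(targetIsStreaming: Boolean, child: PlanSketch): Unit =
    if (targetIsStreaming && !child.isStreaming)
      throw new IllegalArgumentException("A stream table requires a streaming query as its child")
}
```

A stream-batch join such as Kafka joined with MySQL still counts as streaming under this rule, because one side is a streaming relation.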
[GitHub] spark issue #21306: [SPARK-24252][SQL] Add catalog registration and table ca...
Github user stczwd commented on the issue: https://github.com/apache/spark/pull/21306

@rdblue, have you considered a stream table API? Batch table DDL and stream table DDL may differ in several ways.
[GitHub] spark issue #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on the issue: https://github.com/apache/spark/pull/22575

I have sent an email to Ryan Blue.

> > > Can you send a mail to Ryan blue for adding this SPIP topic in tomorrow meeting. Meeting will be conducted tomorrow 05:00 pm PST. If you confirm then we can also attend the meeting.
> >
> > I have send an email to Ryan Blue to attend this meeting.
>
> I think you should also ask him to add your SPIP topic for tomorrows discussion. Agenda has to be set prior.

Tomorrow's discussion will mainly focus on the DataSource V2 API, so I don't think they will spend time discussing the SQL API. However, we can mention it while discussing the Catalog API.
[GitHub] spark issue #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on the issue: https://github.com/apache/spark/pull/22575

> Can you send a mail to Ryan blue for adding this SPIP topic in tomorrow meeting. Meeting will be conducted tomorrow 05:00 pm PST. If you confirm then we can also attend the meeting.

I have sent an email to Ryan Blue about attending this meeting.
[GitHub] spark issue #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on the issue: https://github.com/apache/spark/pull/22575

> ![image](https://user-images.githubusercontent.com/12999161/49129177-ab056680-f2f4-11e8-8f71-4695ebc045c1.png)

I have removed the 'stream' keyword.

> There is a DatasourceV2 community synch meetup tomorrow which is cordinated by Ryan Blue , can we discuss this point.

Yep, it's a good idea.
[GitHub] spark issue #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on the issue: https://github.com/apache/spark/pull/22575

@sujithjay, please refer to [SPARK-24630](https://issues.apache.org/jira/browse/SPARK-24630) for more details.
[GitHub] spark issue #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on the issue: https://github.com/apache/spark/pull/22575

@cloud-fan @zsxwing @tdas @xuanyuanking This patch has been open for a long time. Do you have any further questions? Can this patch be merged?
[GitHub] spark issue #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on the issue: https://github.com/apache/spark/pull/22575

> Currently Dataframe API support "writeStream.start()" api to run streaming in background, so that query can be executed on that sink, also multiple stream to stream processing can happen in single session.
> How this can be achieved using INSERT INTO stream?
> How multiple streams with different properties can be executed in same session?

SQLStreaming does not support multiple streams. In our use cases, SQLStreaming is mostly used ad hoc, and each case runs only one INSERT INTO stream query. Still, multiple streams can be run through the Table API, for example:

`spark.table("kafka_stream").groupBy("value").count().writeStream.outputMode("complete").format("console").start()`
[GitHub] spark issue #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on the issue: https://github.com/apache/spark/pull/22575

@tdas @zsxwing @cloud-fan Hi, are there any other questions blocking this patch from being merged?
[GitHub] spark issue #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on the issue: https://github.com/apache/spark/pull/22575

@WangTaoTheTonic @cloud-fan @xuanyuanking I have removed the stream keyword. The Table API is now supported in SQLStreaming.
[GitHub] spark issue #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on the issue: https://github.com/apache/spark/pull/22575

> ql and normal sql? how could users define watermark with SQL?

Yes, the 'stream' keyword is the only difference from normal SQL. The watermark can be defined through configuration.
[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on a diff in the pull request: https://github.com/apache/spark/pull/22575#discussion_r226853809

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -631,6 +631,33 @@ object SQLConf {
     .intConf
     .createWithDefault(200)
+  val SQLSTREAM_OUTPUTMODE = buildConf("spark.sqlstreaming.outputMode")
--- End diff --

Then, could you suggest a more appropriate name? These configurations are meant to be used only in SQLStreaming.
[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on a diff in the pull request: https://github.com/apache/spark/pull/22575#discussion_r226853804

--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/StreamTableDDLCommandSuite.scala ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+class StreamTableDDLCommandSuite extends SQLTestUtils with TestHiveSingleton {
+  private val catalog = spark.sessionState.catalog
+
+  test("CTAS: create data source stream table") {
+    withTempPath { dir =>
+      withTable("t") {
+        sql(
+          s"""CREATE TABLE t USING PARQUET
+             |OPTIONS (
+             |PATH = '${dir.toURI}',
+            |location = '${dir.toURI}',
--- End diff --

Thanks
[GitHub] spark pull request #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on a diff in the pull request: https://github.com/apache/spark/pull/22575#discussion_r226853724

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
@@ -63,7 +63,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
       providerName: String,
       parameters: Map[String, String]): (String, StructType) = {
     validateStreamOptions(parameters)
-    require(schema.isEmpty, "Kafka source has a fixed schema and cannot be set with a custom one")
+    if(schema.isDefined) {
--- End diff --

KafkaStreamSourceProvider is an expression for SQLStreaming. When a DataSource is created from a Kafka streaming table, the table's schema is passed in, so KafkaSourceProvider needs to accept it.
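One way to accept the schema passed from a Kafka streaming table, without dropping the guarantee that the original `require` enforced, is to accept a supplied schema only when it matches the Kafka source's fixed schema. The sketch below is hypothetical and stdlib-only: schemas are modeled as `(name, type)` pairs with DDL-style type names instead of Spark's `StructType`, and it is not the code in the PR.

```scala
// Hypothetical check, not the PR's code: a supplied schema is accepted only if it
// matches the Kafka source's fixed schema, instead of rejecting any schema outright.
object KafkaSchemaCheckSketch {
  // The Kafka source's fixed columns, modeled as (name, DDL-style type) pairs.
  val KafkaSchema: Seq[(String, String)] = Seq(
    "key" -> "binary",
    "value" -> "binary",
    "topic" -> "string",
    "partition" -> "int",
    "offset" -> "bigint",
    "timestamp" -> "timestamp",
    "timestampType" -> "int")

  // Compare names and types case-insensitively.
  private def normalized(schema: Seq[(String, String)]): Seq[(String, String)] =
    schema.map { case (name, tpe) => (name.toLowerCase, tpe.toLowerCase) }

  def checkSchema(userSchema: Option[Seq[(String, String)]]): Seq[(String, String)] =
    userSchema match {
      case None => KafkaSchema
      case Some(s) if normalized(s) == normalized(KafkaSchema) => KafkaSchema
      case Some(s) =>
        throw new IllegalArgumentException(
          s"Kafka source has a fixed schema and cannot be set to: ${s.mkString(", ")}")
    }
}
```

This keeps the error for genuinely custom schemas while letting a stream table that simply records the standard Kafka columns pass through.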
[GitHub] spark issue #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on the issue: https://github.com/apache/spark/pull/22575

cc @xuanyuanking
[GitHub] spark issue #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on the issue: https://github.com/apache/spark/pull/22575

> How should we do if we wanna join two kafka stream and sink the result to another stream?

`insert into kafka_sql_out select stream t1.value from (select cast(value as string), timestamp as time1 from kafka_sql_in1) as t1 inner join (select cast(value as string), timestamp as time2 from kafka_sql_in2) as t2 on time1 >= time2 and time1 <= time2 + interval 10 seconds where t1.value == t2.value`
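The join condition in this query pairs rows with equal values whose `time1` falls within 10 seconds at or after `time2`. The stdlib-only sketch below illustrates only which row pairs that predicate matches, evaluated over two finite in-memory sequences. It is a batch-style illustration with hypothetical names; a real stream-stream join in Structured Streaming also buffers state across micro-batches and relies on watermarks to discard old rows, none of which is modeled here.

```scala
// Batch-style illustration of the interval-join predicate only; no streaming state or watermarks.
object IntervalJoinSketch {
  final case class Row(value: String, timeMs: Long)

  // Mirrors: t1.value == t2.value AND time1 >= time2 AND time1 <= time2 + interval 10 seconds
  def join(left: Seq[Row], right: Seq[Row], boundMs: Long = 10000L): Seq[(Row, Row)] =
    for {
      l <- left
      r <- right
      if l.value == r.value && l.timeMs >= r.timeMs && l.timeMs <= r.timeMs + boundMs
    } yield (l, r)
}
```

For example, a left row `("a", 15 s)` matches a right row `("a", 10 s)` but not `("a", 20 s)`, because the left timestamp must not precede the right one.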
[GitHub] spark issue #22575: [SPARK-24630][SS] Support SQLStreaming in Spark
Github user stczwd commented on the issue: https://github.com/apache/spark/pull/22575

@WangTaoTheTonic Adding the 'stream' keyword serves two purposes:

- **Mark the entire SQL query as a stream query and generate the SQLStreaming plan tree.**
- **Mark the table type as UnResolvedStreamRelation.** The table is then parsed as a StreamingRelation or another relation, which matters especially in stream-join-batch queries such as Kafka join MySQL.

**Besides, the 'stream' keyword makes it easier to express Structured Streaming with pure SQL.**

A small example showing why 'stream' matters: read a stream from a Kafka stream table and join it with MySQL to count user messages.

- with 'stream'
  - `select stream kafka_sql_test.name, count(door) from kafka_sql_test inner join mysql_test on kafka_sql_test.name == mysql_test.name group by kafka_sql_test.name`
    - **It will be treated as a streaming query using the Console sink**; kafka_sql_test will be parsed as a StreamingRelation and mysql_test as a JDBCRelation, not a StreamingRelation.
  - `insert into csv_sql_table select stream kafka_sql_test.name, count(door) from kafka_sql_test inner join mysql_test on kafka_sql_test.name == mysql_test.name group by kafka_sql_test.name`
    - **It will be treated as a streaming query using the FileStream sink**; kafka_sql_test will be parsed as a StreamingRelation and mysql_test as a JDBCRelation, not a StreamingRelation.
- without 'stream'
  - `select kafka_sql_test.name, count(door) from kafka_sql_test inner join mysql_test on kafka_sql_test.name == mysql_test.name group by kafka_sql_test.name`
    - **It will be treated as a batch query**; kafka_sql_test will be parsed as a KafkaRelation and mysql_test as a JDBCRelation.
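The three cases listed above amount to a small decision table: the 'stream' keyword decides whether stream tables resolve to a StreamingRelation, and together with INSERT INTO it decides the sink. The stdlib-only sketch below encodes that table. The relation and sink names come from the comment; the object, the `Table` type, the provider strings, and the set of file-based providers are hypothetical simplifications, not the PR's analyzer rules.

```scala
// Hypothetical decision table for the 'stream' keyword; names of relations and sinks
// follow the comment above, the logic itself is an illustrative simplification.
object StreamKeywordSketch {
  final case class Table(name: String, provider: String, isStreaming: Boolean)

  // Assumed set of file-based providers that would map to a file stream sink.
  private val FileProviders = Set("csv", "parquet", "json", "orc", "text")

  // With 'stream', stream tables become StreamingRelation; others keep a batch relation.
  def resolveRelation(table: Table, streamQuery: Boolean): String =
    if (streamQuery && table.isStreaming) "StreamingRelation"
    else table.provider match {
      case "kafka" => "KafkaRelation"
      case "jdbc"  => "JDBCRelation"
      case other   => s"BatchRelation($other)"
    }

  // No 'stream': batch query. 'stream' without INSERT INTO: console sink.
  // 'stream' with INSERT INTO a file-based table: file stream sink.
  def querySink(streamQuery: Boolean, insertInto: Option[Table]): String =
    (streamQuery, insertInto) match {
      case (false, _)   => "BatchQuery"
      case (true, None) => "ConsoleSink"
      case (true, Some(t)) =>
        if (FileProviders.contains(t.provider)) "FileStreamSink" else s"StreamSink(${t.provider})"
    }
}
```

Keeping relation resolution and sink selection as separate functions reflects the two purposes listed in the comment: one marks tables, the other marks the whole query.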
[GitHub] spark pull request #22575: [SPARK-24630][SS][WIP] Support SQLStreaming in Sp...
GitHub user stczwd opened a pull request: https://github.com/apache/spark/pull/22575

[SPARK-24630][SS][WIP] Support SQLStreaming in Spark

## What changes were proposed in this pull request?

This patch proposes support for SQLStreaming in Spark. Please refer to [SPARK-24630](https://issues.apache.org/jira/browse/SPARK-24630) for more details.

This patch supports:

1. Creating stream tables, which can be used as a source and a sink in SQLStreaming:
`create table kafka_sql_test using kafka options( isStreaming = 'true', subscribe = 'topic', kafka.bootstrap.servers = 'localhost:9092')`
2. A new keyword 'STREAM' in SQL for SQLStreaming queries:
`select stream * from kafka_sql_test`
3. More complex queries, as long as both SQL and Structured Streaming support them.

## How was this patch tested?

Some unit tests were added to verify SQLStreaming.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/stczwd/spark sqlstreaming

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22575.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #22575

commit af26ea77157a7ff4e0a2c5eecec64c57f73c425d
Author: Jackey Lee
Date: 2018-09-28T01:04:17Z

Support SQLStreaming in Spark: Add keyword 'STREAM'; Support create stream table