[CARBONDATA-2055][Streaming] Support integrating Stream table with Spark Streaming
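
This commit adds a DStream-based writer API for stream tables: CarbonSparkStreamingFactory returns a cached CarbonStreamSparkStreamingWriter that appends each micro-batch to the table. A minimal usage sketch, condensed from the example in this commit (the carbon session `spark`, the input `dstream` of DStreamData, and a table created with 'streaming'='true' are assumed to already exist):

  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{CarbonSparkStreamingFactory, SaveMode}
  import org.apache.spark.streaming.Time
  import org.apache.carbondata.streaming.parser.CarbonStreamParser

  dstream.foreachRDD { (rdd: RDD[DStreamData], time: Time) =>
    // convert the micro-batch to a DataFrame and append it to the stream table
    val df = spark.createDataFrame(rdd).toDF()
    CarbonSparkStreamingFactory
      .getStreamSparkStreamingWriter(spark, "default", "dstream_stream_table")
      .option(CarbonStreamParser.CARBON_STREAM_PARSER,
        CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
      .mode(SaveMode.Append)
      .writeStreamData(df, time)
  }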
This closes #1867

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6bb5a2b0
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6bb5a2b0
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6bb5a2b0

Branch: refs/heads/branch-1.3
Commit: 6bb5a2b0a0a8177f14d90177b44e74d38eb69feb
Parents: cf2390a
Author: Zhang Zhichao <441586...@qq.com>
Authored: Sat Jan 27 00:03:19 2018 +0800
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Sat Mar 3 18:04:28 2018 +0530

----------------------------------------------------------------------
 .../CarbonBatchSparkStreamingExample.scala      |   6 +-
 .../CarbonStreamSparkStreamingExample.scala     | 218 +++++++++++++++++++
 ...CarbonStructuredStreamingWithRowParser.scala |   2 +-
 integration/spark2/pom.xml                      |   6 +
 .../spark/sql/CarbonSparkStreamingFactory.scala |  60 +++++
 .../TestStreamingTableWithRowParser.scala       |   2 +-
 streaming/pom.xml                               |   6 +
 .../streaming/parser/CarbonStreamParser.java    |   3 +
 .../CarbonSparkStreamingListener.scala          |  31 +++
 .../streaming/CarbonStreamSparkStreaming.scala  | 187 ++++++++++++++++
 10 files changed, 514 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6bb5a2b0/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
index 6ae87b9..ef4dbce 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonBatchSparkStreamingExample.scala
@@ -167,15 +167,11 @@ object CarbonBatchSparkStreamingExample {
       .map(fields => DStreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat))
     batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) => {
-      val df = SparkSession.builder().getOrCreate()
-        .createDataFrame(rdd).toDF("id", "name", "city", "salary")
+      val df = spark.createDataFrame(rdd).toDF("id", "name", "city", "salary")
       println("at time: " + time.toString() + " the count of received data: " + df.count())
       df.write
         .format("carbondata")
         .option("tableName", tableName)
-        .option("tempCSV", "false")
-        .option("compress", "true")
-        .option("single_pass", "true")
         .mode(SaveMode.Append)
         .save()
     }}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6bb5a2b0/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
new file mode 100644
index 0000000..f59a610
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.{File, PrintWriter}
+import java.net.ServerSocket
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.CarbonEnv
+import org.apache.spark.sql.CarbonSparkStreamingFactory
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.streaming.CarbonSparkStreamingListener
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+/**
+ * This example shows how to use Spark Streaming to write data
+ * to a CarbonData stream table.
+ *
+ * NOTE: The current integration with Spark Streaming is an alpha feature.
+ */
+// scalastyle:off println
+object CarbonStreamSparkStreamingExample {
+
+  def main(args: Array[String]): Unit = {
+
+    // set up paths
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val checkpointPath =
+      s"$rootPath/examples/spark2/target/spark_streaming_cp_" +
+      System.currentTimeMillis().toString()
+    val streamTableName = s"dstream_stream_table"
+
+    val spark = ExampleUtils.createCarbonSession("CarbonStreamSparkStreamingExample", 4)
+
+    val requireCreateTable = true
+
+    if (requireCreateTable) {
+      // drop the table if it already exists
+      spark.sql(s"DROP TABLE IF EXISTS ${ streamTableName }")
+      // create the target carbon table and populate it with initial data
+      spark.sql(
+        s"""
+           | CREATE TABLE ${ streamTableName }(
+           | id INT,
+           | name STRING,
+           | city STRING,
+           | salary FLOAT
+           | )
+           | STORED BY 'carbondata'
+           | TBLPROPERTIES(
+           | 'streaming'='true',
+           | 'sort_columns'='name',
+           | 'dictionary_include'='city')
+           | """.stripMargin)
+      val carbonTable = CarbonEnv.getCarbonTable(Some("default"), streamTableName)(spark)
+      val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+      // batch load
+      val path = s"$rootPath/examples/spark2/src/main/resources/streamSample.csv"
+      spark.sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$path'
+           | INTO TABLE $streamTableName
+           | OPTIONS('HEADER'='true')
+         """.stripMargin)
+
+      // streaming ingest
+      val serverSocket = new ServerSocket(7071)
+      val thread1 = writeSocket(serverSocket)
+      val thread2 = showTableCount(spark, streamTableName)
+      val ssc = startStreaming(spark, streamTableName, tablePath, checkpointPath)
+      // add a Spark Streaming listener to remove all locks for stream tables when the app stops
+      ssc.sparkContext.addSparkListener(new CarbonSparkStreamingListener())
+      // wait for the stop signal to stop the Spark Streaming app
+      waitForStopSignal(ssc)
+      // the Spark Streaming app must be started in the main thread,
+      // otherwise a not-serializable exception will be encountered
+      ssc.start()
+      ssc.awaitTermination()
+      thread1.interrupt()
+      thread2.interrupt()
+      serverSocket.close()
+    }
+
+    spark.sql(s"select count(*) from ${ streamTableName }").show(100, truncate = false)
+
+    spark.sql(s"select * from ${ streamTableName } order by id desc").show(100, truncate = false)
+
+    // record(id = 100000001) comes from batch segment_0
+    // record(id = 1) comes from stream segment_1
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id = 100000001 or id = 1 limit 100").show(100, truncate = false)
+
+    // without filter
+    spark.sql(s"select * " +
+              s"from ${ streamTableName } " +
+              s"where id < 10 limit 100").show(100, truncate = false)
+
+    // show segments
+    spark.sql(s"SHOW SEGMENTS FOR TABLE ${streamTableName}").show(false)
+
+    spark.stop()
+    System.out.println("streaming finished")
+  }
+
+  def showTableCount(spark: SparkSession, tableName: String): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        for (_ <- 0 to 1000) {
+          println(System.currentTimeMillis())
+          spark.sql(s"select count(*) from $tableName").show(truncate = false)
+          spark.sql(s"SHOW SEGMENTS FOR TABLE ${tableName}").show(false)
+          Thread.sleep(1000 * 5)
+        }
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def waitForStopSignal(ssc: StreamingContext): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // use the command 'nc 127.0.0.1 7072' to stop the Spark Streaming app
+        new ServerSocket(7072).accept()
+        // don't stop SparkContext here
+        ssc.stop(false, true)
+      }
+    }
+    thread.start()
+    thread
+  }
+
+  def startStreaming(spark: SparkSession, tableName: String,
+      tablePath: CarbonTablePath, checkpointPath: String): StreamingContext = {
+    var ssc: StreamingContext = null
+    try {
+      // recommended: use a larger batch interval, such as 30s or 1min
+      ssc = new StreamingContext(spark.sparkContext, Seconds(30))
+      ssc.checkpoint(checkpointPath)
+
+      val readSocketDF = ssc.socketTextStream("localhost", 7071)
+
+      val batchData = readSocketDF
+        .map(_.split(","))
+        .map(fields => DStreamData(fields(0).toInt, fields(1), fields(2), fields(3).toFloat))
+
+      println("init carbon table info")
+      batchData.foreachRDD { (rdd: RDD[DStreamData], time: Time) => {
+        val df = spark.createDataFrame(rdd).toDF()
+        println(System.currentTimeMillis().toString() +
+                " at batch time: " + time.toString() +
+                " the count of received data: " + df.count())
+        CarbonSparkStreamingFactory.getStreamSparkStreamingWriter(spark, "default", tableName)
+          .option(CarbonStreamParser.CARBON_STREAM_PARSER,
+            CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
+          .mode(SaveMode.Append)
+          .writeStreamData(df, time)
+      }}
+    } catch {
+      case ex: Exception =>
+        ex.printStackTrace()
+        println("Done reading and writing streaming data")
+    }
+    ssc
+  }
+
+  def writeSocket(serverSocket: ServerSocket): Thread = {
+    val thread = new Thread() {
+      override def run(): Unit = {
+        // wait for a client connection request and accept it
+        val clientSocket = serverSocket.accept()
+        val socketWriter = new PrintWriter(clientSocket.getOutputStream())
+        var index = 0
+        for (_ <- 1 to 1000) {
+          // write 101 records per iteration
+          for (_ <- 0 to 100) {
+            index = index + 1
+            socketWriter.println(index.toString + ",name_" + index
+                                 + ",city_" + index + "," + (index * 10000.00).toString +
+                                 ",school_" + index + ":school_" + index + index + "$" + index)
+          }
+          socketWriter.flush()
+          Thread.sleep(2000)
+        }
+        socketWriter.close()
+        System.out.println("Socket closed")
+      }
+    }
+    thread.start()
+    thread
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6bb5a2b0/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
index f134a8d..cce833b 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStructuredStreamingWithRowParser.scala
@@ -171,7 +171,7 @@ object CarbonStructuredStreamingWithRowParser {
             .option("dbName", "default")
             .option("tableName", "stream_table_with_row_parser")
             .option(CarbonStreamParser.CARBON_STREAM_PARSER,
-              "org.apache.carbondata.streaming.parser.RowStreamParserImp")
+              CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER)
             .start()

           qry.awaitTermination()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6bb5a2b0/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 9ac240b..90a5891 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -48,6 +48,12 @@
       <artifactId>spark-repl_${scala.binary.version}</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>${spark.deps.scope}</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
     </dependency>
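
The factory below keeps one writer per "db.table" key in CarbonStreamSparkStreaming.getTableMap, so calling it once per micro-batch is cheap. A brief sketch of that caching behaviour (the table name is illustrative):

  // the second lookup returns the cached writer instance for the same key
  val w1 = CarbonSparkStreamingFactory
    .getStreamSparkStreamingWriter(spark, "default", "dstream_stream_table")
  val w2 = CarbonSparkStreamingFactory
    .getStreamSparkStreamingWriter(spark, "default", "dstream_stream_table")
  assert(w1 eq w2)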

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6bb5a2b0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSparkStreamingFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSparkStreamingFactory.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSparkStreamingFactory.scala
new file mode 100644
index 0000000..15b038b
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSparkStreamingFactory.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.carbondata.streaming.CarbonStreamException
+import org.apache.carbondata.streaming.CarbonStreamSparkStreaming
+import org.apache.carbondata.streaming.CarbonStreamSparkStreamingWriter
+
+/**
+ * Creates a [[CarbonStreamSparkStreamingWriter]] for a stream table
+ * when integrating with Spark Streaming.
+ *
+ * NOTE: The current integration with Spark Streaming is an alpha feature.
+ */
+object CarbonSparkStreamingFactory {
+
+  def getStreamSparkStreamingWriter(spark: SparkSession,
+      dbNameStr: String,
+      tableName: String): CarbonStreamSparkStreamingWriter =
+    synchronized {
+      val dbName = if (StringUtils.isEmpty(dbNameStr)) "default" else dbNameStr
+      val key = dbName + "." + tableName
+      if (CarbonStreamSparkStreaming.getTableMap.containsKey(key)) {
+        CarbonStreamSparkStreaming.getTableMap.get(key)
+      } else {
+        if (StringUtils.isEmpty(tableName) || tableName.contains(" ")) {
+          throw new CarbonStreamException("Table creation failed. " +
+                                          "The table name must not be blank or contain spaces")
+        }
+        val carbonTable = CarbonEnv.getCarbonTable(Some(dbName),
+          tableName)(spark)
+        if (!carbonTable.isStreamingTable) {
+          throw new CarbonStreamException(s"Table ${carbonTable.getDatabaseName}." +
+ + s"${carbonTable.getTableName} is not a streaming table") + } + val streamWriter = new CarbonStreamSparkStreamingWriter(spark, + carbonTable, spark.sessionState.newHadoopConf()) + CarbonStreamSparkStreaming.getTableMap.put(key, streamWriter) + streamWriter + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/6bb5a2b0/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala index a3df2be..3e3b2c5 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala @@ -784,7 +784,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll { .option("timestampformat", CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) .option(CarbonCommonConstants.ENABLE_AUTO_HANDOFF, autoHandoff) .option(CarbonStreamParser.CARBON_STREAM_PARSER, - "org.apache.carbondata.streaming.parser.RowStreamParserImp") + CarbonStreamParser.CARBON_STREAM_PARSER_ROW_PARSER) .start() qry.awaitTermination() } catch { http://git-wip-us.apache.org/repos/asf/carbondata/blob/6bb5a2b0/streaming/pom.xml ---------------------------------------------------------------------- diff --git a/streaming/pom.xml b/streaming/pom.xml index 40e3d33..1d4dc7f 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -24,6 +24,12 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>${spark.deps.scope}</scope> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/carbondata/blob/6bb5a2b0/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java index 643758c..e335626 100644 --- a/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java +++ b/streaming/src/main/java/org/apache/carbondata/streaming/parser/CarbonStreamParser.java @@ -31,6 +31,9 @@ public interface CarbonStreamParser { String CARBON_STREAM_PARSER_DEFAULT = "org.apache.carbondata.streaming.parser.CSVStreamParserImp"; + String CARBON_STREAM_PARSER_ROW_PARSER = + "org.apache.carbondata.streaming.parser.RowStreamParserImp"; + void initialize(Configuration configuration, StructType structType); Object[] parserRow(InternalRow value); http://git-wip-us.apache.org/repos/asf/carbondata/blob/6bb5a2b0/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala new file mode 100644 index 0000000..6d1fa45 --- /dev/null 
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonSparkStreamingListener.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming
+
+import org.apache.spark.scheduler.SparkListener
+import org.apache.spark.scheduler.SparkListenerApplicationEnd
+
+class CarbonSparkStreamingListener extends SparkListener {
+
+  /**
+   * When the Spark Streaming app stops, remove all locks for stream tables.
+   */
+  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+    CarbonStreamSparkStreaming.cleanAllLockAfterStop()
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6bb5a2b0/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
new file mode 100644
index 0000000..4aa1517
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink
+import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.Time
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
+/**
+ * Writer used to write stream data to a stream table
+ * when integrating with Spark Streaming.
+ *
+ * NOTE: The current integration with Spark Streaming is an alpha feature.
+ */
+class CarbonStreamSparkStreamingWriter(val sparkSession: SparkSession,
+    val carbonTable: CarbonTable,
+    val configuration: Configuration) {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  private var isInitialize: Boolean = false
+
+  private var lock: ICarbonLock = null
+  private var carbonAppendableStreamSink: Sink = null
+
+  /**
+   * Acquire the lock for the stream table
+   */
+  def lockStreamTable(): Unit = {
+    lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+      LockUsage.STREAMING_LOCK)
+    if (lock.lockWithRetries()) {
+      LOGGER.info("Acquired the lock for stream table: " +
+                  carbonTable.getDatabaseName + "." +
+                  carbonTable.getTableName)
+    } else {
+      LOGGER.error("Not able to acquire the lock for stream table: " +
+                   carbonTable.getDatabaseName + "." + carbonTable.getTableName)
+      throw new InterruptedException(
+        "Not able to acquire the lock for stream table: " + carbonTable.getDatabaseName + "." +
+        carbonTable.getTableName)
+    }
+  }
+
+  /**
+   * Release the lock for the stream table
+   */
+  def unLockStreamTable(): Unit = {
+    if (null != lock) {
+      lock.unlock()
+      LOGGER.info("Released the lock for stream table: " +
+                  carbonTable.getDatabaseName + "." +
+                  carbonTable.getTableName)
+    }
+  }
+
+  def initialize(): Unit = {
+    carbonAppendableStreamSink = StreamSinkFactory.createStreamTableSink(
+      sparkSession,
+      configuration,
+      carbonTable,
+      extraOptions.toMap).asInstanceOf[CarbonAppendableStreamSink]
+
+    lockStreamTable()
+
+    isInitialize = true
+  }
+
+  def writeStreamData(dataFrame: DataFrame, time: Time): Unit = {
+    if (!isInitialize) {
+      initialize()
+    }
+    carbonAppendableStreamSink.addBatch(time.milliseconds, dataFrame)
+  }
+
+  private val extraOptions = new scala.collection.mutable.HashMap[String, String]
+  private var mode: SaveMode = SaveMode.ErrorIfExists
+
+  this.option("dbName", carbonTable.getDatabaseName)
+  this.option("tableName", carbonTable.getTableName)
+
+  /**
+   * Specifies the behavior when data or table already exists. Options include:
+   *   - `SaveMode.Overwrite`: overwrite the existing data.
+   *   - `SaveMode.Append`: append the data.
+   *   - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
+   *   - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
+   */
+  def mode(saveMode: SaveMode): CarbonStreamSparkStreamingWriter = {
+    if (mode == SaveMode.ErrorIfExists) {
+      mode = saveMode
+    }
+    this
+  }
+
+  /**
+   * Specifies the behavior when data or table already exists. Options include:
+   *   - `overwrite`: overwrite the existing data.
+   *   - `append`: append the data.
+   *   - `ignore`: ignore the operation (i.e. no-op).
+   *   - `error` or `default`: default option, throw an exception at runtime.
+   */
+  def mode(saveMode: String): CarbonStreamSparkStreamingWriter = {
+    if (mode == SaveMode.ErrorIfExists) {
+      mode = saveMode.toLowerCase(util.Locale.ROOT) match {
+        case "overwrite" => SaveMode.Overwrite
+        case "append" => SaveMode.Append
+        case "ignore" => SaveMode.Ignore
+        case "error" | "default" => SaveMode.ErrorIfExists
+        case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
" + + "Accepted save modes are 'overwrite', 'append', 'ignore', 'error'.") + } + } + this + } + + /** + * Adds an output option + */ + def option(key: String, value: String): CarbonStreamSparkStreamingWriter = { + if (!extraOptions.contains(key)) { + extraOptions += (key -> value) + } + this + } + + /** + * Adds an output option + */ + def option(key: String, value: Boolean): CarbonStreamSparkStreamingWriter = + option(key, value.toString) + + /** + * Adds an output option + */ + def option(key: String, value: Long): CarbonStreamSparkStreamingWriter = + option(key, value.toString) + + /** + * Adds an output option + */ + def option(key: String, value: Double): CarbonStreamSparkStreamingWriter = + option(key, value.toString) +} + +object CarbonStreamSparkStreaming { + + @transient private val tableMap = + new util.HashMap[String, CarbonStreamSparkStreamingWriter]() + + def getTableMap: util.Map[String, CarbonStreamSparkStreamingWriter] = tableMap + + /** + * remove all stream lock. + */ + def cleanAllLockAfterStop(): Unit = { + tableMap.asScala.values.foreach { writer => writer.unLockStreamTable() } + tableMap.clear() + } +}