carbondata git commit: [CARBONDATA-2311][Streaming] Fix bug to avoid to append data to streaming finish segment
Repository: carbondata Updated Branches: refs/heads/branch-1.3 167260da8 -> da0cb4f6a [CARBONDATA-2311][Streaming] Fix bug to avoid to append data to streaming finish segment At the beginning of each micro-batch, check the status of the current segment. If the status is "streaming", continue to use this segment; if the status is "streaming finish", open a new streaming segment to accept new streaming data. This closes #2163 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/da0cb4f6 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/da0cb4f6 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/da0cb4f6 Branch: refs/heads/branch-1.3 Commit: da0cb4f6aa1e73bf0961b86d087160f55ee3d696 Parents: 167260d Author: QiangCai Authored: Tue Apr 3 14:32:59 2018 +0800 Committer: manishgupta88 Committed: Thu Apr 12 14:29:55 2018 +0530 -- docs/streaming-guide.md | 2 +- .../streaming/StreamSinkFactory.scala | 35 ++ .../streaming/CarbonAppendableStreamSink.scala | 38 +--- .../CarbonStreamingQueryListener.scala | 27 -- 4 files changed, 68 insertions(+), 34 deletions(-) -- http://git-wip-us.apache.org/repos/asf/carbondata/blob/da0cb4f6/docs/streaming-guide.md -- diff --git a/docs/streaming-guide.md b/docs/streaming-guide.md index aa9eaef..3ea2881 100644 --- a/docs/streaming-guide.md +++ b/docs/streaming-guide.md @@ -133,7 +133,7 @@ streaming | The segment is running streaming ingestion streaming finish | The segment already finished streaming ingestion, it will be handed off to a segment in the columnar format ## Change segment status -Use below command to change the status of "streaming" segment to "streaming finish" segment. +Use below command to change the status of "streaming" segment to "streaming finish" segment. If the streaming application is running, this command will be blocked. 
```sql ALTER TABLE streaming_table FINISH STREAMING ``` http://git-wip-us.apache.org/repos/asf/carbondata/blob/da0cb4f6/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala -- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala index 3366f51..aded292 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala @@ -17,6 +17,9 @@ package org.apache.carbondata.streaming +import java.io.IOException +import java.util + import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration @@ -24,10 +27,12 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.streaming.{CarbonAppendableStreamSink, Sink} +import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer} import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider +import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.CarbonProperties @@ -45,11 +50,41 @@ import org.apache.carbondata.streaming.segment.StreamSegment */ object StreamSinkFactory { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + val locks = new util.concurrent.ConcurrentHashMap[String, ICarbonLock]() + + def lock(carbonTable: CarbonTable): Unit = { +val lock = 
CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, + LockUsage.STREAMING_LOCK) +if (lock.lockWithRetries()) { + locks.put(carbonTable.getTableUniqueName, lock) + LOGGER.info("Acquired the streaming lock for stream table: " + carbonTable.getDatabaseName + + "." + carbonTable.getTableName) +} else { + LOGGER.error("Not able to acquire the streaming lock for stream table:" + +carbonTable.getDatabaseName + "." + carbonTable.getTableName) + throw new IOException( +"Not able to acquire the streaming lock for stream table: " + +carbonTable.getDatabaseName + "." +
carbondata git commit: [CARBONDATA-2311][Streaming] Fix bug to avoid to append data to streaming finish segment
Repository: carbondata Updated Branches: refs/heads/master 94ea913a0 -> cfb9a9a20 [CARBONDATA-2311][Streaming] Fix bug to avoid to append data to streaming finish segment At the beginning of each micro-batch, check the status of the current segment. If the status is "streaming", continue to use this segment; if the status is "streaming finish", open a new streaming segment to accept new streaming data. This closes #2135 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cfb9a9a2 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cfb9a9a2 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cfb9a9a2 Branch: refs/heads/master Commit: cfb9a9a2029db3c5c25b96955518ded411be8f5b Parents: 94ea913 Author: QiangCai Authored: Tue Apr 3 14:32:59 2018 +0800 Committer: ravipesala Committed: Tue Apr 10 08:24:27 2018 +0530 -- docs/streaming-guide.md | 2 +- .../CarbonStreamingQueryListener.scala | 24 +++- .../streaming/StreamSinkFactory.scala | 40 ++- .../streaming/CarbonAppendableStreamSink.scala | 41 +--- 4 files changed, 71 insertions(+), 36 deletions(-) -- http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb9a9a2/docs/streaming-guide.md -- diff --git a/docs/streaming-guide.md b/docs/streaming-guide.md index aa9eaef..3ea2881 100644 --- a/docs/streaming-guide.md +++ b/docs/streaming-guide.md @@ -133,7 +133,7 @@ streaming | The segment is running streaming ingestion streaming finish | The segment already finished streaming ingestion, it will be handed off to a segment in the columnar format ## Change segment status -Use below command to change the status of "streaming" segment to "streaming finish" segment. +Use below command to change the status of "streaming" segment to "streaming finish" segment. If the streaming application is running, this command will be blocked. 
```sql ALTER TABLE streaming_table FINISH STREAMING ``` http://git-wip-us.apache.org/repos/asf/carbondata/blob/cfb9a9a2/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamingQueryListener.scala -- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamingQueryListener.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamingQueryListener.scala index 6d83fad..ebb1a41 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamingQueryListener.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/streaming/CarbonStreamingQueryListener.scala @@ -31,7 +31,7 @@ class CarbonStreamingQueryListener(spark: SparkSession) extends StreamingQueryLi private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - private val cache = new util.HashMap[UUID, ICarbonLock]() + private val cache = new util.HashMap[UUID, String]() override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = { val streamQuery = spark.streams.get(event.id) @@ -48,19 +48,7 @@ class CarbonStreamingQueryListener(spark: SparkSession) extends StreamingQueryLi LOGGER.info("Carbon streaming query started: " + event.id) val sink = qry.sink.asInstanceOf[CarbonAppendableStreamSink] val carbonTable = sink.carbonTable - val lock = CarbonLockFactory.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, -LockUsage.STREAMING_LOCK) - if (lock.lockWithRetries()) { -LOGGER.info("Acquired the lock for stream table: " + carbonTable.getDatabaseName + "." + -carbonTable.getTableName) -cache.put(event.id, lock) - } else { -LOGGER.error("Not able to acquire the lock for stream table:" + - carbonTable.getDatabaseName + "." + carbonTable.getTableName) -throw new InterruptedException( - "Not able to acquire the lock for stream table: " + carbonTable.getDatabaseName + "." 
+ - carbonTable.getTableName) - } + cache.put(event.id, carbonTable.getTableUniqueName) } } @@ -68,10 +56,10 @@ class CarbonStreamingQueryListener(spark: SparkSession) extends StreamingQueryLi } override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = { -val lock = cache.remove(event.id) -if (null != lock) { - LOGGER.info("Carbon streaming query: " + event.id) - lock.unlock() +val tableUniqueName = cache.remove(event.id) +if (null != tableUniqueName) { + LOGGER.info("Carbon