[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/carbondata/pull/2328


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-19 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r196445223
  
--- Diff: 
integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
 ---
@@ -1642,7 +1638,9 @@ class TestStreamingTableOperation extends QueryTest 
with BeforeAndAfterAll {
 var rows = sql("SHOW STREAMS").collect()
 assertResult(0)(rows.length)
 
-val csvDataDir = new File("target/csvdata").getCanonicalPath
+val csvDataDir = integrationPath + "/target/csvdata"
--- End diff --

fixed


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-19 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r196445328
  
--- Diff: 
integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
 ---
@@ -123,12 +123,9 @@ class TestStreamingTableOperation extends QueryTest 
with BeforeAndAfterAll {
 
 createTable(tableName = "agg_table", streaming = true, withBatchLoad = 
false)
 
-var csvDataDir = new File("target/csvdatanew").getCanonicalPath
+val csvDataDir = integrationPath + "target/csvdatanew"
--- End diff --

fixed


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-19 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r196444812
  
--- Diff: 
integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
 ---
@@ -195,7 +192,6 @@ class TestStreamingTableOperation extends QueryTest 
with BeforeAndAfterAll {
 sql("USE default")
 sql("DROP DATABASE IF EXISTS streaming CASCADE")
 new File("target/csvdatanew").delete()
--- End diff --

fixed


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-19 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r196444770
  
--- Diff: 
integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
 ---
@@ -123,12 +123,9 @@ class TestStreamingTableOperation extends QueryTest 
with BeforeAndAfterAll {
 
 createTable(tableName = "agg_table", streaming = true, withBatchLoad = 
false)
 
-var csvDataDir = new File("target/csvdatanew").getCanonicalPath
+val csvDataDir = integrationPath + "target/csvdatanew"
 generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
 generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir, 
SaveMode.Append)
-csvDataDir = new File("target/csvdata").getCanonicalPath
-// streaming ingest 10 rows
-generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir)
--- End diff --

I added it in the individual test cases


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-19 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r196350452
  
--- Diff: 
integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
 ---
@@ -195,7 +192,6 @@ class TestStreamingTableOperation extends QueryTest 
with BeforeAndAfterAll {
 sql("USE default")
 sql("DROP DATABASE IF EXISTS streaming CASCADE")
 new File("target/csvdatanew").delete()
--- End diff --

it should be `integrationPath + "/spark2/target/csvdatanew"`


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-19 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r196350482
  
--- Diff: 
integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
 ---
@@ -1642,7 +1638,9 @@ class TestStreamingTableOperation extends QueryTest 
with BeforeAndAfterAll {
 var rows = sql("SHOW STREAMS").collect()
 assertResult(0)(rows.length)
 
-val csvDataDir = new File("target/csvdata").getCanonicalPath
+val csvDataDir = integrationPath + "/target/csvdata"
--- End diff --

it should be `integrationPath + "/spark2/target/csvdata"`


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-19 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r196350296
  
--- Diff: 
integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
 ---
@@ -123,12 +123,9 @@ class TestStreamingTableOperation extends QueryTest 
with BeforeAndAfterAll {
 
 createTable(tableName = "agg_table", streaming = true, withBatchLoad = 
false)
 
-var csvDataDir = new File("target/csvdatanew").getCanonicalPath
+val csvDataDir = integrationPath + "target/csvdatanew"
 generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir)
 generateCSVDataFile(spark, idStart = 10, rowNums = 5, csvDataDir, 
SaveMode.Append)
-csvDataDir = new File("target/csvdata").getCanonicalPath
-// streaming ingest 10 rows
-generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir)
--- End diff --

why is this removed?


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-19 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r196350110
  
--- Diff: 
integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
 ---
@@ -123,12 +123,9 @@ class TestStreamingTableOperation extends QueryTest 
with BeforeAndAfterAll {
 
 createTable(tableName = "agg_table", streaming = true, withBatchLoad = 
false)
 
-var csvDataDir = new File("target/csvdatanew").getCanonicalPath
+val csvDataDir = integrationPath + "target/csvdatanew"
--- End diff --

it should be `integrationPath + "/spark2/target/csvdatanew"`
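
For reference, the full suggested line would then read (a sketch, assuming `integrationPath` resolves to the `integration` module root):

    val csvDataDir = integrationPath + "/spark2/target/csvdatanew"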


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-18 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r196137795
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.stream
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.streaming.StreamingRelation
+import org.apache.spark.sql.streaming.DataStreamReader
+import org.apache.spark.sql.types.{StringType, StructType}
+
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.spark.StreamingOption
+import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
+import org.apache.carbondata.stream.StreamJobManager
+
+/**
+ * This command will start a Spark streaming job to insert rows from 
source to sink
+ */
+case class CarbonCreateStreamCommand(
+streamName: String,
+sinkDbName: Option[String],
+sinkTableName: String,
+optionMap: Map[String, String],
+query: String
+) extends DataCommand {
+
+  override def output: Seq[Attribute] =
+Seq(AttributeReference("Stream Name", StringType, nullable = false)(),
+  AttributeReference("JobId", StringType, nullable = false)(),
+  AttributeReference("Status", StringType, nullable = false)())
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+val df = sparkSession.sql(query)
+var sourceTable: CarbonTable = null
+
+// find the streaming source table in the query
+// and replace it with StreamingRelation
+val streamLp = df.logicalPlan transform {
+  case r: LogicalRelation
+if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+   
r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource
 =>
+val (source, streamingRelation) = 
prepareStreamingRelation(sparkSession, r)
+if (sourceTable != null && sourceTable.getTableName != 
source.getTableName) {
--- End diff --

fixed


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-18 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r196132442
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
 ---
@@ -145,6 +149,41 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser 
{
 CarbonAlterTableFinishStreaming(dbName, table)
 }
 
+  /**
+   * The syntax of CREATE STREAM
+   * CREATE STREAM streamName ON TABLE [dbName.]tableName
+   * [STMPROPERTIES('KEY'='VALUE')]
+   * AS SELECT COUNT(COL1) FROM tableName
+   */
+  protected lazy val createStream: Parser[LogicalPlan] =
+(CREATE ~> STREAM ~> ident) ~ (ON ~> TABLE ~> (ident <~ ".").?) ~ 
ident ~
+(STMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~
+(AS ~> restInput) <~ opt(";") ^^ {
+  case streamName ~ dbName ~ tableName ~ options ~ query =>
+val optionMap = options.getOrElse(List[(String, 
String)]()).toMap[String, String]
+CarbonCreateStreamCommand(streamName, dbName, tableName, 
optionMap, query)
+}
+
+  /**
+   * The syntax of DROP STREAM
+   * DROP STREAM streamName
+   */
+  protected lazy val dropStream: Parser[LogicalPlan] =
+DROP ~> STREAM ~> ident <~ opt(";") ^^ {
--- End diff --

fixed


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-18 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r196132467
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
 ---
@@ -145,6 +149,41 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser 
{
 CarbonAlterTableFinishStreaming(dbName, table)
 }
 
+  /**
+   * The syntax of CREATE STREAM
+   * CREATE STREAM streamName ON TABLE [dbName.]tableName
+   * [STMPROPERTIES('KEY'='VALUE')]
+   * AS SELECT COUNT(COL1) FROM tableName
+   */
+  protected lazy val createStream: Parser[LogicalPlan] =
+(CREATE ~> STREAM ~> ident) ~ (ON ~> TABLE ~> (ident <~ ".").?) ~ 
ident ~
--- End diff --

fixed


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-18 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r196089874
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
 ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.stream
+
+import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.sql.types.{StructField, StructType}
+
+import org.apache.carbondata.common.exceptions.NoSuchStreamException
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.spark.StreamingOption
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+import org.apache.carbondata.streaming.CarbonStreamException
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+object StreamJobManager {
--- End diff --

fixed


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-18 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r196089005
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
 ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.stream
+
+import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.sql.types.{StructField, StructType}
+
+import org.apache.carbondata.common.exceptions.NoSuchStreamException
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.spark.StreamingOption
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+import org.apache.carbondata.streaming.CarbonStreamException
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+object StreamJobManager {
+  private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  // map of stream name to job desc
+  private val jobs = new ConcurrentHashMap[String, StreamJobDesc]()
+
+  private def validateStreamName(streamName: String): Unit = {
+if 
(StreamJobManager.getAllJobs.exists(_.streamName.equalsIgnoreCase(streamName))) 
{
--- End diff --

fixed


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-18 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r196087809
  
--- Diff: 
integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
+
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+class StreamingOption(val userInputMap: Map[String, String]) {
+  def trigger: Trigger = {
+val trigger = userInputMap.getOrElse(
+  "trigger", throw new MalformedCarbonCommandException("trigger must 
be specified"))
+val interval = userInputMap.getOrElse(
+  "interval", throw new MalformedCarbonCommandException("interval must 
be specified"))
+trigger match {
+  case "ProcessingTime" => ProcessingTime(interval)
--- End diff --

but it is still there in Spark 2.1.0


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-18 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r196087279
  
--- Diff: 
integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
+
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+class StreamingOption(val userInputMap: Map[String, String]) {
+  def trigger: Trigger = {
--- End diff --

fixed


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-18 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r196086445
  
--- Diff: 
integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
 ---
@@ -171,4 +176,21 @@ public Object wrapWithGenericRow(Object[] fields) {
 }
 return fields;
   }
+
+  public static StructType convertToSparkSchema(ColumnSchema[] 
carbonColumns) {
--- End diff --

fixed


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-15 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r195677693
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.stream
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.streaming.StreamingRelation
+import org.apache.spark.sql.streaming.DataStreamReader
+import org.apache.spark.sql.types.{StringType, StructType}
+
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.spark.StreamingOption
+import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
+import org.apache.carbondata.stream.StreamJobManager
+
+/**
+ * This command will start a Spark streaming job to insert rows from 
source to sink
+ */
+case class CarbonCreateStreamCommand(
+streamName: String,
+sinkDbName: Option[String],
+sinkTableName: String,
+optionMap: Map[String, String],
+query: String
+) extends DataCommand {
+
+  override def output: Seq[Attribute] =
+Seq(AttributeReference("Stream Name", StringType, nullable = false)(),
+  AttributeReference("JobId", StringType, nullable = false)(),
+  AttributeReference("Status", StringType, nullable = false)())
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+val df = sparkSession.sql(query)
+var sourceTable: CarbonTable = null
+
+// find the streaming source table in the query
+// and replace it with StreamingRelation
+val streamLp = df.logicalPlan transform {
+  case r: LogicalRelation
+if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+   
r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource
 =>
+val (source, streamingRelation) = 
prepareStreamingRelation(sparkSession, r)
+if (sourceTable != null && sourceTable.getTableName != 
source.getTableName) {
--- End diff --

I don't get the logic here. Do you want to verify that more than one stream source table is not present? If so, better to update the error message to `Stream query on more than one stream source table is not supported`


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-15 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r195673232
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
 ---
@@ -145,6 +149,41 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser 
{
 CarbonAlterTableFinishStreaming(dbName, table)
 }
 
+  /**
+   * The syntax of CREATE STREAM
+   * CREATE STREAM streamName ON TABLE [dbName.]tableName
+   * [STMPROPERTIES('KEY'='VALUE')]
+   * AS SELECT COUNT(COL1) FROM tableName
+   */
+  protected lazy val createStream: Parser[LogicalPlan] =
+(CREATE ~> STREAM ~> ident) ~ (ON ~> TABLE ~> (ident <~ ".").?) ~ 
ident ~
--- End diff --

Better to support `IF NOT EXISTS` also
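
As a standalone illustration of making the clause optional with Scala parser combinators (this demo object and its rules are hypothetical, not the Carbon parser):

    import scala.util.parsing.combinator.RegexParsers

    object IfNotExistsDemo extends RegexParsers {
      val ident: Parser[String] = """[a-zA-Z_][a-zA-Z0-9_]*""".r
      // yields (ifNotExistsPresent, streamName)
      val createStream: Parser[(Boolean, String)] =
        "CREATE" ~> "STREAM" ~> opt("IF" ~ "NOT" ~ "EXISTS") ~ ident ^^ {
          case ifNotExists ~ name => (ifNotExists.isDefined, name)
        }
      def main(args: Array[String]): Unit = {
        println(parseAll(createStream, "CREATE STREAM IF NOT EXISTS job1")) // flag = true
        println(parseAll(createStream, "CREATE STREAM job1"))               // flag = false
      }
    }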


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-15 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r195673124
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
 ---
@@ -145,6 +149,41 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser 
{
 CarbonAlterTableFinishStreaming(dbName, table)
 }
 
+  /**
+   * The syntax of CREATE STREAM
+   * CREATE STREAM streamName ON TABLE [dbName.]tableName
+   * [STMPROPERTIES('KEY'='VALUE')]
+   * AS SELECT COUNT(COL1) FROM tableName
+   */
+  protected lazy val createStream: Parser[LogicalPlan] =
+(CREATE ~> STREAM ~> ident) ~ (ON ~> TABLE ~> (ident <~ ".").?) ~ 
ident ~
+(STMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~
+(AS ~> restInput) <~ opt(";") ^^ {
+  case streamName ~ dbName ~ tableName ~ options ~ query =>
+val optionMap = options.getOrElse(List[(String, 
String)]()).toMap[String, String]
+CarbonCreateStreamCommand(streamName, dbName, tableName, 
optionMap, query)
+}
+
+  /**
+   * The syntax of DROP STREAM
+   * DROP STREAM streamName
+   */
+  protected lazy val dropStream: Parser[LogicalPlan] =
+DROP ~> STREAM ~> ident <~ opt(";") ^^ {
--- End diff --

Better to support `IF EXISTS` also
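
A hedged sketch of how the quoted rule could accept it, assuming `IF` and `EXISTS` keyword parsers already exist in the surrounding parser and that the drop command gains a boolean flag (both assumptions, not the merged code):

    protected lazy val dropStream: Parser[LogicalPlan] =
      DROP ~> STREAM ~> opt(IF ~> EXISTS) ~ ident <~ opt(";") ^^ {
        case ifExists ~ streamName =>
          // hypothetical `ifExists` parameter on the command
          CarbonDropStreamCommand(streamName, ifExists.isDefined)
      }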


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-15 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r195669110
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
 ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.stream
+
+import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.sql.types.{StructField, StructType}
+
+import org.apache.carbondata.common.exceptions.NoSuchStreamException
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.spark.StreamingOption
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+import org.apache.carbondata.streaming.CarbonStreamException
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+object StreamJobManager {
+  private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  // map of stream name to job desc
+  private val jobs = new ConcurrentHashMap[String, StreamJobDesc]()
+
+  private def validateStreamName(streamName: String): Unit = {
+if 
(StreamJobManager.getAllJobs.exists(_.streamName.equalsIgnoreCase(streamName))) 
{
--- End diff --

I guess just checking `jobs.containsKey(streamName)` should be enough.
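
A minimal sketch of that simplification, assuming stream names are normalized (e.g. lower-cased) when inserted into `jobs` so the lookup stays case-insensitive; the message text is illustrative:

    private def validateStreamName(streamName: String): Unit = {
      // ConcurrentHashMap lookup instead of a linear scan over all jobs
      if (jobs.containsKey(streamName.toLowerCase)) {
        throw new MalformedCarbonCommandException(
          s"stream name '$streamName' already exists")
      }
    }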


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-15 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r195668205
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
 ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.stream
+
+import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.sql.types.{StructField, StructType}
+
+import org.apache.carbondata.common.exceptions.NoSuchStreamException
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.spark.StreamingOption
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+import org.apache.carbondata.streaming.CarbonStreamException
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+object StreamJobManager {
--- End diff --

Add a description to the class. Also better to mention that stream jobs are kept only in driver memory and are not persisted, so other drivers cannot see the ongoing stream jobs.
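
For instance, the requested description could be a ScalaDoc on the object (wording is a suggestion, not the merged text):

    /**
     * Driver-side manager for streaming ingest jobs started by CREATE STREAM.
     * Note: job metadata lives only in this driver's memory and is not
     * persisted, so other drivers cannot see the ongoing stream jobs here.
     */
    object StreamJobManager {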


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-15 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r195666269
  
--- Diff: 
integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
+
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+class StreamingOption(val userInputMap: Map[String, String]) {
+  def trigger: Trigger = {
+val trigger = userInputMap.getOrElse(
+  "trigger", throw new MalformedCarbonCommandException("trigger must 
be specified"))
+val interval = userInputMap.getOrElse(
+  "interval", throw new MalformedCarbonCommandException("interval must 
be specified"))
+trigger match {
+  case "ProcessingTime" => ProcessingTime(interval)
--- End diff --

`ProcessingTime` is deprecated; suggest using `Trigger.ProcessingTime` instead.
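
A sketch of the suggested form; note that the `Trigger.ProcessingTime` factory only exists from Spark 2.2 onwards, which is why the reply above points out that plain `ProcessingTime` is still needed on Spark 2.1.0:

    import org.apache.spark.sql.streaming.Trigger

    trigger match {
      case "ProcessingTime" => Trigger.ProcessingTime(interval)  // Spark 2.2+
      case other => throw new MalformedCarbonCommandException(
        s"Unsupported trigger: $other")  // error text is illustrative
    }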


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-15 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r195666045
  
--- Diff: 
integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
 ---
@@ -171,4 +176,21 @@ public Object wrapWithGenericRow(Object[] fields) {
 }
 return fields;
   }
+
+  public static StructType convertToSparkSchema(ColumnSchema[] 
carbonColumns) {
--- End diff --

I think this method does not handle complex datatype schemas. Can you check how `CarbonRelation` handles complex schemas? The same approach can be used here.


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-15 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r195666160
  
--- Diff: 
integration/spark-common/src/main/scala/org/apache/carbondata/spark/StreamingOption.scala
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
+
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.streaming.parser.CarbonStreamParser
+
+class StreamingOption(val userInputMap: Map[String, String]) {
+  def trigger: Trigger = {
--- End diff --

Ideally, these could all be `lazy val` instead of `def`.
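
For example, the quoted `trigger` member rewritten as a `lazy val` (body taken from the diff; the fall-through case is an assumption since the diff is truncated):

    class StreamingOption(val userInputMap: Map[String, String]) {
      // lazy val: validated once on first access, then cached for reuse
      lazy val trigger: Trigger = {
        val trigger = userInputMap.getOrElse(
          "trigger", throw new MalformedCarbonCommandException("trigger must be specified"))
        val interval = userInputMap.getOrElse(
          "interval", throw new MalformedCarbonCommandException("interval must be specified"))
        trigger match {
          case "ProcessingTime" => ProcessingTime(interval)
          case other => throw new MalformedCarbonCommandException(s"invalid trigger: $other")
        }
      }
    }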


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-12 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r194666440
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamSourceCommand.scala
 ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.stream
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Field, MetadataCommand, 
TableNewProcessor}
+import 
org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+
+/**
+ * This command is used to create Stream Source, which is implemented as a 
Carbon Table
+ */
+case class CarbonCreateStreamSourceCommand(
+dbName: Option[String],
+tableName: String,
+fields: Seq[Field],
+tblProperties: Map[String, String]
+) extends MetadataCommand {
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+val tableModel = new CarbonSpark2SqlParser().prepareTableModel(
--- End diff --

1. ok


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-12 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r194665653
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
 ---
@@ -145,6 +149,55 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser 
{
 CarbonAlterTableFinishStreaming(dbName, table)
 }
 
+  /**
+   * The syntax of CREATE STREAM SOURCE
+   * CREATE STREAM SOURCE [dbName.]tableName (schema list)
+   * [TBLPROPERTIES('KEY'='VALUE')]
+   */
+  protected lazy val createStreamSource: Parser[LogicalPlan] =
+CREATE ~> STREAM ~> SOURCE ~> (ident <~ ".").? ~ ident ~
+("(" ~> repsep(anyFieldDef, ",") <~ ")") ~
+(TBLPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ 
opt(";") ^^ {
+  case dbName ~ tableName ~ fields ~ map =>
+val tblProperties = map.getOrElse(List[(String, 
String)]()).toMap[String, String]
+CarbonCreateStreamSourceCommand(dbName, tableName, fields, 
tblProperties)
+}
+
+  /**
+   * The syntax of CREATE STREAM
+   * CREATE STREAM ON TABLE [dbName.]tableName
+   * [STMPROPERTIES('KEY'='VALUE')]
+   * AS SELECT COUNT(COL1) FROM tableName
+   */
+  protected lazy val createStream: Parser[LogicalPlan] =
+CREATE ~> STREAM ~> ON ~> TABLE ~> (ident <~ ".").? ~ ident ~
+(STMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~
+(AS ~> restInput) <~ opt(";") ^^ {
+  case dbName ~ tableName ~ options ~ query =>
+val optionMap = options.getOrElse(List[(String, 
String)]()).toMap[String, String]
+CarbonCreateStreamCommand(dbName, tableName, optionMap, query)
+}
+
+  /**
+   * The syntax of KILL STREAM
+   * KILL STREAM ON TABLE [dbName].tableName
+   */
+  protected lazy val killStream: Parser[LogicalPlan] =
--- End diff --

If the stream is dropped, the user needs to trigger CREATE STREAM again.


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-12 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r194651389
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamSourceCommand.scala
 ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.stream
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Field, MetadataCommand, 
TableNewProcessor}
+import 
org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+
+/**
+ * This command is used to create Stream Source, which is implemented as a 
Carbon Table
+ */
+case class CarbonCreateStreamSourceCommand(
+dbName: Option[String],
+tableName: String,
+fields: Seq[Field],
+tblProperties: Map[String, String]
+) extends MetadataCommand {
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+val tableModel = new CarbonSpark2SqlParser().prepareTableModel(
+  ifNotExistPresent = false,
+  dbName,
+  tableName,
+  fields,
+  Seq.empty,
+  mutable.Map[String, String](tblProperties.toSeq: _*),
+  None
+)
+val tableInfo = TableNewProcessor.apply(tableModel)
--- End diff --

Create Stream Source is removed


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-12 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r194651326
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.stream
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.streaming.StreamingQuery
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.spark.StreamingOption
+import org.apache.carbondata.streaming.CarbonStreamException
+
+object StreamJobManager {
+  private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  private val jobs = mutable.Map[String, StreamJobDesc]()
+
+  /**
+   * Start a spark streaming query
+   * @param sparkSession session instance
+   * @param sourceTable stream source table
+   * @param sinkTable sink table to insert to
+   * @param query query string
+   * @param streamDf dataframe that containing the query from stream 
source table
+   * @param options options provided by user
+   * @return Job ID
+   */
+  def startJob(
+  sparkSession: SparkSession,
+  sourceTable: CarbonTable,
+  sinkTable: CarbonTable,
+  query: String,
+  streamDf: DataFrame,
+  options: StreamingOption): String = {
+val latch = new CountDownLatch(1)
+var exception: Throwable = null
+var job: StreamingQuery = null
+
+// start a new thread to run the streaming ingest job, the job will be 
running
+// until user stops it by STOP STREAM JOB
--- End diff --

fixed


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-06-12 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r194650822
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.stream
+
+import java.util.Date
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
+import org.apache.spark.sql.execution.command.MetadataCommand
+import org.apache.spark.sql.types.StringType
+
+import org.apache.carbondata.stream.StreamJobManager
+
+/**
+ * Show all streams created or on a specified table
+ */
+case class CarbonShowStreamsCommand(
+tableOp: Option[TableIdentifier]
+) extends MetadataCommand {
+  override def output: Seq[Attribute] = {
+Seq(AttributeReference("JobId", StringType, nullable = false)(),
+  AttributeReference("Status", StringType, nullable = false)(),
+  AttributeReference("Source", StringType, nullable = false)(),
+  AttributeReference("Sink", StringType, nullable = false)(),
+  AttributeReference("Start Time", StringType, nullable = false)(),
+  AttributeReference("Time Elapse", StringType, nullable = false)())
+  }
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+val jobs = tableOp match {
+  case None => StreamJobManager.getAllJobs.toSeq
+  case Some(table) =>
+val carbonTable = CarbonEnv.getCarbonTable(table.database, 
table.table)(sparkSession)
+StreamJobManager.getAllJobs.filter { job =>
+  job.sinkTable.equalsIgnoreCase(carbonTable.getTableName) &&
+  job.sinkDb.equalsIgnoreCase(carbonTable.getDatabaseName)
+}.toSeq
+}
+
+jobs.map { job =>
+  val elapsedTime = System.currentTimeMillis() - job.startTime
+  Row(
+job.streamingQuery.id.toString,
+if (job.streamingQuery.isActive) "RUNNING" else "FAILED",
+s"${ job.sourceDb }.${ job.sourceTable }",
+s"${ job.sinkDb }.${ job.sinkTable }",
+new Date(job.startTime).toString,
+String.format(
+  "%s days, %s hours, %s min, %s sec",
+  TimeUnit.MILLISECONDS.toDays(elapsedTime).toString,
+  TimeUnit.MILLISECONDS.toHours(elapsedTime).toString,
--- End diff --

ok
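
The reviewer's original note is not in this excerpt, but a likely concern with the quoted formatting is that `toHours`/`toMinutes`/`toSeconds` return totals rather than remainders (e.g. one elapsed day prints as "1 days, 24 hours, ..."). A hedged sketch of remainder-based formatting:

    val days  = TimeUnit.MILLISECONDS.toDays(elapsedTime)
    val hours = TimeUnit.MILLISECONDS.toHours(elapsedTime)   % 24
    val mins  = TimeUnit.MILLISECONDS.toMinutes(elapsedTime) % 60
    val secs  = TimeUnit.MILLISECONDS.toSeconds(elapsedTime) % 60
    val elapsed = s"$days days, $hours hours, $mins min, $secs sec"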


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-05-22 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r189806362
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
 ---
@@ -22,6 +22,7 @@ import scala.Array.canBuildFrom
 import scala.collection.JavaConverters._
 import scala.util.parsing.combinator.RegexParsers
 
+import org.apache.spark.sql.CarbonEnv
--- End diff --

I will remove it


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-05-22 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r189806161
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.stream
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.streaming.StreamingQuery
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.spark.StreamingOption
+import org.apache.carbondata.streaming.CarbonStreamException
+
+object StreamJobManager {
+  private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  private val jobs = mutable.Map[String, StreamJobDesc]()
--- End diff --

ok


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-05-21 Thread xuchuanyin
Github user xuchuanyin commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r189764133
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamSourceCommand.scala
 ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.stream
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Field, MetadataCommand, 
TableNewProcessor}
+import 
org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+
+/**
+ * This command is used to create Stream Source, which is implemented as a 
Carbon Table
+ */
+case class CarbonCreateStreamSourceCommand(
+dbName: Option[String],
+tableName: String,
+fields: Seq[Field],
+tblProperties: Map[String, String]
+) extends MetadataCommand {
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+val tableModel = new CarbonSpark2SqlParser().prepareTableModel(
+  ifNotExistPresent = false,
+  dbName,
+  tableName,
+  fields,
+  Seq.empty,
+  mutable.Map[String, String](tblProperties.toSeq: _*),
+  None
+)
+val tableInfo = TableNewProcessor.apply(tableModel)
--- End diff --

The `Create Stream Source` internally calls `Create Table`. What's the 
difference between them? If there is no difference, is `Create Stream Source` 
necessary? Can an ordinary carbon table be used as the source?


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-05-21 Thread xuchuanyin
Github user xuchuanyin commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r189762936
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
 ---
@@ -22,6 +22,7 @@ import scala.Array.canBuildFrom
 import scala.collection.JavaConverters._
 import scala.util.parsing.combinator.RegexParsers
 
+import org.apache.spark.sql.CarbonEnv
--- End diff --

Only adding an import, without changing any code?


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-05-21 Thread xuchuanyin
Github user xuchuanyin commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r189763349
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.stream
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.streaming.StreamingQuery
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.spark.StreamingOption
+import org.apache.carbondata.streaming.CarbonStreamException
+
+object StreamJobManager {
+  private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  private val jobs = mutable.Map[String, StreamJobDesc]()
+
+  /**
+   * Start a spark streaming query
+   * @param sparkSession session instance
+   * @param sourceTable stream source table
+   * @param sinkTable sink table to insert to
+   * @param query query string
+   * @param streamDf dataframe that containing the query from stream 
source table
+   * @param options options provided by user
+   * @return Job ID
+   */
+  def startJob(
+  sparkSession: SparkSession,
+  sourceTable: CarbonTable,
+  sinkTable: CarbonTable,
+  query: String,
+  streamDf: DataFrame,
+  options: StreamingOption): String = {
+val latch = new CountDownLatch(1)
+var exception: Throwable = null
+var job: StreamingQuery = null
+
+// start a new thread to run the streaming ingest job, the job will be 
running
+// until user stops it by STOP STREAM JOB
--- End diff --

The comment still says `STOP STREAM JOB`, which is outdated.


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-05-21 Thread xuchuanyin
Github user xuchuanyin commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r189764478
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
 ---
@@ -145,6 +149,55 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser 
{
 CarbonAlterTableFinishStreaming(dbName, table)
 }
 
+  /**
+   * The syntax of CREATE STREAM SOURCE
+   * CREATE STREAM SOURCE [dbName.]tableName (schema list)
+   * [TBLPROPERTIES('KEY'='VALUE')]
+   */
+  protected lazy val createStreamSource: Parser[LogicalPlan] =
+CREATE ~> STREAM ~> SOURCE ~> (ident <~ ".").? ~ ident ~
+("(" ~> repsep(anyFieldDef, ",") <~ ")") ~
+(TBLPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ 
opt(";") ^^ {
+  case dbName ~ tableName ~ fields ~ map =>
+val tblProperties = map.getOrElse(List[(String, 
String)]()).toMap[String, String]
+CarbonCreateStreamSourceCommand(dbName, tableName, fields, 
tblProperties)
+}
+
+  /**
+   * The syntax of CREATE STREAM
+   * CREATE STREAM ON TABLE [dbName.]tableName
+   * [STMPROPERTIES('KEY'='VALUE')]
+   * AS SELECT COUNT(COL1) FROM tableName
+   */
+  protected lazy val createStream: Parser[LogicalPlan] =
+CREATE ~> STREAM ~> ON ~> TABLE ~> (ident <~ ".").? ~ ident ~
+(STMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~
+(AS ~> restInput) <~ opt(";") ^^ {
+  case dbName ~ tableName ~ options ~ query =>
+val optionMap = options.getOrElse(List[(String, 
String)]()).toMap[String, String]
+CarbonCreateStreamCommand(dbName, tableName, optionMap, query)
+}
+
+  /**
+   * The syntax of KILL STREAM
+   * KILL STREAM ON TABLE [dbName].tableName
+   */
+  protected lazy val killStream: Parser[LogicalPlan] =
--- End diff --

What will happen if I call `KILL STREAM`?
Would I need to (re-)run `CREATE STREAM` if I want the stream to start again? If so,
it would be better to change the grammar to `DROP STREAM ON TABLE ...`.
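If the grammar were renamed as suggested, a minimal sketch of the rule as it would sit inside CarbonSpark2SqlParser, mirroring the `createStream` rule above (`DROP` is assumed to be an available keyword token, and `CarbonDropStreamCommand` is a hypothetical command class):

  /**
   * Hypothetical syntax of DROP STREAM
   * DROP STREAM ON TABLE [dbName.]tableName
   */
  protected lazy val dropStream: Parser[LogicalPlan] =
    DROP ~> STREAM ~> ON ~> TABLE ~> (ident <~ ".").? ~ ident <~ opt(";") ^^ {
      case dbName ~ tableName =>
        // CarbonDropStreamCommand is a placeholder name; the actual command
        // class would stop the running query and unregister the job.
        CarbonDropStreamCommand(dbName, tableName)
    }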


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-05-21 Thread gvramana
Github user gvramana commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r189778004
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamSourceCommand.scala
 ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.stream
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Field, MetadataCommand, 
TableNewProcessor}
+import 
org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+
+/**
+ * This command is used to create Stream Source, which is implemented as a 
Carbon Table
+ */
+case class CarbonCreateStreamSourceCommand(
+dbName: Option[String],
+tableName: String,
+fields: Seq[Field],
+tblProperties: Map[String, String]
+) extends MetadataCommand {
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+val tableModel = new CarbonSpark2SqlParser().prepareTableModel(
--- End diff --

1) 'streaming'='source' need not be explicitly passed by the user; this DDL can 
always add it internally (see the sketch after this list).
2) `SELECT *` on a stream source table should fail.
3) Another DDL is required to show stream source tables, as it is hard to 
identify streaming source tables among the full list of tables.
4) The stream sink table DDL mentioned in the description is not required, as 
carbondata already supports streaming ingestion.
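A minimal standalone sketch of point 1, covering only the property merge (how `processMetadata` would consume the merged map is left out; `StreamSourceProperties` is a hypothetical helper name):

// Sketch for point 1: the DDL injects 'streaming'='source' itself, so the
// user need not (and cannot incorrectly) pass it in TBLPROPERTIES.
object StreamSourceProperties {
  def withStreamingSource(tblProperties: Map[String, String]): Map[String, String] = {
    // '+' overrides any user-supplied value for the "streaming" key.
    tblProperties + ("streaming" -> "source")
  }
}

// Example:
//   withStreamingSource(Map("sort_columns" -> "id"))
//   == Map("sort_columns" -> "id", "streaming" -> "source")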


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-05-21 Thread gvramana
Github user gvramana commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r189615740
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.stream
+
+import java.util.Date
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
+import org.apache.spark.sql.execution.command.MetadataCommand
+import org.apache.spark.sql.types.StringType
+
+import org.apache.carbondata.stream.StreamJobManager
+
+/**
+ * Show all streams created or on a specified table
+ */
+case class CarbonShowStreamsCommand(
+tableOp: Option[TableIdentifier]
+) extends MetadataCommand {
+  override def output: Seq[Attribute] = {
+Seq(AttributeReference("JobId", StringType, nullable = false)(),
+  AttributeReference("Status", StringType, nullable = false)(),
+  AttributeReference("Source", StringType, nullable = false)(),
+  AttributeReference("Sink", StringType, nullable = false)(),
+  AttributeReference("Start Time", StringType, nullable = false)(),
+  AttributeReference("Time Elapse", StringType, nullable = false)())
+  }
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+val jobs = tableOp match {
+  case None => StreamJobManager.getAllJobs.toSeq
+  case Some(table) =>
+val carbonTable = CarbonEnv.getCarbonTable(table.database, 
table.table)(sparkSession)
+StreamJobManager.getAllJobs.filter { job =>
+  job.sinkTable.equalsIgnoreCase(carbonTable.getTableName) &&
+  job.sinkDb.equalsIgnoreCase(carbonTable.getDatabaseName)
+}.toSeq
+}
+
+jobs.map { job =>
+  val elapsedTime = System.currentTimeMillis() - job.startTime
+  Row(
+job.streamingQuery.id.toString,
+if (job.streamingQuery.isActive) "RUNNING" else "FAILED",
+s"${ job.sourceDb }.${ job.sourceTable }",
+s"${ job.sinkDb }.${ job.sinkTable }",
+new Date(job.startTime).toString,
+String.format(
+  "%s days, %s hours, %s min, %s sec",
+  TimeUnit.MILLISECONDS.toDays(elapsedTime).toString,
+  TimeUnit.MILLISECONDS.toHours(elapsedTime).toString,
--- End diff --

`toHours` will give the total hours elapsed and `toMinutes` will give the total 
minutes elapsed, so the output will not be in the format 
(10 days, 1 hours, 5 min, 34 sec).
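A minimal standalone sketch that subtracts the larger units so each field carries only its remainder (the output format mirrors the diff above; the helper name is hypothetical):

import java.util.concurrent.TimeUnit

object ElapsedFormat {
  // Break elapsed milliseconds into days/hours/min/sec, taking the
  // remainder at each unit instead of the running total.
  def format(elapsedTime: Long): String = {
    val totalHours = TimeUnit.MILLISECONDS.toHours(elapsedTime)
    val totalMinutes = TimeUnit.MILLISECONDS.toMinutes(elapsedTime)
    val days = TimeUnit.MILLISECONDS.toDays(elapsedTime)
    val hours = totalHours - TimeUnit.DAYS.toHours(days)
    val minutes = totalMinutes - TimeUnit.HOURS.toMinutes(totalHours)
    val seconds = TimeUnit.MILLISECONDS.toSeconds(elapsedTime) -
      TimeUnit.MINUTES.toSeconds(totalMinutes)
    s"$days days, $hours hours, $minutes min, $seconds sec"
  }
}

// ElapsedFormat.format(90061000L) == "1 days, 1 hours, 1 min, 1 sec"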


---


[GitHub] carbondata pull request #2328: [CARBONDATA-2504][STREAM] Support StreamSQL f...

2018-05-21 Thread gvramana
Github user gvramana commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2328#discussion_r189606530
  
--- Diff: 
integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
 ---
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.stream
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.streaming.StreamingQuery
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.spark.StreamingOption
+import org.apache.carbondata.streaming.CarbonStreamException
+
+object StreamJobManager {
+  private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  private val jobs = mutable.Map[String, StreamJobDesc]()
--- End diff --

Need to use a concurrent hash map here, since the map can be accessed from multiple threads.
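A minimal sketch of the suggested fix, backing the field with `java.util.concurrent.ConcurrentHashMap` while keeping the existing `mutable.Map`-style call sites (`StreamJobDesc` is stubbed here only to keep the snippet self-contained; the PR's real class carries the query handle and source/sink metadata):

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.concurrent

// Stub of the PR's StreamJobDesc, just to keep this snippet compilable.
case class StreamJobDesc(jobId: String)

object StreamJobManagerSketch {
  // asScala over a ConcurrentHashMap yields scala.collection.concurrent.Map,
  // so put/get/remove call sites stay the same but become thread-safe.
  private val jobs: concurrent.Map[String, StreamJobDesc] =
    new ConcurrentHashMap[String, StreamJobDesc]().asScala
}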


---