[GitHub] carbondata pull request #2495: [CARBONDATA-2736][CARBONSTORE] Kafka integrat...

2018-07-18 Thread ajithme
Github user ajithme closed the pull request at:

https://github.com/apache/carbondata/pull/2495


---


[GitHub] carbondata pull request #2495: [CARBONDATA-2736][CARBONSTORE] Kafka integrat...

2018-07-12 Thread ajithme
Github user ajithme commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2495#discussion_r202232048
  
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala ---
@@ -53,20 +52,22 @@ case class CarbonCreateStreamCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val df = sparkSession.sql(query)
     var sourceTable: CarbonTable = null
+    var dataFrame: Option[DataFrame] = None

     // find the streaming source table in the query
     // and replace it with StreamingRelation
-    val streamLp = df.logicalPlan transform {
+    df.logicalPlan transform {
       case r: LogicalRelation
         if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
            r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
-        val (source, streamingRelation) = prepareStreamingRelation(sparkSession, r)
+        val (source, resolvedFrame) = prepareDataFrame(sparkSession, r)
--- End diff --

Added method comments


---


[GitHub] carbondata pull request #2495: [CARBONDATA-2736][CARBONSTORE] Kafka integrat...

2018-07-12 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2495#discussion_r20509
  
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala ---
@@ -53,20 +52,22 @@ case class CarbonCreateStreamCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val df = sparkSession.sql(query)
     var sourceTable: CarbonTable = null
+    var dataFrame: Option[DataFrame] = None

     // find the streaming source table in the query
     // and replace it with StreamingRelation
-    val streamLp = df.logicalPlan transform {
+    df.logicalPlan transform {
       case r: LogicalRelation
         if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
            r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
-        val (source, streamingRelation) = prepareStreamingRelation(sparkSession, r)
+        val (source, resolvedFrame) = prepareDataFrame(sparkSession, r)
--- End diff --

Please add a comment here to describe what is done inside prepareDataFrame.
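
A hedged sketch of what such a method comment might look like; the described behavior is inferred from the diff above (prepareDataFrame replaces the former prepareStreamingRelation and returns the source table together with a resolved DataFrame), and the exact signature is an assumption:

    /**
     * Resolves the given LogicalRelation (a streaming source table) into
     * a pair of the source CarbonTable and a DataFrame whose logical plan
     * has the CarbonDatasourceHadoopRelation replaced by a
     * StreamingRelation, so the query can run as a structured stream.
     */
    private def prepareDataFrame(
        sparkSession: SparkSession,
        logicalRelation: LogicalRelation): (CarbonTable, DataFrame)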


---


[GitHub] carbondata pull request #2495: [CARBONDATA-2736][CARBONSTORE] Kafka integrat...

2018-07-12 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2495#discussion_r20321
  
--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---
@@ -102,14 +104,23 @@ object StreamJobManager {
     }

     validateSourceTable(sourceTable)
-    validateSinkTable(streamDf.schema, sinkTable)
+
+    // kafka source always has a fixed schema, need to get the actual schema
+    val isKafka = Option(sourceTable.getTableInfo.getFactTable.getTableProperties
--- End diff --

Please add a function in CarbonTable to get the underlying format.
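
A minimal sketch of what such a helper on CarbonTable might look like; the method name, the "format" property key, and the "carbondata" default are assumptions for illustration, not the actual CarbonData API (the property-map access path comes from the diff above):

    // Hypothetical helper on CarbonTable: reads the underlying source
    // format from the table properties. The "format" key and the
    // "carbondata" default are assumptions, not the real API.
    def getFormat: String =
      Option(getTableInfo.getFactTable.getTableProperties.get("format"))
        .getOrElse("carbondata")

The isKafka check in StreamJobManager could then read as sourceTable.getFormat.equalsIgnoreCase("kafka") instead of probing the property map inline.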


---


[GitHub] carbondata pull request #2495: [CARBONDATA-2736][CARBONSTORE] Kafka integrat...

2018-07-12 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2495#discussion_r20238
  
--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---
@@ -58,14 +58,16 @@ object StreamJobManager {
           "streaming sink table " +
           "('streaming' tblproperty is not 'sink' or 'true')")
     }
+    // TODO: validate query schema against sink (as in kafka we cannot get schema directly)
+    /*
--- End diff --

Can we move this validation earlier, so that we still do validation for non-kafka formats?
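
A sketch of the reviewer's suggestion, reusing names that appear in the diffs above (validateSinkTable, streamDf, sinkTable, and the isKafka flag derived from the table properties); this is an illustration of the idea, not code from the PR:

    // kafka sources expose only the fixed kafka message schema, so the
    // query schema cannot be checked against the sink directly; keep the
    // check for every other format instead of disabling it entirely
    if (!isKafka) {
      validateSinkTable(streamDf.schema, sinkTable)
    }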


---