[GitHub] carbondata pull request #2495: [CARBONDATA-2736][CARBONSTORE] Kafka integrat...
Github user ajithme closed the pull request at: https://github.com/apache/carbondata/pull/2495

---
Github user ajithme commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2495#discussion_r202232048

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala ---
@@ -53,20 +52,22 @@ case class CarbonCreateStreamCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val df = sparkSession.sql(query)
     var sourceTable: CarbonTable = null
+    var dataFrame: Option[DataFrame] = None
     // find the streaming source table in the query
     // and replace it with StreamingRelation
-    val streamLp = df.logicalPlan transform {
+    df.logicalPlan transform {
       case r: LogicalRelation
           if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
             r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
-        val (source, streamingRelation) = prepareStreamingRelation(sparkSession, r)
+        val (source, resolvedFrame) = prepareDataFrame(sparkSession, r)
--- End diff --

Added method comments

---
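The hunk above uses Catalyst's pattern-matching rewrite idiom: walk the logical plan and swap matching relation nodes for a streaming variant. A minimal self-contained sketch of that idiom, with simplified stand-in types (Plan, Project, Relation, StreamingRelation are illustrative, not the real Spark Catalyst classes):

```scala
// Simplified stand-ins for Catalyst plan nodes (illustrative only).
sealed trait Plan
case class Project(child: Plan) extends Plan
case class Relation(table: String, isStreamingSource: Boolean) extends Plan
case class StreamingRelation(table: String) extends Plan

// Top-down rewrite: apply the partial-function rule to the node itself,
// then recurse into children, mirroring the shape of TreeNode.transform.
def transform(plan: Plan)(rule: PartialFunction[Plan, Plan]): Plan = {
  val applied = rule.applyOrElse(plan, (p: Plan) => p)
  applied match {
    case Project(child) => Project(transform(child)(rule))
    case leaf           => leaf
  }
}

val original = Project(Relation("source_tbl", isStreamingSource = true))
// Replace streaming-source relations, as the diff does for
// CarbonDatasourceHadoopRelation nodes whose table isStreamingSource.
val rewritten = transform(original) {
  case Relation(table, true) => StreamingRelation(table)
}
```

The rule only fires on nodes the partial function matches; everything else is rebuilt unchanged, which is why the diff can target just the streaming source relation inside an arbitrary query plan.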
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2495#discussion_r20509

--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala ---
@@ -53,20 +52,22 @@ case class CarbonCreateStreamCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val df = sparkSession.sql(query)
     var sourceTable: CarbonTable = null
+    var dataFrame: Option[DataFrame] = None
     // find the streaming source table in the query
     // and replace it with StreamingRelation
-    val streamLp = df.logicalPlan transform {
+    df.logicalPlan transform {
       case r: LogicalRelation
           if r.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
             r.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.isStreamingSource =>
-        val (source, streamingRelation) = prepareStreamingRelation(sparkSession, r)
+        val (source, resolvedFrame) = prepareDataFrame(sparkSession, r)
--- End diff --

Please add a comment here describing what is done inside prepareDataFrame

---
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2495#discussion_r20321

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---
@@ -102,14 +104,23 @@ object StreamJobManager {
     }
     validateSourceTable(sourceTable)
-    validateSinkTable(streamDf.schema, sinkTable)
+
+    // kafka source always has a fixed schema, need to get the actual schema
+    val isKafka = Option(sourceTable.getTableInfo.getFactTable.getTableProperties
--- End diff --

Please add a function in CarbonTable to get the underlying format

---
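A helper along the lines jackylk suggests might look like the sketch below. The `"format"` property key, the `"carbondata"` default, and the `FormatHelper` object are assumptions for illustration, not the actual CarbonTable API; table properties are modeled as the `java.util.Map` the diff reads from:

```scala
// Hypothetical helper in the spirit of the review comment: expose the
// underlying source format in one place instead of digging through
// table properties at each call site in StreamJobManager.
object FormatHelper {
  // Falls back to "carbondata" when no explicit format property is set
  // (the key name and default are illustrative assumptions).
  def underlyingFormat(tableProperties: java.util.Map[String, String]): String =
    Option(tableProperties.get("format")).getOrElse("carbondata")

  def isKafka(tableProperties: java.util.Map[String, String]): Boolean =
    underlyingFormat(tableProperties).equalsIgnoreCase("kafka")
}
```

With such a method on CarbonTable itself, the `isKafka` check above would reduce to a single readable call rather than an `Option(...)` chain over nested getters.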
Github user jackylk commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2495#discussion_r20238

--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala ---
@@ -58,14 +58,16 @@ object StreamJobManager {
         "streaming sink table " +
         "('streaming' tblproperty is not 'sink' or 'true')")
     }
+    // TODO: validate query schema against sink (as in kafka we cannot get schema directly)
+    /*
--- End diff --

Can we move this validation earlier, so that we still do validation for non-kafka formats?

---
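The point of the comment can be sketched as follows: instead of commenting the validation out entirely, keep sink-schema validation for every source format and skip it only for Kafka, whose fixed key/value schema cannot be compared with the sink directly. The function name, signature, and string-based schema below are illustrative stand-ins, not the actual StreamJobManager code:

```scala
// Illustrative sketch: validate the query schema against the sink for
// all non-Kafka sources, and skip the check only when the source is
// Kafka (whose schema is fixed and cannot be matched here).
def validateSinkSchema(sourceFormat: String,
                       querySchema: Seq[String],
                       sinkSchema: Seq[String]): Unit = {
  if (!sourceFormat.equalsIgnoreCase("kafka") && querySchema != sinkSchema) {
    throw new IllegalArgumentException(
      s"query schema $querySchema does not match sink table schema $sinkSchema")
  }
}
```

This keeps the pre-existing safety net for formats where the schema is knowable, which is exactly the regression the reviewer is flagging in the commented-out block.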