[ 
https://issues.apache.org/jira/browse/CARBONDATA-2002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Geetika Gupta closed CARBONDATA-2002.
-------------------------------------
    Resolution: Fixed

> Streaming segment status is not getting updated to finished or success
> ----------------------------------------------------------------------
>
>                 Key: CARBONDATA-2002
>                 URL: https://issues.apache.org/jira/browse/CARBONDATA-2002
>             Project: CarbonData
>          Issue Type: Bug
>          Components: data-load
>    Affects Versions: 1.3.0
>         Environment: spark2.1
>            Reporter: Geetika Gupta
>            Priority: Major
>             Fix For: 1.4.0
>
>         Attachments: 2000_UniqData.csv
>
>
> I created a streaming table and loaded data into it using the following 
> commands on spark shell:
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.CarbonSession._
> import org.apache.carbondata.core.util.CarbonProperties
> import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
> val carbon = SparkSession.builder().config(sc.getConf) 
> .getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore","/tmp")
> import org.apache.carbondata.core.constants.CarbonCommonConstants
> import org.apache.carbondata.core.util.CarbonProperties
> CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
>  "FORCE")
> carbon.sql("drop table if exists uniqdata_stream")
> carbon.sql("create table uniqdata_stream(CUST_ID int,CUST_NAME String,DOB 
> timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 
> bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 
> decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 
> int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES 
> ('TABLE_BLOCKSIZE'= '256 MB', 'streaming'='true')");
> import carbon.sqlContext.implicits._
> import org.apache.spark.sql.types._
> val uniqdataSch = StructType(
> Array(StructField("CUST_ID", IntegerType),StructField("CUST_NAME", 
> StringType),StructField("DOB", TimestampType), StructField("DOJ", 
> TimestampType), StructField("BIGINT_COLUMN1", LongType), 
> StructField("BIGINT_COLUMN2", LongType), StructField("DECIMAL_COLUMN1", 
> org.apache.spark.sql.types.DecimalType(30, 10)), 
> StructField("DECIMAL_COLUMN2", 
> org.apache.spark.sql.types.DecimalType(36,10)), StructField("Double_COLUMN1", 
> DoubleType), StructField("Double_COLUMN2", DoubleType), 
> StructField("INTEGER_COLUMN1", IntegerType)))
> val streamDf = carbon.readStream
> .schema(uniqdataSch)
> .option("sep", ",")
> .csv("file:///home/geetika/Downloads/uniqdata")
> val qry = streamDf.writeStream.format("carbondata").trigger(ProcessingTime("5 
> seconds"))
>              .option("checkpointLocation","/stream/uniq")
>             .option("dbName", "default")
>             .option("tableName", "uniqdata_stream")
>             .start()
>           qry.awaitTermination()
> //Press ctrl+c to terminate
> start the spark shell again
>  import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.CarbonSession._
> val carbon = SparkSession.builder().config(sc.getConf) 
> .getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore","/tmp")
> carbon.sql("show segments for table uniqdata_stream").show
> It shows the following output:
> +-----------------+---------+--------------------+-------------+---------+-----------+
> |SegmentSequenceId|   Status|     Load Start Time|Load End Time|Merged 
> To|File Format|
> +-----------------+---------+--------------------+-------------+---------+-----------+
> |                0|Streaming|2018-01-05 18:23:...|         null|       NA|    
>  ROW_V1|
> +-----------------+---------+--------------------+-------------+---------+-----------+
> Status for the segment is not updated



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to