[jira] [Created] (SPARK-31737) SparkSQL can't recognize the modified length of Hive varchar
Kevin Zhang created SPARK-31737:
-----------------------------------

Summary: SparkSQL can't recognize the modified length of Hive varchar
Key: SPARK-31737
URL: https://issues.apache.org/jira/browse/SPARK-31737
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.0.0
Reporter: Kevin Zhang

{code:java}
// Create a hive table in sparksql
spark-sql> create table test_table(id varchar(5)) stored as textfile;

// Create a file under linux, the content is 1234567890
// Then load this file into this hive table
spark-sql> load data local inpath '/home/test_table' overwrite into table test_table;

// Query this table in sparksql
spark-sql> select * from test_table;
12345

// Modify the length of this field in hive
hive> alter table test_table change column id id varchar(10);

// Query this table in hive
hive> select * from test_table;
OK
1234567890

// Query this table in sparksql again
spark-sql> select * from test_table;
// The obtained id is still 12345, so the length modification does not take effect in SparkSQL
12345
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
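The transcript in SPARK-31737 is consistent with Spark SQL still applying the original varchar(5) length after Hive changed the column to varchar(10). The report does not say where the stale length comes from, so the following is only a toy model of the symptom in plain Python, not Spark or Hive code; the function name `read_varchar` and both variables are hypothetical.

```python
def read_varchar(raw: str, declared_length: int) -> str:
    """Toy model: a reader truncates the stored text to the varchar length it believes is declared."""
    return raw[:declared_length]


raw_value = "1234567890"  # content of the loaded file, as in the report

# Assumption: one reader still sees varchar(5), the other sees the altered varchar(10).
stale_view = read_varchar(raw_value, 5)
fresh_view = read_varchar(raw_value, 10)

print(stale_view)  # 12345
print(fresh_view)  # 1234567890
```

The two printed values match what the report shows for spark-sql and hive respectively after the ALTER TABLE, which is why a stale declared length is a plausible (but unconfirmed) reading of the bug.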
[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing
[ https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16894323#comment-16894323 ]

Kevin Zhang commented on SPARK-24036:
-------------------------------------

Hi [~joseph.torres], is there any update on this work? Will the new feature be included in spark 3.0?

> Stateful operators in continuous processing
> -------------------------------------------
>
> Key: SPARK-24036
> URL: https://issues.apache.org/jira/browse/SPARK-24036
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 3.0.0
> Reporter: Jose Torres
> Priority: Major
>
> The first iteration of continuous processing in Spark 2.3 does not work with
> stateful operators.
[jira] [Issue Comment Deleted] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kevin Zhang updated SPARK-24630:
--------------------------------
    Comment: was deleted

(was: thanks [~Jackey Lee]
So I'm wondering what's blocking the pr of this issue to be merged, is it related to DataSourceV2?)

> SPIP: Support SQLStreaming in Spark
> -----------------------------------
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.2.0, 2.2.1
> Reporter: Jackey Lee
> Priority: Minor
> Labels: SQLStreaming
> Attachments: SQLStreaming SPIP V2.pdf
>
>
> At present, KafkaSQL, Flink SQL (which is actually based on Calcite),
> SQLStream, and StormSQL all provide a stream-type SQL interface, with which
> users with little knowledge of streaming can easily develop a flow system
> processing model. In Spark, we can also support a SQL API based on
> Structured Streaming.
> To support SQL Streaming, there are two key points:
> 1. Analysis should be able to parse streaming-type SQL.
> 2. The Analyzer should be able to map metadata information to the
> corresponding Relation.
[jira] [Commented] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818598#comment-16818598 ]

Kevin Zhang commented on SPARK-24630:
-------------------------------------

Thanks [~Jackey Lee]
So I'm wondering what's blocking the PR for this issue from being merged. Is it related to DataSourceV2?

> SPIP: Support SQLStreaming in Spark
> -----------------------------------
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.2.0, 2.2.1
> Reporter: Jackey Lee
> Priority: Minor
> Labels: SQLStreaming
> Attachments: SQLStreaming SPIP V2.pdf
>
>
> At present, KafkaSQL, Flink SQL (which is actually based on Calcite),
> SQLStream, and StormSQL all provide a stream-type SQL interface, with which
> users with little knowledge of streaming can easily develop a flow system
> processing model. In Spark, we can also support a SQL API based on
> Structured Streaming.
> To support SQL Streaming, there are two key points:
> 1. Analysis should be able to parse streaming-type SQL.
> 2. The Analyzer should be able to map metadata information to the
> corresponding Relation.
[jira] [Updated] (SPARK-27340) Alias on TimeWIndow expression may cause watermark metadata lost
[ https://issues.apache.org/jira/browse/SPARK-27340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kevin Zhang updated SPARK-27340:
--------------------------------
    Description:
When we use the data API to write a Structured Streaming query job, we usually specify a watermark on the event-time column. If we define a window on the event-time column, the delayKey metadata of the event-time column is supposed to be propagated to the new column generated by the time window expression. But if we add an additional alias on the time window column, the delayKey metadata is lost.

So far I have only found the bug to affect stream-stream joins with equal window join keys. For aggregation, the grouping expression can be trimmed (in the CleanupAliases rule), so additional aliases are removed and the metadata is kept.

Here is an example:
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.streaming.Trigger

val sparkSession = SparkSession
  .builder()
  .master("local")
  .getOrCreate()

// Needed for the $"..." column syntax used below
import sparkSession.implicits._

val rateStream = sparkSession.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load()

val fooStream = rateStream
  .select(
    col("value").as("fooId"),
    col("timestamp").as("fooTime")
  )
  .withWatermark("fooTime", "2 seconds")
  .select($"fooId", $"fooTime", window($"fooTime", "2 seconds").alias("fooWindow"))

val barStream = rateStream
  .where(col("value") % 2 === 0)
  .select(
    col("value").as("barId"),
    col("timestamp").as("barTime")
  )
  .withWatermark("barTime", "2 seconds")
  .select($"barId", $"barTime", window($"barTime", "2 seconds").alias("barWindow"))

val joinedDf = fooStream
  .join(
    barStream,
    $"fooId" === $"barId" &&
      fooStream.col("fooWindow") === barStream.col("barWindow"),
    joinType = "LeftOuter"
  )

val query = joinedDf
  .writeStream
  .format("console")
  .option("truncate", 100)
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

query.awaitTermination()
{code}
This program ends with an exception, and from the analyzed plan we can see there is no delayKey metadata on 'fooWindow':
{code:java}
org.apache.spark.sql.AnalysisException: Stream-stream outer join between two streaming
DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition;; Join LeftOuter, ((fooId#4L = barId#14L) && (fooWindow#9 = barWindow#19)) :- Project [fooId#4L, fooTime#5-T2000ms, window#10-T2000ms AS fooWindow#9] : +- Filter isnotnull(fooTime#5-T2000ms) : +- Project [named_struct(start, precisetimestampconversion(CASE WHEN (cast(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) as double) = (cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) THEN (CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 200) + 0), LongType, TimestampType), end, precisetimestampconversion((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) as double) = (cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) THEN (CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 200) + 0) + 200), LongType, TimestampType)) AS window#10-T2000ms, fooId#4L, fooTime#5-T2000ms] :+- EventTimeWatermark fooTime#5: timestamp, interval 2 seconds : +- Project [value#1L AS fooId#4L, timestamp#0 AS fooTime#5] : +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@2cae5fa7, rate, 
Map(numPartitions -> 1, rowsPerSecond -> 1), [timestamp#0, value#1L] +- Project [barId#14L, barTime#15-T2000ms, window#20-T2000ms AS barWindow#19] +- Filter isnotnull(barTime#15-T2000ms) +- Project [named_struct(start, precisetimestampconversion(CASE WHEN (cast(CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) as double) = (cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) THEN (CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType,
[jira] [Updated] (SPARK-27340) Alias on TimeWIndow expression may cause watermark metadata lost
[ https://issues.apache.org/jira/browse/SPARK-27340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kevin Zhang updated SPARK-27340:
--------------------------------
    Description:
When we use the data API to write a Structured Streaming query job, we usually specify a watermark on the event-time column. If we define a window on the event-time column, the delayKey metadata of the event-time column is supposed to be propagated to the new column generated by the time window expression. But if we add an additional alias on the time window column, the delayKey metadata is lost.

So far I have only found the bug to affect stream-stream joins. For aggregation, the grouping expression can be trimmed (in the CleanupAliases rule), so additional aliases are removed and the metadata is kept.

Here is an example:
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.streaming.Trigger

val sparkSession = SparkSession
  .builder()
  .master("local")
  .getOrCreate()

// Needed for the $"..." column syntax used below
import sparkSession.implicits._

val rateStream = sparkSession.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load()

val fooStream = rateStream
  .select(
    col("value").as("fooId"),
    col("timestamp").as("fooTime")
  )
  .withWatermark("fooTime", "2 seconds")
  .select($"fooId", $"fooTime", window($"fooTime", "2 seconds").alias("fooWindow"))

val barStream = rateStream
  .where(col("value") % 2 === 0)
  .select(
    col("value").as("barId"),
    col("timestamp").as("barTime")
  )
  .withWatermark("barTime", "2 seconds")
  .select($"barId", $"barTime", window($"barTime", "2 seconds").alias("barWindow"))

val joinedDf = fooStream
  .join(
    barStream,
    $"fooId" === $"barId" &&
      fooStream.col("fooWindow") === barStream.col("barWindow"),
    joinType = "LeftOuter"
  )

val query = joinedDf
  .writeStream
  .format("console")
  .option("truncate", 100)
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

query.awaitTermination()
{code}
This program ends with an exception, and from the analyzed plan we can see there is no delayKey metadata on 'fooWindow':
{code:java}
org.apache.spark.sql.AnalysisException: Stream-stream outer join between two streaming DataFrame/Datasets is not supported
without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition;; Join LeftOuter, ((fooId#4L = barId#14L) && (fooWindow#9 = barWindow#19)) :- Project [fooId#4L, fooTime#5-T2000ms, window#10-T2000ms AS fooWindow#9] : +- Filter isnotnull(fooTime#5-T2000ms) : +- Project [named_struct(start, precisetimestampconversion(CASE WHEN (cast(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) as double) = (cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) THEN (CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 200) + 0), LongType, TimestampType), end, precisetimestampconversion((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) as double) = (cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) THEN (CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 200) + 0) + 200), LongType, TimestampType)) AS window#10-T2000ms, fooId#4L, fooTime#5-T2000ms] :+- EventTimeWatermark fooTime#5: timestamp, interval 2 seconds : +- Project [value#1L AS fooId#4L, timestamp#0 AS fooTime#5] : +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@2cae5fa7, rate, Map(numPartitions -> 1, rowsPerSecond -> 1), 
[timestamp#0, value#1L] +- Project [barId#14L, barTime#15-T2000ms, window#20-T2000ms AS barWindow#19] +- Filter isnotnull(barTime#15-T2000ms) +- Project [named_struct(start, precisetimestampconversion(CASE WHEN (cast(CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) as double) = (cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) THEN (CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, LongType) - 0) as double) /
[jira] [Created] (SPARK-27340) Alias on TimeWIndow expression may cause watermark metadata lost
Kevin Zhang created SPARK-27340:
-----------------------------------

Summary: Alias on TimeWIndow expression may cause watermark metadata lost
Key: SPARK-27340
URL: https://issues.apache.org/jira/browse/SPARK-27340
Project: Spark
Issue Type: Bug
Components: SQL, Structured Streaming
Affects Versions: 2.4.0
Reporter: Kevin Zhang

When we use the data API to write a Structured Streaming query job, we usually specify a watermark on the event-time column. If we define a window on the event-time column, the delayKey metadata of the event-time column is supposed to be propagated to the new column generated by the time window expression. But if we add an additional alias on the time window column, the delayKey metadata is lost.

So far I have only found the bug to affect stream-stream joins. For aggregation, the grouping expression can be trimmed, so additional aliases are removed and the metadata is kept.

Here is an example:
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.streaming.Trigger

val sparkSession = SparkSession
  .builder()
  .master("local")
  .getOrCreate()

// Needed for the $"..." column syntax used below
import sparkSession.implicits._

val rateStream = sparkSession.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load()

val fooStream = rateStream
  .select(
    col("value").as("fooId"),
    col("timestamp").as("fooTime")
  )
  .withWatermark("fooTime", "2 seconds")
  .select($"fooId", $"fooTime", window($"fooTime", "2 seconds").alias("fooWindow"))

val barStream = rateStream
  // Introduce misses for ease of debugging
  .where(col("value") % 2 === 0)
  .select(
    col("value").as("barId"),
    col("timestamp").as("barTime")
  )
  .withWatermark("barTime", "2 seconds")
  .select($"barId", $"barTime", window($"barTime", "2 seconds").alias("barWindow"))

val joinedDf = fooStream
  .join(
    barStream,
    $"fooId" === $"barId" &&
      fooStream.col("fooWindow") === barStream.col("barWindow"),
    joinType = "LeftOuter"
  )

val query = joinedDf
  .writeStream
  .format("console")
  .option("truncate", 100)
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

query.awaitTermination()
{code}
This program ends with an exception, and from the analyzed plan we can see there is no delayKey metadata
on 'fooWindow' {code:java} org.apache.spark.sql.AnalysisException: Stream-stream outer join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition;; Join LeftOuter, ((fooId#4L = barId#14L) && (fooWindow#9 = barWindow#19)) :- Project [fooId#4L, fooTime#5-T2000ms, window#10-T2000ms AS fooWindow#9] : +- Filter isnotnull(fooTime#5-T2000ms) : +- Project [named_struct(start, precisetimestampconversion(CASE WHEN (cast(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) as double) = (cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) THEN (CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 200) + 0), LongType, TimestampType), end, precisetimestampconversion((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) as double) = (cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) THEN (CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 200) + 0) + 200), LongType, TimestampType)) AS window#10-T2000ms, fooId#4L, fooTime#5-T2000ms] :+- EventTimeWatermark fooTime#5: timestamp, interval 2 seconds : +- Project [value#1L AS fooId#4L, timestamp#0 AS fooTime#5] : +- 
StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@2cae5fa7, rate, Map(numPartitions -> 1, rowsPerSecond -> 1), [timestamp#0, value#1L] +- Project [barId#14L, barTime#15-T2000ms, window#20-T2000ms AS barWindow#19] +- Filter isnotnull(barTime#15-T2000ms) +- Project [named_struct(start, precisetimestampconversion(CASE WHEN (cast(CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, LongType) - 0) as double) / cast(200 as double))) as double) =
[jira] [Commented] (SPARK-24630) SPIP: Support SQLStreaming in Spark
[ https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804535#comment-16804535 ]

Kevin Zhang commented on SPARK-24630:
-------------------------------------

Hi, is there any update on the progress?

> SPIP: Support SQLStreaming in Spark
> -----------------------------------
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.2.0, 2.2.1
> Reporter: Jackey Lee
> Priority: Minor
> Labels: SQLStreaming
> Attachments: SQLStreaming SPIP V2.pdf
>
>
> At present, KafkaSQL, Flink SQL (which is actually based on Calcite),
> SQLStream, and StormSQL all provide a stream-type SQL interface, with which
> users with little knowledge of streaming can easily develop a flow system
> processing model. In Spark, we can also support a SQL API based on
> Structured Streaming.
> To support SQL Streaming, there are two key points:
> 1. Analysis should be able to parse streaming-type SQL.
> 2. The Analyzer should be able to map metadata information to the
> corresponding Relation.
[jira] [Updated] (SPARK-26350) Allow the user to override the group id of the Kafka's consumer
[ https://issues.apache.org/jira/browse/SPARK-26350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kevin Zhang updated SPARK-26350:
--------------------------------
    Attachment: Permalink.url

> Allow the user to override the group id of the Kafka's consumer
> ---------------------------------------------------------------
>
> Key: SPARK-26350
> URL: https://issues.apache.org/jira/browse/SPARK-26350
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 2.4.0
> Reporter: Shixiong Zhu
> Priority: Major
> Attachments: Permalink.url
>
>
> Sometimes the group id is used to identify the stream for "security". We
> should give a flag that lets you override it.
[jira] [Commented] (SPARK-20295) when spark.sql.adaptive.enabled is enabled, have conflict with Exchange Resue
[ https://issues.apache.org/jira/browse/SPARK-20295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519994#comment-16519994 ]

Kevin Zhang commented on SPARK-20295:
-------------------------------------

Still hit this bug in Spark 2.3.0.

> when spark.sql.adaptive.enabled is enabled, have conflict with Exchange Resue
> -----------------------------------------------------------------------------
>
> Key: SPARK-20295
> URL: https://issues.apache.org/jira/browse/SPARK-20295
> Project: Spark
> Issue Type: Bug
> Components: Shuffle, SQL
> Affects Versions: 2.1.0
> Reporter: Ruhui Wang
> Priority: Major
>
> When running tpcds-q95 with spark.sql.adaptive.enabled = true, the physical
> plan is at first:
> Sort
> : +- Exchange(coordinator id: 1)
> : +- Project***
> ::-Sort **
> :: +- Exchange(coordinator id: 2)
> :: :- Project ***
> :+- Sort
> :: +- Exchange(coordinator id: 3)
> When spark.sql.exchange.reuse is enabled, the physical plan becomes:
> Sort
> : +- Exchange(coordinator id: 1)
> : +- Project***
> ::-Sort **
> :: +- Exchange(coordinator id: 2)
> :: :- Project ***
> :+- Sort
> :: +- ReusedExchange Exchange(coordinator id: 2)
> If spark.sql.adaptive.enabled = true, the call stack is:
> ShuffleExchange#doExecute --> postShuffleRDD function -->
> doEstimationIfNecessary. In this function,
> assert(exchanges.length == numExchanges) fails, because the left side has only
> one element while the right side is equal to 2.
> Is this a bug in the combination of spark.sql.adaptive.enabled and exchange reuse?
[jira] [Commented] (SPARK-14974) spark sql job create too many files in HDFS when doing insert overwrite hive table
[ https://issues.apache.org/jira/browse/SPARK-14974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380126#comment-16380126 ]

Kevin Zhang commented on SPARK-14974:
-------------------------------------

I encountered the same problem as [~ussraf] in Spark 2.2 and 2.3, and I'm not quite sure how to fix it. Is there any plan to reopen the issue?

> spark sql job create too many files in HDFS when doing insert overwrite hive table
> ----------------------------------------------------------------------------------
>
> Key: SPARK-14974
> URL: https://issues.apache.org/jira/browse/SPARK-14974
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 1.5.2
> Reporter: zenglinxi
> Priority: Minor
>
> Recently, we often encounter problems using Spark SQL for inserting data into
> a partitioned table (e.g.: insert overwrite table $output_table partition(dt)
> select xxx from tmp_table).
> After the Spark job starts running on YARN, the app creates too many files
> (e.g. 2,000,000, or even 10,000,000), which puts HDFS under enormous
> pressure.
> We found that the number of files created by the Spark job depends on the
> number of partitions of the Hive table being inserted into and the number of
> Spark SQL partitions:
> files_num = hive_table_partions_num * spark_sql_partitions_num.
> We often set spark_sql_partitions_num (spark.sql.shuffle.partitions) >= 1000,
> and hive_table_partions_num is very small under normal circumstances, but it
> can turn out to be more than 2000 when we accidentally use a wrong field as
> the partition field, which makes files_num >= 1000 * 2000 = 2,000,000.
> There is a configuration parameter in Hive,
> hive.exec.max.dynamic.partitions.pernode, that limits the maximum number of
> dynamic partitions allowed to be created in each mapper/reducer, but this
> parameter didn't work when we used hiveContext.
> Reducing spark_sql_partitions_num (spark.sql.shuffle.partitions) makes
> files_num smaller, but it affects the concurrency.
> Can we create configuration parameters to limit the maximum number of files
> allowed to be created by each task, or limit spark_sql_partitions_num
> without affecting the concurrency?
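The file-count relation quoted in SPARK-14974 can be checked with a few lines of plain Python. This is only a sketch of the arithmetic stated in the issue; the function name `estimated_files` is hypothetical, and the "normal" partition count of 10 is an illustrative assumption (the issue only says the number is "very small").

```python
def estimated_files(hive_table_partitions: int, spark_sql_partitions: int) -> int:
    """files_num = hive_table_partitions_num * spark_sql_partitions_num, as stated in the issue."""
    return hive_table_partitions * spark_sql_partitions


# Assumed "normal" case: few Hive partitions, 1000 shuffle partitions.
normal_case = estimated_files(10, 1000)

# Case from the issue: a wrong partition field yields about 2000 Hive partitions.
wrong_field_case = estimated_files(2000, 1000)

print(normal_case)       # 10000
print(wrong_field_case)  # 2000000
```

The second figure reproduces the 1000 * 2000 = 2,000,000 estimate given in the issue.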
[jira] [Commented] (SPARK-23498) Accuracy problem in comparison with string and integer
[ https://issues.apache.org/jira/browse/SPARK-23498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379753#comment-16379753 ]

Kevin Zhang commented on SPARK-23498:
-------------------------------------

Yes, thanks. But when we use Spark SQL to run existing Hive scripts, we expect Spark SQL to give the same results as Hive, and that's why I opened this JIRA. Now that [~q79969786] has marked this as a duplicate of [SPARK-21646 |https://issues.apache.org/jira/browse/SPARK-21646], I will patch my own branch first.

> Accuracy problem in comparison with string and integer
> ------------------------------------------------------
>
> Key: SPARK-23498
> URL: https://issues.apache.org/jira/browse/SPARK-23498
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.2.0, 2.2.1, 2.3.0
> Reporter: Kevin Zhang
> Priority: Major
>
> While comparing a string column with an integer value, Spark SQL
> automatically casts the string operand to int. The following SQL returns
> true in Hive but false in Spark:
>
> {code:java}
> select '1000.1'>1000
> {code}
>
> From the physical plan we can see that the string operand was cast to int,
> which caused the loss of accuracy:
> {code:java}
> *Project [false AS (CAST(1000.1 AS INT) > 1000)#4]
> +- Scan OneRowRelation[]
> {code}
> To solve it, casting both operands of a binary operator to a wider common
> type such as double may be safe.
[jira] [Updated] (SPARK-23498) Accuracy problem in comparison with string and integer
[ https://issues.apache.org/jira/browse/SPARK-23498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kevin Zhang updated SPARK-23498:
--------------------------------
    Description:
While comparing a string column with an integer value, Spark SQL automatically casts the string operand to int. The following SQL returns true in Hive but false in Spark:

{code:java}
select '1000.1'>1000
{code}

From the physical plan we can see that the string operand was cast to int, which caused the loss of accuracy:
{code:java}
*Project [false AS (CAST(1000.1 AS INT) > 1000)#4]
+- Scan OneRowRelation[]
{code}
To solve it, casting both operands of a binary operator to a wider common type such as double may be safe.

  was:
While comparing a string column with an integer value, Spark SQL automatically casts the string operand to int. The following SQL returns true in Hive but false in Spark:

{code:java}
select '1000.1'>1000
{code}

From the physical plan we can see that the string operand was cast to int, which caused the loss of accuracy:
{code:java}
*Project [false AS (CAST(1000.1 AS INT) > 1000)#4]
+- Scan OneRowRelation[]
{code}
Similar to SPARK-22469, I think it's safe to use double as a common type to cast both operands to.

> Accuracy problem in comparison with string and integer
> ------------------------------------------------------
>
> Key: SPARK-23498
> URL: https://issues.apache.org/jira/browse/SPARK-23498
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.2.0, 2.2.1, 2.3.0
> Reporter: Kevin Zhang
> Priority: Major
>
> While comparing a string column with an integer value, Spark SQL
> automatically casts the string operand to int. The following SQL returns
> true in Hive but false in Spark:
>
> {code:java}
> select '1000.1'>1000
> {code}
>
> From the physical plan we can see that the string operand was cast to int,
> which caused the loss of accuracy:
> {code:java}
> *Project [false AS (CAST(1000.1 AS INT) > 1000)#4]
> +- Scan OneRowRelation[]
> {code}
> To solve it, casting both operands of a binary operator to a wider common
> type such as double may be safe.
[jira] [Created] (SPARK-23498) Accuracy problem in comparison with string and integer
Kevin Zhang created SPARK-23498:
-----------------------------------

Summary: Accuracy problem in comparison with string and integer
Key: SPARK-23498
URL: https://issues.apache.org/jira/browse/SPARK-23498
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.2.1, 2.2.0, 2.3.0
Reporter: Kevin Zhang

While comparing a string column with an integer value, Spark SQL automatically casts the string operand to int. The following SQL returns true in Hive but false in Spark:

{code:java}
select '1000.1'>1000
{code}

From the physical plan we can see that the string operand was cast to int, which caused the loss of accuracy:
{code:java}
*Project [false AS (CAST(1000.1 AS INT) > 1000)#4]
+- Scan OneRowRelation[]
{code}
Similar to SPARK-22469, I think it's safe to use double as a common type to cast both operands to.
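The difference between the two casting strategies discussed in SPARK-23498 can be illustrated outside Spark. The sketch below is plain Python, not Spark's type-coercion code: `compare_via_int` models the CAST(... AS INT) seen in the physical plan by dropping the fractional part, and `compare_via_double` models the proposed wider common type. Both function names are hypothetical.

```python
def compare_via_int(text: str, number: int) -> bool:
    """Model of casting the string operand to int before comparing (fraction is dropped)."""
    return int(float(text)) > number


def compare_via_double(text: str, number: int) -> bool:
    """Model of casting both operands to double before comparing."""
    return float(text) > float(number)


int_result = compare_via_int("1000.1", 1000)
double_result = compare_via_double("1000.1", 1000)

print(int_result)     # False, matching the Spark result in the report
print(double_result)  # True, matching the Hive result in the report
```

Under these assumptions, truncating '1000.1' to 1000 is what turns a true comparison into a false one, which is the accuracy loss the report describes.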
[jira] [Commented] (SPARK-22755) Expression (946-885)*1.0/946 < 0.1 and (946-885)*1.000/946 < 0.1 return different results
[ https://issues.apache.org/jira/browse/SPARK-22755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16288608#comment-16288608 ]

Kevin Zhang commented on SPARK-22755:
-------------------------------------

[~ksunitha] Thanks, it helps a lot

> Expression (946-885)*1.0/946 < 0.1 and (946-885)*1.000/946 < 0.1 return different results
> -----------------------------------------------------------------------------------------
>
> Key: SPARK-22755
> URL: https://issues.apache.org/jira/browse/SPARK-22755
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.2.0
> Reporter: Kevin Zhang
> Fix For: 2.2.1
>
>
> both of the following sql statements
> {code:sql}
> select ((946-885)*1.000/946 < 0.1)
> {code}
> and
> {code:sql}
> select ((946-885)*1.0/946 < 0.100)
> {code}
> return true, while the following statement
> {code:sql}
> select ((946-885)*1.0/946 < 0.1)
> {code}
> returns false
[jira] [Comment Edited] (SPARK-22755) Expression (946-885)*1.0/946 < 0.1 and (946-885)*1.000/946 < 0.1 return different results
[ https://issues.apache.org/jira/browse/SPARK-22755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286961#comment-16286961 ]

Kevin Zhang edited comment on SPARK-22755 at 12/12/17 2:03 AM:
---------------------------------------------------------------

Thanks for the reply. In Hive and Presto the result is true. I tried again in both the Spark SQL REPL and the Thrift server, and the statement `select ((946-885)*1.0/946 < 0.1)` still returns false. I used Spark 2.2 instead of the trunk codeline.
{code:sql}
+---+
|(CAST((CAST((CAST(CAST((946 - 885) AS DECIMAL(10,0)) AS DECIMAL(11,1)) * CAST(1.0 AS DECIMAL(11,1))) AS DECIMAL(13,1)) / CAST(CAST(946 AS DECIMAL(13,1)) AS DECIMAL(13,1))) AS DECIMAL(13,1)) < CAST(0.1 AS DECIMAL(13,1)))|
+---+
| false|
+---+
{code}
I also found that my physical plan differs from yours in the decimal precision and scale. Is there any configuration that influences this, or has the bug already been fixed by some other issue?

was (Author: kevinzwx):
Thanks for the reply. In Hive and Presto the result is true. I tried again in both the Spark SQL REPL and the Thrift server, and the statement `select ((946-885)*1.0/946 < 0.1)` still returns false. I used Spark 2.2; which version are you using?
{code:sql}
+---+
|(CAST((CAST((CAST(CAST((946 - 885) AS DECIMAL(10,0)) AS DECIMAL(11,1)) * CAST(1.0 AS DECIMAL(11,1))) AS DECIMAL(13,1)) / CAST(CAST(946 AS DECIMAL(13,1)) AS DECIMAL(13,1))) AS DECIMAL(13,1)) < CAST(0.1 AS DECIMAL(13,1)))|
+---+
| false|
+---+
{code}
I also found that my physical plan differs from yours in the decimal precision and scale. Is there any configuration that influences this?

> Expression (946-885)*1.0/946 < 0.1 and (946-885)*1.000/946 < 0.1 return different results
> -----------------------------------------------------------------------------------------
>
> Key: SPARK-22755
> URL: https://issues.apache.org/jira/browse/SPARK-22755
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.2.0
> Reporter: Kevin Zhang
>
>
> both of the following sql statements
> {code:sql}
> select ((946-885)*1.000/946 < 0.1)
> {code}
> and
> {code:sql}
> select ((946-885)*1.0/946 < 0.100)
> {code}
> return true, while the following statement
> {code:sql}
> select ((946-885)*1.0/946 < 0.1)
> {code}
> returns false
[jira] [Commented] (SPARK-22755) Expression (946-885)*1.0/946 < 0.1 and (946-885)*1.000/946 < 0.1 return different results
[ https://issues.apache.org/jira/browse/SPARK-22755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286961#comment-16286961 ] Kevin Zhang commented on SPARK-22755: - Thanks for reply. In hive and presto the result is supposed to be true. I tried another time in both spark sql repl and thrift-server, the statement `select ((946-885)*1.0/946 < 0.1` still returns false, and I used spark 2.2, which version are you using? {code:sql} +---+ |(CAST((CAST((CAST(CAST((946 - 885) AS DECIMAL(10,0)) AS DECIMAL(11,1)) * CAST(1.0 AS DECIMAL(11,1))) AS DECIMAL(13,1)) / CAST(CAST(946 AS DECIMAL(13,1)) AS DECIMAL(13,1))) AS DECIMAL(13,1)) < CAST(0.1 AS DECIMAL(13,1)))| +---+ | false| +---+ {code} > Expression (946-885)*1.0/946 < 0.1 and (946-885)*1.000/946 < 0.1 return > different results > - > > Key: SPARK-22755 > URL: https://issues.apache.org/jira/browse/SPARK-22755 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Kevin Zhang > > both of the following sql statements > {code:sql} > select ((946-885)*1.000/946 < 0.1) > {code} > and > {code:sql} > select ((946-885)*1.0/946 < 0.100) > {code} > return true, while the following statement > {code:sql} > select ((946-885)*1.0/946 < 0.1) > {code} > returns false -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
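The plan quoted in the comment above casts the quotient to DECIMAL(13,1) before comparing it with 0.1. The following is a plain-Python sketch (not Spark code) that mimics that step with the standard `decimal` module. It assumes half-up rounding to the result scale, and the helper name `compare_at_scale` is made up for illustration. Under those assumptions, a result scale of 1 rounds the quotient up to 0.1, which would explain the false result, while a scale of 3 (which the 1.000 and 0.100 variants may lead to) keeps enough digits for the comparison to be true.

```python
from decimal import Decimal, ROUND_HALF_UP

def compare_at_scale(scale: int) -> bool:
    """Round (946-885)*1.0/946 to `scale` fractional digits, then test `< 0.1`.

    Assumption: the quotient is rounded half-up to the result scale,
    as a cast to DECIMAL(p, scale) is assumed to do.
    """
    quotient = Decimal(946 - 885) * Decimal("1.0") / Decimal(946)  # about 0.0645
    quantum = Decimal(1).scaleb(-scale)  # scale=1 -> 0.1, scale=3 -> 0.001
    rounded = quotient.quantize(quantum, rounding=ROUND_HALF_UP)
    return rounded < Decimal("0.1")

print(compare_at_scale(1))  # result scale 1, as in DECIMAL(13,1)
print(compare_at_scale(3))  # result scale 3 (assumed for the 1.000 / 0.100 variants)
```

This only models the rounding step; how Spark picks the precision and scale for each sub-expression is not reproduced here.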
[jira] [Created] (SPARK-22755) Expression (946-885)*1.0/946 < 0.1 and (946-885)*1.000/946 < 0.1 return different results
Kevin Zhang created SPARK-22755: --- Summary: Expression (946-885)*1.0/946 < 0.1 and (946-885)*1.000/946 < 0.1 return different results Key: SPARK-22755 URL: https://issues.apache.org/jira/browse/SPARK-22755 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.2.0 Reporter: Kevin Zhang both of the following sql statements {code:sql} select ((946-885)*1.000/946 < 0.1) {code} and {code:sql} select ((946-885)*1.0/946 < 0.100) {code} return true, while the following statement {code:sql} select ((946-885)*1.0/946 < 0.1) {code} returns false -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21944) Watermark on window column is wrong
[ https://issues.apache.org/jira/browse/SPARK-21944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16158948#comment-16158948 ] Kevin Zhang commented on SPARK-21944: - [~mgaido] By "define the watermark on the column 'time'", do you mean the following? {code:java} val counts = events.select(window($"time", "5 seconds"), $"time", $"id") .withWatermark("time", "10 seconds") .dropDuplicates("id", "window") .groupBy("window") .count {code} I don't know whether this is right, because the documentation indicates that we should use the same column as the one used in the watermark, that is, the "time" column (which is not what I want). I tried it this way and the application doesn't throw any exception, but it didn't drop events older than the watermark as expected. In the following example, the batch containing an event with time=1504774540 (2017/9/7 16:55:40 CST) is processed first, so the watermark should be adjusted to 2017/9/7 16:55:30 CST. I then send an event with time=1504745724 (2017/9/7 8:55:24 CST), and this event is processed instead of being dropped as expected. {code:java} +-+-+ |window |count| +-+-+ |[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1| |[2017-09-07 08:55:20.0,2017-09-07 08:55:25.0]|1| |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3| +-+-+ {min=2017-09-07T00:55:24.000Z, avg=2017-09-07T00:55:24.000Z, watermark=2017-09-07T08:55:30.000Z, max=2017-09-07T00:55:24.000Z} {code} One important thing I have to mention is that my time zone is CST, not UTC. The start and end times in the window are right, but the watermark is reported in UTC. I don't know whether this has any influence. If anything is unclear, please point it out and I will explain. 
Thanks > Watermark on window column is wrong > --- > > Key: SPARK-21944 > URL: https://issues.apache.org/jira/browse/SPARK-21944 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Kevin Zhang > > When I use a watermark with dropDuplicates in the following way, the > watermark is calculated wrong > {code:java} > val counts = events.select(window($"time", "5 seconds"), $"time", $"id") > .withWatermark("window", "10 seconds") > .dropDuplicates("id", "window") > .groupBy("window") > .count > {code} > where events is a dataframe with a timestamp column "time" and long column > "id". > I registered a listener to print the event time stats in each batch, and the > results is like the following > {code:shell} > --- > Batch: 0 > --- > +-+-+ > > |window |count| > +-+-+ > |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3| > +-+-+ > {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, > watermark=1970-01-01T00:00:00.000Z, max=1970-01-01T19:05:19.476Z} > {watermark=1970-01-01T00:00:00.000Z} > {watermark=1970-01-01T00:00:00.000Z} > {watermark=1970-01-01T00:00:00.000Z} > {watermark=1970-01-01T00:00:00.000Z} > --- > Batch: 1 > --- > +-+-+ > > |window |count| > +-+-+ > |[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1| > |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3| > +-+-+ > {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, > watermark=1970-01-01T19:05:09.476Z, max=1970-01-01T19:05:19.476Z} > {watermark=1970-01-01T19:05:09.476Z} > --- > Batch: 2 > --- > +-+-+ > > |window |count| > +-+-+ > |[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1| > |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|4| > +-+-+ > {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, > watermark=1970-01-01T19:05:09.476Z, max=1970-01-01T19:05:19.476Z} > {watermark=1970-01-01T19:05:09.476Z} > {code} > As can be seen, the event time stats are wrong which are always in > 1970-01-01, so the watermark
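The timestamps quoted in the comment above can be checked with a little arithmetic. The sketch below is plain Python (not Spark code); the names `fmt`, `CST` and `DELAY_SECONDS` are chosen for illustration, and CST is assumed to mean UTC+8 as the reporter states. It renders the expected watermark and the late event in both zones and compares the two instants directly, which suggests that the UTC rendering of the watermark by itself does not change which event is older.

```python
from datetime import datetime, timedelta, timezone

CST = timezone(timedelta(hours=8))  # assumed: China Standard Time, UTC+8
DELAY_SECONDS = 10                  # from withWatermark("time", "10 seconds")

def fmt(epoch_seconds: int, tz: timezone) -> str:
    """Render an epoch timestamp (in seconds) in the given time zone."""
    return datetime.fromtimestamp(epoch_seconds, tz).strftime("%Y-%m-%d %H:%M:%S")

max_event_time = 1504774540   # newest event seen so far
late_event_time = 1504745724  # event sent afterwards
watermark = max_event_time - DELAY_SECONDS

print(fmt(watermark, CST), "CST ==", fmt(watermark, timezone.utc), "UTC")
print(fmt(late_event_time, CST), "CST ==", fmt(late_event_time, timezone.utc), "UTC")
# The comparison is made on the instants themselves, so the display zone is irrelevant.
print("older than watermark:", late_event_time < watermark)
```

The two renderings agree with the values in the listener output (watermark 08:55:30 UTC, event 00:55:24 UTC). The sketch says nothing about when Spark applies a watermark or which operators honor it.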
[jira] [Comment Edited] (SPARK-21944) Watermark on window column is wrong
[ https://issues.apache.org/jira/browse/SPARK-21944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16158055#comment-16158055 ] Kevin Zhang edited comment on SPARK-21944 at 9/8/17 2:51 AM: - I found the problem when I use kafka source consuming our internal topics, but it is hard to verify what the problem is. So then I use socket source and produce some data myself, like (1504774520 1), (1504774521 2), (1504774540 1), (1504774520 4)... where the first element is timestamp, and the second element is id. so it's easy for you to produce sample data and reproduce the problem. Also the following is one group of data I used: {code:xml} 1504774520 1 1504774521 2 1504774520 3 1504774540 1 1504774520 4 1504774531 1 1504774532 1 1504774533 1 1504774520 1 1504774520 10 1504774526 11 {code} was (Author: kevinzwx): I found the problem when I use kafka source consuming our internal topics, but it is hard to verify what the problem is. So then I use socket source and produce some data myself, like (1504774520 1), (1504774521 2), (1504774540 1), (1504774520 4)... where the first element is timestamp, and the second element is id. so it's easy for you to produce sample data and reproduce the problem. 
Also the following is one group of data I used: {code:shell} 1504774520 1 1504774521 2 1504774520 3 1504774540 1 1504774520 4 1504774531 1 1504774532 1 1504774533 1 1504774520 1 1504774520 10 1504774526 11 {code} > Watermark on window column is wrong > --- > > Key: SPARK-21944 > URL: https://issues.apache.org/jira/browse/SPARK-21944 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Kevin Zhang > > When I use a watermark with dropDuplicates in the following way, the > watermark is calculated wrong > {code:java} > val counts = events.select(window($"time", "5 seconds"), $"time", $"id") > .withWatermark("window", "10 seconds") > .dropDuplicates("id", "window") > .groupBy("window") > .count > {code} > where events is a dataframe with a timestamp column "time" and long column > "id". > I registered a listener to print the event time stats in each batch, and the > results is like the following > {code:shell} > --- > Batch: 0 > --- > +-+-+ > > |window |count| > +-+-+ > |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3| > +-+-+ > {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, > watermark=1970-01-01T00:00:00.000Z, max=1970-01-01T19:05:19.476Z} > {watermark=1970-01-01T00:00:00.000Z} > {watermark=1970-01-01T00:00:00.000Z} > {watermark=1970-01-01T00:00:00.000Z} > {watermark=1970-01-01T00:00:00.000Z} > --- > Batch: 1 > --- > +-+-+ > > |window |count| > +-+-+ > |[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1| > |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3| > +-+-+ > {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, > watermark=1970-01-01T19:05:09.476Z, max=1970-01-01T19:05:19.476Z} > {watermark=1970-01-01T19:05:09.476Z} > --- > Batch: 2 > --- > +-+-+ > > |window |count| > +-+-+ > |[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1| > |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|4| > +-+-+ > {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, > watermark=1970-01-01T19:05:09.476Z, 
max=1970-01-01T19:05:19.476Z} > {watermark=1970-01-01T19:05:09.476Z} > {code} > As can be seen, the event time stats are wrong which are always in > 1970-01-01, so the watermark is calculated wrong. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-21944) Watermark on window column is wrong
[ https://issues.apache.org/jira/browse/SPARK-21944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16158055#comment-16158055 ] Kevin Zhang commented on SPARK-21944: - I found the problem when I use kafka source consuming our internal topics, but it is hard to verify what the problem is. So then I use socket source and produce some data myself, like (1504774520 1), (1504774521 2), (1504774540 1), (1504774520 4)... where the first element is timestamp, and the second element is id. so it's easy for you to produce sample data and reproduce the problem. Also the following is one group of data I used: {code:shell} 1504774520 1 1504774521 2 1504774520 3 1504774540 1 1504774520 4 1504774531 1 1504774532 1 1504774533 1 1504774520 1 1504774520 10 1504774526 11 {code} > Watermark on window column is wrong > --- > > Key: SPARK-21944 > URL: https://issues.apache.org/jira/browse/SPARK-21944 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Kevin Zhang > > When I use a watermark with dropDuplicates in the following way, the > watermark is calculated wrong > {code:java} > val counts = events.select(window($"time", "5 seconds"), $"time", $"id") > .withWatermark("window", "10 seconds") > .dropDuplicates("id", "window") > .groupBy("window") > .count > {code} > where events is a dataframe with a timestamp column "time" and long column > "id". 
> I registered a listener to print the event time stats in each batch, and the > results is like the following > {code:shell} > --- > Batch: 0 > --- > +-+-+ > > |window |count| > +-+-+ > |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3| > +-+-+ > {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, > watermark=1970-01-01T00:00:00.000Z, max=1970-01-01T19:05:19.476Z} > {watermark=1970-01-01T00:00:00.000Z} > {watermark=1970-01-01T00:00:00.000Z} > {watermark=1970-01-01T00:00:00.000Z} > {watermark=1970-01-01T00:00:00.000Z} > --- > Batch: 1 > --- > +-+-+ > > |window |count| > +-+-+ > |[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1| > |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3| > +-+-+ > {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, > watermark=1970-01-01T19:05:09.476Z, max=1970-01-01T19:05:19.476Z} > {watermark=1970-01-01T19:05:09.476Z} > --- > Batch: 2 > --- > +-+-+ > > |window |count| > +-+-+ > |[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1| > |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|4| > +-+-+ > {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, > watermark=1970-01-01T19:05:09.476Z, max=1970-01-01T19:05:19.476Z} > {watermark=1970-01-01T19:05:09.476Z} > {code} > As can be seen, the event time stats are wrong which are always in > 1970-01-01, so the watermark is calculated wrong. -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-21944) Watermark on window column is wrong
Kevin Zhang created SPARK-21944: --- Summary: Watermark on window column is wrong Key: SPARK-21944 URL: https://issues.apache.org/jira/browse/SPARK-21944 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.2.0 Reporter: Kevin Zhang When I use a watermark with dropDuplicates in the following way, the watermark is calculated wrong {code:java} val counts = events.select(window($"time", "5 seconds"), $"time", $"id") .withWatermark("window", "10 seconds") .dropDuplicates("id", "window") .groupBy("window") .count {code} where events is a dataframe with a timestamp column "time" and long column "id". I registered a listener to print the event time stats in each batch, and the results is like the following {code:shell} --- Batch: 0 --- +-+-+ |window |count| +-+-+ |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3| +-+-+ {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, watermark=1970-01-01T00:00:00.000Z, max=1970-01-01T19:05:19.476Z} {watermark=1970-01-01T00:00:00.000Z} {watermark=1970-01-01T00:00:00.000Z} {watermark=1970-01-01T00:00:00.000Z} {watermark=1970-01-01T00:00:00.000Z} --- Batch: 1 --- +-+-+ |window |count| +-+-+ |[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1| |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3| +-+-+ {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, watermark=1970-01-01T19:05:09.476Z, max=1970-01-01T19:05:19.476Z} {watermark=1970-01-01T19:05:09.476Z} --- Batch: 2 --- +-+-+ |window |count| +-+-+ |[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1| |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|4| +-+-+ {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, watermark=1970-01-01T19:05:09.476Z, max=1970-01-01T19:05:19.476Z} {watermark=1970-01-01T19:05:09.476Z} {code} As can be seen, the event time stats are wrong which are always in 1970-01-01, so the watermark is calculated wrong. 
[jira] [Commented] (SPARK-21590) Structured Streaming window start time should support negative values to adjust time zone
[ https://issues.apache.org/jira/browse/SPARK-21590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120924#comment-16120924 ] Kevin Zhang commented on SPARK-21590: - Thanks, I'd like to work on this. I agree with the requirement that the absolute value of the start offset be less than the slide interval, but I don't understand why we should add the slide interval to the start offset when it is negative. I've verified in my local environment that the current calculation already supports a negative start offset; it is only the non-negative check on the window parameters that rejects it. So I suggest changing only the check function. What do you think? > Structured Streaming window start time should support negative values to > adjust time zone > - > > Key: SPARK-21590 > URL: https://issues.apache.org/jira/browse/SPARK-21590 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0 > Environment: spark 2.2.0 >Reporter: Kevin Zhang > Labels: spark-sql, spark2.2, streaming, structured, timezone, > window > > I want to calculate (unique) daily access count using structured streaming > (2.2.0). > Now strut streaming' s window with 1 day duration starts at > 00:00:00 UTC and ends at 23:59:59 UTC each day, but my local timezone is CST > (UTC + 8 hours) and I > want date boundaries to be 00:00:00 CST (that is 00:00:00 UTC - 8). > In Flink I can set the window offset to -8 hours to make it, but here in > struct streaming if I set the start time (same as the offset in Flink) to -8 > or any other negative values, I will get the following error: > {code:java} > Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot > resolve 'timewindow(timestamp, 864, 864, -288)' due > to data type mismatch: The start time (-288) must be greater than or > equal to 0.;; > {code} > because the time window checks the input parameters to guarantee each value > is greater than or equal to 0. 
> So I'm thinking about whether we can remove the limit that the start time > cannot be negative? -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
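The two ideas discussed in the comment above (bounding the magnitude of the start offset, and adding the slide interval to a negative offset) can be sketched in plain Python. This is not Spark code: `check_start_offset`, `normalize_start_offset` and the error wording are hypothetical, and durations are assumed to be expressed in microseconds.

```python
DAY_US = 24 * 60 * 60 * 1_000_000  # one day in microseconds
HOUR_US = 60 * 60 * 1_000_000      # one hour in microseconds

def check_start_offset(start_us: int, slide_us: int) -> None:
    """Hypothetical relaxed check: allow negative offsets, bound the magnitude."""
    if abs(start_us) >= slide_us:
        raise ValueError(
            f"The absolute value of the start time ({start_us}) must be "
            f"less than the slide duration ({slide_us})."
        )

def normalize_start_offset(start_us: int, slide_us: int) -> int:
    """Map an accepted offset into [0, slide); a negative offset gains one slide."""
    check_start_offset(start_us, slide_us)
    return start_us % slide_us  # Python's % is non-negative for a positive modulus

# -8 hours and +16 hours describe the same daily window boundaries.
print(normalize_start_offset(-8 * HOUR_US, DAY_US) == 16 * HOUR_US)
```

Normalizing is optional if the window arithmetic already handles negative offsets, as the comment reports; in that case only the check would need to change.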
[jira] [Comment Edited] (SPARK-21590) Structured Streaming window start time should support negative values to adjust time zone
[ https://issues.apache.org/jira/browse/SPARK-21590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117900#comment-16117900 ] Kevin Zhang edited comment on SPARK-21590 at 8/8/17 6:15 AM: - [~brkyvz] Thanks for your advice, but I believe some of what I described is still not completely clear. Let me explain what I found unreasonable and why I opened this issue. Before looking into adjusting the start offset of the window, I tried using from_utc_timestamp (and even to_utc_timestamp) and setting the JVM time zone to CST, but none of these gives the right result. All the timestamps of my events are generated in CST, so if I use from_utc_timestamp to convert the timestamps of all events to UTC time, taking today (2017-08-08) as an example, I end up with a window of *yesterday* starting at 8 o'clock, like [2017-08-07 08:00:00.0,2017-08-08 08:00:00.0], instead of the window of *today* starting at 00:00. The other methods are also incorrect and I won't cover them again here. So the problem is that I have the correct timestamp and can get the right date for the window; the only odd thing is the 8-hour start offset. That's why I want to set the start offset to -8 to adjust the window, and I think that is very reasonable because Flink [https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html] works exactly this way. As for the "+16" offset, as I mentioned, it works well: as in your example, for something that happened at 2017-03-14 17:30:00 CST I get a window of 2017-03-14 00:00 CST - 2017-03-15 00:00 CST. But the confusing thing is why, with "+16", I don't get a window of 2017-03-15 00:00 CST - 2017-03-16 00:00 CST. Also, when I tried "+15" I got a window of 2017-03-13 23:00 CST - 2017-03-14 23:00 CST, so the beginning of the window falls back to yesterday. Don't you think that is confusing for users? 
was (Author: kevinzwx): [~brkyvz] Thanks for your advice, but I believe you don't completely understand the problem. Let me talk about what I found unreasonable and that' s why I'm here opening the issue. Before digging into adjusting start offset of the window, I've tried using from_utc_timestamp (even to_utc_timestamp) and set the timezone of jvm to CST, but all of these cannot get the right result, because all the timestamps of my events are generated in CST, if I use from_utc_timestamp to convert the timestamp of all events to utc time , take today (2017-08-08) as example, I finally get a window of *yesterday* starting at 8 o'clock like [2017-08-07 08:00:00.0,2017-08-08 08:00:00.0], instead of the window of *today* starting at 00:00. Other methods are also incorrect and won't be covered again here. So the problem is I have the correct timestamp and can get the right date of window, but the only thing weird is the 8 hours start offset. That's why I want to set the start offset to -8 to adjust the window, and I think it's very reasonable because flink [https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html] runs exactly in this way. As for "+16" offset, I've told that it works well, like what you described in your example for something happened in 2017-03-14 17:30:00 CST I can get a window of 2017-03-14 00:00 CST - 2017-03-15 00:00 CST . But the confusing thing is that with "+16" why I don't get a window of 2017-03-15 00:00 CST - 2017-03-16 00:00 CST? Also I tried "+15" I'll get a window of 2017-03-13 23:00 CST - 2017-03-14 23:00 CST, the beginning time of window falls back to yesterday? Don't you think it's kind of confusing for users? 
> Structured Streaming window start time should support negative values to > adjust time zone > - > > Key: SPARK-21590 > URL: https://issues.apache.org/jira/browse/SPARK-21590 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0 > Environment: spark 2.2.0 >Reporter: Kevin Zhang > Labels: spark-sql, spark2.2, streaming, structured, timezone, > window > > I want to calculate (unique) daily access count using structured streaming > (2.2.0). > Now strut streaming' s window with 1 day duration starts at > 00:00:00 UTC and ends at 23:59:59 UTC each day, but my local timezone is CST > (UTC + 8 hours) and I > want date boundaries to be 00:00:00 CST (that is 00:00:00 UTC - 8). > In Flink I can set the window offset to -8 hours to make it, but here in > struct streaming if I set the start time (same as the offset in Flink) to -8 > or any other negative values, I will get the following error: > {code:java} > Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot > resolve
[jira] [Commented] (SPARK-21590) Structured Streaming window start time should support negative values to adjust time zone
[ https://issues.apache.org/jira/browse/SPARK-21590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117900#comment-16117900 ] Kevin Zhang commented on SPARK-21590: - [~brkyvz] Thanks for your advice, but I believe you don't completely understand the problem. Let me talk about what I found unreasonable and that' s why I'm here opening the issue. Before digging into adjusting start offset of the window, I've tried using from_utc_timestamp (even to_utc_timestamp) and set the timezone of jvm to CST, but all of these cannot get the right result, because all the timestamps of my events are generated in CST, if I use from_utc_timestamp to convert the timestamp of all events to utc time , take today (2017-08-08) as example, I finally get a window of *yesterday* starting at 8 o'clock like [2017-08-07 08:00:00.0,2017-08-08 08:00:00.0], instead of the window of *today* starting at 00:00. Other methods are also incorrect and won't be covered again here. So the problem is I have the correct timestamp and can get the right date of window, but the only thing weird is the 8 hours start offset. That's why I want to set the start offset to -8 to adjust the window, and I think it's very reasonable because flink [https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html] runs exactly in this way. As for "+16" offset, I've told that it works well, like what you described in your example for something happened in 2017-03-14 17:30:00 CST I can get a window of 2017-03-14 00:00 CST - 2017-03-15 00:00 CST . But the confusing thing is that with "+16" why I don't get a window of 2017-03-15 00:00 CST - 2017-03-16 00:00 CST? Also I tried "+15" I'll get a window of 2017-03-13 23:00 CST - 2017-03-14 23:00 CST, the beginning time of window falls back to yesterday? Don't you think it's kind of confusing for users? 
> Structured Streaming window start time should support negative values to > adjust time zone > - > > Key: SPARK-21590 > URL: https://issues.apache.org/jira/browse/SPARK-21590 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0 > Environment: spark 2.2.0 >Reporter: Kevin Zhang > Labels: spark-sql, spark2.2, streaming, structured, timezone, > window > > I want to calculate (unique) daily access count using structured streaming > (2.2.0). > Now strut streaming' s window with 1 day duration starts at > 00:00:00 UTC and ends at 23:59:59 UTC each day, but my local timezone is CST > (UTC + 8 hours) and I > want date boundaries to be 00:00:00 CST (that is 00:00:00 UTC - 8). > In Flink I can set the window offset to -8 hours to make it, but here in > struct streaming if I set the start time (same as the offset in Flink) to -8 > or any other negative values, I will get the following error: > {code:java} > Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot > resolve 'timewindow(timestamp, 864, 864, -288)' due > to data type mismatch: The start time (-288) must be greater than or > equal to 0.;; > {code} > because the time window checks the input parameters to guarantee each value > is greater than or equal to 0. > So I'm thinking about whether we can remove the limit that the start time > cannot be negative? -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
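The "+16" and "+15" observations in the comment above are consistent with ordinary tumbling-window arithmetic. The sketch below is plain Python, not Spark code: the formula in `window_start` is a generic one assumed here for illustration rather than taken from Spark's source, and `show` and `CST` (UTC+8) are illustrative names. A window always has to contain its event, so a positive offset shifts the boundaries but cannot push the window to the next day.

```python
from datetime import datetime, timedelta, timezone

CST = timezone(timedelta(hours=8))  # assumed: UTC+8, as in the report
HOUR = 3600
DAY = 24 * HOUR

def window_start(ts: int, size: int, offset: int) -> int:
    """Start of the tumbling window [start, start + size) that contains ts."""
    return ts - (ts - offset) % size  # Python's % is non-negative for size > 0

def show(ts: int, offset_hours: int) -> str:
    """Window start for a 1-day window, rendered in CST."""
    start = window_start(ts, DAY, offset_hours * HOUR)
    return datetime.fromtimestamp(start, CST).strftime("%Y-%m-%d %H:%M")

# The event from the comment: 2017-03-14 17:30:00 CST.
event = int(datetime(2017, 3, 14, 17, 30, tzinfo=CST).timestamp())
print(show(event, 16))  # +16 hours
print(show(event, -8))  # -8 hours, congruent to +16 modulo 24 hours
print(show(event, 15))  # +15 hours
```

Under this formula, +16 hours and -8 hours both give a window starting at 2017-03-14 00:00 CST, and +15 hours gives 2017-03-13 23:00 CST, matching the windows reported in the comment.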
[jira] [Commented] (SPARK-21590) Structured Streaming window start time should support negative values to adjust time zone
[ https://issues.apache.org/jira/browse/SPARK-21590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112274#comment-16112274 ] Kevin Zhang commented on SPARK-21590: - [~brkyvz] I don't know the test you described, but the source code below (in org.apache.spark.sql.catalyst.expressions.TimeWindow) and the log in the description, produced when I use a window like _window($"timestamp", "1 day", "1 day", "-8 hours")_, both show that a negative start time is rejected: {code:java} if (startTime < 0) { return TypeCheckFailure(s"The start time ($startTime) must be greater than or equal to 0.") } {code} As for using `+16 hours` instead of `-8`, I tried it and everything seems OK. But I'm a little confused about the range of the window. When I use window("1 day") I get a window like 2017-08-03 08:00 - 2017-08-04 08:00. By my understanding, with a '+16' start time I should get a window like 2017-08-04 00:00 - 2017-08-05 00:00 (that is, the window of the next day), but I actually get 2017-08-03 00:00 - 2017-08-04 00:00, which is exactly what '-8' would give. Is something wrong?
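One way to see why '+16' and '-8' give the same window is that, for a 1 day slide, the two start times are congruent modulo the slide duration. The sketch below is only an illustration of that arithmetic; StartTimeNormalization and normalize are hypothetical names, and the values are expressed in microseconds purely as an assumption for the example.

```scala
object StartTimeNormalization {
  val hourMicros: Long = 3600L * 1000L * 1000L
  val dayMicros: Long = 24L * hourMicros

  // Map any start offset onto the equivalent value in [0, slide).
  // Two offsets with the same normalized value produce identical windows.
  def normalize(startMicros: Long, slideMicros: Long): Long =
    Math.floorMod(startMicros, slideMicros)
}
```

With these definitions, normalize(-8 * hourMicros, dayMicros) and normalize(16 * hourMicros, dayMicros) are the same number, which is why a non-negative start time can stand in for a negative one when the window length is one day.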
[jira] [Commented] (SPARK-21546) dropDuplicates with watermark yields RuntimeException due to binding failure
[ https://issues.apache.org/jira/browse/SPARK-21546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112146#comment-16112146 ] Kevin Zhang commented on SPARK-21546: - [~zsxwing] In my case I want to use a watermark to expire the state, but I don't want the watermark column to be used when filtering duplicates. For example, I want to count the unique visits to my website for one day, so I only need to keep the dropDuplicates state for one day and drop it the next day. At the same time I want to use uuid alone as the key for dropping duplicates, rather than (uuid, eventTime) together. But dropDuplicates behaves like the latter, right? If so, how can I get the results I expect? > dropDuplicates with watermark yields RuntimeException due to binding failure > > > Key: SPARK-21546 > URL: https://issues.apache.org/jira/browse/SPARK-21546 > Project: Spark > Issue Type: Bug > Components: Structured Streaming >Affects Versions: 2.3.0 >Reporter: Jacek Laskowski >Assignee: Shixiong Zhu > Fix For: 2.2.1, 2.3.0 > > > With today's master... > The following streaming query with watermark and {{dropDuplicates}} yields > {{RuntimeException}} due to failure in binding. > {code} > val topic1 = spark. > readStream. > format("kafka"). > option("subscribe", "topic1"). > option("kafka.bootstrap.servers", "localhost:9092"). > option("startingoffsets", "earliest"). > load > val records = topic1. > withColumn("eventtime", 'timestamp). // <-- just to put the right name > given the purpose > withWatermark(eventTime = "eventtime", delayThreshold = "30 seconds"). // > <-- use the renamed eventtime column > dropDuplicates("value"). // dropDuplicates will use watermark > // only when eventTime column exists > // include the watermark column => internal design leak? > select('key cast "string", 'value cast "string", 'eventtime). 
> as[(String, String, java.sql.Timestamp)] > scala> records.explain > == Physical Plan == > *Project [cast(key#0 as string) AS key#169, cast(value#1 as string) AS > value#170, eventtime#157-T3ms] > +- StreamingDeduplicate [value#1], > StatefulOperatorStateInfo(,93c3de98-3f85-41a4-8aef-d09caf8ea693,0,0), > 0 >+- Exchange hashpartitioning(value#1, 200) > +- EventTimeWatermark eventtime#157: timestamp, interval 30 seconds > +- *Project [key#0, value#1, timestamp#5 AS eventtime#157] > +- StreamingRelation kafka, [key#0, value#1, topic#2, > partition#3, offset#4L, timestamp#5, timestampType#6] > import org.apache.spark.sql.streaming.{OutputMode, Trigger} > val sq = records. > writeStream. > format("console"). > option("truncate", false). > trigger(Trigger.ProcessingTime("10 seconds")). > queryName("from-kafka-topic1-to-console"). > outputMode(OutputMode.Update). > start > {code} > {code} > --- > Batch: 0 > --- > 17/07/27 10:28:58 ERROR Executor: Exception in task 3.0 in stage 13.0 (TID > 438) > org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding > attribute, tree: eventtime#157-T3ms > at > org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56) > at > org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88) > at > org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267) > at > org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) > at > 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306) > at > org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187) > at > org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272) > at > org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256) > at > org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87) > at >
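The semantics asked for in the comment on this issue (deduplicate on uuid alone, but let a watermark drop a whole day of state) can be written down as a plain in-memory model. This is only a sketch of the desired behaviour, not Spark's StreamingDeduplicate operator and not a recommended workaround; DailyDeduplicator and its methods are hypothetical names.

```scala
import scala.collection.mutable

// Illustration only: a plain in-memory model of per-day deduplication.
final class DailyDeduplicator {
  // Seen uuids grouped by epoch day, so a whole day can be dropped at once.
  private val seenByDay = mutable.Map.empty[Long, mutable.Set[String]]

  // Returns true when the uuid is seen for the first time on that day.
  def offer(uuid: String, epochDay: Long): Boolean =
    seenByDay.getOrElseUpdate(epochDay, mutable.Set.empty[String]).add(uuid)

  // Drop the state of every day strictly before the watermark day.
  def expireBefore(watermarkDay: Long): Unit =
    seenByDay.keys.filter(_ < watermarkDay).toList
      .foreach(day => seenByDay.remove(day))

  // Number of days that still hold state.
  def trackedDays: Int = seenByDay.size
}
```

Here the event time only selects which day's set a uuid belongs to and when that set can be discarded; it is never part of the deduplication key itself.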
[jira] [Commented] (SPARK-21590) Structured Streaming window start time should support negative values to adjust time zone
[ https://issues.apache.org/jira/browse/SPARK-21590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110209#comment-16110209 ] Kevin Zhang commented on SPARK-21590: - I think the only problem is the non-negative check on the window start time; the other checks are correct. So I suggest simply removing the non-negative check on the start time to solve the problem. If you think that's OK, I will work on this.
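A relaxed check along the lines suggested in this comment might look like the sketch below. It is a hypothetical illustration, not the code in TimeWindow and not necessarily what any Spark release does: the object name, the Either-based return type and the error messages are all assumptions. Instead of dropping validation entirely, it bounds the magnitude of the start time by the slide duration.

```scala
object StartTimeCheck {
  // Hypothetical relaxed validation: negative start times are accepted
  // as long as their magnitude is smaller than the slide duration.
  def validate(startTime: Long, slideDuration: Long): Either[String, Long] =
    if (slideDuration <= 0)
      Left(s"The slide duration ($slideDuration) must be greater than 0.")
    else if (math.abs(startTime) >= slideDuration)
      Left(s"The absolute value of start time ($startTime) must be less than " +
        s"the slide duration ($slideDuration).")
    else
      Right(startTime)
}
```

Bounding the magnitude keeps the check meaningful, since any larger start time is equivalent to a smaller one modulo the slide duration.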
[jira] [Updated] (SPARK-21590) Structured Streaming window start time should support negative values to adjust time zone
[ https://issues.apache.org/jira/browse/SPARK-21590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kevin Zhang updated SPARK-21590: Description: I want to calculate the (unique) daily access count using Structured Streaming (2.2.0). Currently a Structured Streaming window with a 1 day duration starts at 00:00:00 UTC and ends at 23:59:59 UTC each day, but my local time zone is CST (UTC + 8 hours) and I want the date boundaries to be at 00:00:00 CST (that is, 00:00:00 UTC - 8). In Flink I can set the window offset to -8 hours to achieve this, but in Structured Streaming, if I set the start time (the equivalent of the offset in Flink) to -8 or any other negative value, I get the following error: {code:java} Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'timewindow(timestamp, 864, 864, -288)' due to data type mismatch: The start time (-288) must be greater than or equal to 0.;; {code} because the time window checks its input parameters to guarantee that each value is greater than or equal to 0. So I'm wondering whether we can remove the restriction that the start time cannot be negative? was: I want to calculate the (unique) daily access count using Structured Streaming (2.2.0). Currently a Structured Streaming window with a 1 day duration starts at 00:00:00 UTC and ends at 23:59:59 UTC each day, but my local time zone is CST (UTC + 8 hours) and I want the date boundaries to be at 00:00:00 CST (that is, 00:00:00 UTC - 8). In Flink I can set the window offset to -8 hours to achieve this, but in Structured Streaming, if I set the start time (the equivalent of the offset in Flink) to -8 or any other negative value, I get the following error: {code:shell} Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'timewindow(timestamp, 864, 864, -288)' due to data type mismatch: The start time (-288) must be greater than or equal to 0.;; {code} because the time window checks its input parameters to guarantee that each value is greater than or equal to 0. So I'm wondering whether we can remove the restriction that the start time cannot be negative?
[jira] [Created] (SPARK-21590) Structured Streaming window start time should support negative values to adjust time zone
Kevin Zhang created SPARK-21590: --- Summary: Structured Streaming window start time should support negative values to adjust time zone Key: SPARK-21590 URL: https://issues.apache.org/jira/browse/SPARK-21590 Project: Spark Issue Type: Bug Components: Structured Streaming Affects Versions: 2.2.0, 2.1.0, 2.0.1, 2.0.0 Environment: spark 2.2.0 Reporter: Kevin Zhang I want to calculate the (unique) daily access count using Structured Streaming (2.2.0). Currently a Structured Streaming window with a 1 day duration starts at 00:00:00 UTC and ends at 23:59:59 UTC each day, but my local time zone is CST (UTC + 8 hours) and I want the date boundaries to be at 00:00:00 CST (that is, 00:00:00 UTC - 8). In Flink I can set the window offset to -8 hours to achieve this, but in Structured Streaming, if I set the start time (the equivalent of the offset in Flink) to -8 or any other negative value, I get the following error: {code:shell} Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'timewindow(timestamp, 864, 864, -288)' due to data type mismatch: The start time (-288) must be greater than or equal to 0.;; {code} because the time window checks its input parameters to guarantee that each value is greater than or equal to 0. So I'm wondering whether we can remove the restriction that the start time cannot be negative?
[jira] [Comment Edited] (SPARK-12221) Add CPU time metric to TaskMetrics
[ https://issues.apache.org/jira/browse/SPARK-12221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15500750#comment-15500750 ] Kevin Zhang edited comment on SPARK-12221 at 9/18/16 10:35 AM: --- Hi, is there any plan to merge this into trunk? was (Author: kevinzwx): hi, is there any plan to put this to trunk?? > Add CPU time metric to TaskMetrics > -- > > Key: SPARK-12221 > URL: https://issues.apache.org/jira/browse/SPARK-12221 > Project: Spark > Issue Type: Improvement > Components: Spark Core, Web UI >Affects Versions: 1.5.2 >Reporter: Jisoo Kim > > Currently TaskMetrics doesn't support executor CPU time. I'd like to have one > so I can retrieve the metric from History Server.
[jira] [Commented] (SPARK-12221) Add CPU time metric to TaskMetrics
[ https://issues.apache.org/jira/browse/SPARK-12221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15500750#comment-15500750 ] Kevin Zhang commented on SPARK-12221: - Hi, is there any plan to merge this into trunk?
[jira] [Commented] (SPARK-13514) Spark Shuffle Service 1.6.0 issue in Yarn
[ https://issues.apache.org/jira/browse/SPARK-13514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15383437#comment-15383437 ] Kevin Zhang commented on SPARK-13514: - Yes, the file:// prefix does exist, and this is definitely a workaround. But what I mean is: is there any other way to solve the problem besides removing the file:// prefix? In our environment the prefix is needed for some reason. > Spark Shuffle Service 1.6.0 issue in Yarn > -- > > Key: SPARK-13514 > URL: https://issues.apache.org/jira/browse/SPARK-13514 > Project: Spark > Issue Type: Bug >Reporter: Satish Kolli > > Spark shuffle service 1.6.0 in Yarn fails with an unknown exception. When I > replace the spark shuffle jar with version 1.5.2 jar file, the following > succeeds with out any issues. > Hadoop Version: 2.5.1 (Kerberos Enabled) > Spark Version: 1.6.0 > Java Version: 1.7.0_79 > {code} > $SPARK_HOME/bin/spark-shell \ > --master yarn \ > --deploy-mode client \ > --conf spark.dynamicAllocation.enabled=true \ > --conf spark.dynamicAllocation.minExecutors=5 \ > --conf spark.yarn.executor.memoryOverhead=2048 \ > --conf spark.shuffle.service.enabled=true \ > --conf spark.scheduler.mode=FAIR \ > --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \ > --executor-memory 6G \ > --driver-memory 8G > {code} > {code} > scala> val df = sc.parallelize(1 to 50).toDF > df: org.apache.spark.sql.DataFrame = [_1: int] > scala> df.show(50) > {code} > {code} > 16/02/26 08:20:53 INFO spark.SparkContext: Starting job: show at :30 > 16/02/26 08:20:53 INFO scheduler.DAGScheduler: Got job 0 (show at > :30) with 1 output partitions > 16/02/26 08:20:53 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 > (show at :30) > 16/02/26 08:20:53 INFO scheduler.DAGScheduler: Parents of final stage: List() > 16/02/26 08:20:53 INFO scheduler.DAGScheduler: Missing parents: List() > 16/02/26 08:20:53 INFO scheduler.DAGScheduler: Submitting ResultStage 0 > (MapPartitionsRDD[2] at show at :30), 
which has no missing parents > 16/02/26 08:20:53 INFO storage.MemoryStore: Block broadcast_0 stored as > values in memory (estimated size 2.2 KB, free 2.2 KB) > 16/02/26 08:20:53 INFO storage.MemoryStore: Block broadcast_0_piece0 stored > as bytes in memory (estimated size 1411.0 B, free 3.6 KB) > 16/02/26 08:20:53 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in > memory on 10.5.76.106:46683 (size: 1411.0 B, free: 5.5 GB) > 16/02/26 08:20:53 INFO spark.SparkContext: Created broadcast 0 from broadcast > at DAGScheduler.scala:1006 > 16/02/26 08:20:53 INFO scheduler.DAGScheduler: Submitting 1 missing tasks > from ResultStage 0 (MapPartitionsRDD[2] at show at :30) > 16/02/26 08:20:53 INFO cluster.YarnScheduler: Adding task set 0.0 with 1 tasks > 16/02/26 08:20:53 INFO scheduler.FairSchedulableBuilder: Added task set > TaskSet_0 tasks to pool default > 16/02/26 08:20:53 INFO scheduler.TaskSetManager: Starting task 0.0 in stage > 0.0 (TID 0, , partition 0,PROCESS_LOCAL, 2031 bytes) > 16/02/26 08:20:53 INFO cluster.YarnClientSchedulerBackend: Disabling executor > 2. > 16/02/26 08:20:54 INFO scheduler.DAGScheduler: Executor lost: 2 (epoch 0) > 16/02/26 08:20:54 INFO storage.BlockManagerMasterEndpoint: Trying to remove > executor 2 from BlockManagerMaster. > 16/02/26 08:20:54 INFO storage.BlockManagerMasterEndpoint: Removing block > manager BlockManagerId(2, , 48113) > 16/02/26 08:20:54 INFO storage.BlockManagerMaster: Removed 2 successfully in > removeExecutor > 16/02/26 08:20:54 ERROR cluster.YarnScheduler: Lost executor 2 on > : Container marked as failed: > container_1456492687549_0001_01_03 on host: . > Exit status: 1. 
Diagnostics: Exception from container-launch: > ExitCodeException exitCode=1: > ExitCodeException exitCode=1: > at org.apache.hadoop.util.Shell.runCommand(Shell.java:538) > at org.apache.hadoop.util.Shell.run(Shell.java:455) > at > org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:702) > at > org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:195) > at > org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:300) > at > org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:81) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Container exited with a