[jira] [Created] (SPARK-31737) SparkSQL can't recognize the modified length of Hive varchar

2020-05-17 Thread Kevin Zhang (Jira)
Kevin Zhang created SPARK-31737:
---

 Summary: SparkSQL can't recognize the modified length of Hive 
varchar
 Key: SPARK-31737
 URL: https://issues.apache.org/jira/browse/SPARK-31737
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.0.0
Reporter: Kevin Zhang


{code:java}
// Create a Hive table in Spark SQL
spark-sql> create table test_table(id varchar(5)) stored as textfile;

// Create a file under Linux whose content is 1234567890

// Then load this file into the Hive table
spark-sql> load data local inpath '/home/test_table' overwrite into table test_table;

// Query this table in Spark SQL
spark-sql> select * from test_table;
12345

// Modify the length of this field in Hive
hive> alter table test_table change column id id varchar(10);

// Query this table in Hive
hive> select * from test_table;
OK
1234567890

// Query this table in Spark SQL again: the returned id is still 12345, so the
// length modification does not take effect in Spark SQL
spark-sql> select * from test_table;
12345
{code}
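
For completeness, a minimal way to check which column definition Spark SQL is actually using, and the obvious attempt to refresh it. This is only a sketch under the assumption that the session talks to the same Hive metastore; it is not part of the original report, and whether refreshing is enough to pick up the new varchar(10) is exactly what this ticket questions:

{code:scala}
import org.apache.spark.sql.SparkSession

// Assumes the same Hive metastore as the hive/spark-sql sessions above.
val spark = SparkSession.builder()
  .appName("varchar-length-check")
  .enableHiveSupport()
  .getOrCreate()

// Shows the column type (and varchar length) Spark SQL currently believes the table has.
spark.sql("DESCRIBE test_table").show(truncate = false)

// Drops Spark's cached catalog/relation entry so the next query re-reads the metastore.
spark.catalog.refreshTable("test_table")
spark.sql("SELECT * FROM test_table").show()
{code}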






[jira] [Commented] (SPARK-24036) Stateful operators in continuous processing

2019-07-27 Thread Kevin Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16894323#comment-16894323
 ] 

Kevin Zhang commented on SPARK-24036:
-

Hi [~joseph.torres], is there any update on this work? Will the new feature be included in Spark 3.0?

> Stateful operators in continuous processing
> ---
>
> Key: SPARK-24036
> URL: https://issues.apache.org/jira/browse/SPARK-24036
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 3.0.0
>Reporter: Jose Torres
>Priority: Major
>
> The first iteration of continuous processing in Spark 2.3 does not work with 
> stateful operators.






[jira] [Issue Comment Deleted] (SPARK-24630) SPIP: Support SQLStreaming in Spark

2019-04-16 Thread Kevin Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Zhang updated SPARK-24630:

Comment: was deleted

(was: Thanks [~Jackey Lee].
So I'm wondering what's blocking the PR for this issue from being merged; is it related to DataSourceV2?)

> SPIP: Support SQLStreaming in Spark
> ---
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0, 2.2.1
>Reporter: Jackey Lee
>Priority: Minor
>  Labels: SQLStreaming
> Attachments: SQLStreaming SPIP V2.pdf
>
>
> At present, KafkaSQL, Flink SQL (which is actually based on Calcite), SQLStream and StormSQL all provide a stream-type SQL interface, with which users with little knowledge about streaming can easily develop a stream processing model. In Spark, we can also support a SQL API based on Structured Streaming.
> To support SQL streaming, there are two key points:
> 1. The analysis should be able to parse streaming-type SQL.
> 2. The analyzer should be able to map metadata information to the corresponding Relation.
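
As an illustration of the direction only, not the SPIP's design: a streaming DataFrame can already be exposed to SQL today by registering it as a temporary view, whereas the two key points above are about the parser and analyzer handling streaming sources natively. A minimal sketch, using the built-in rate source:

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("sql-on-stream-sketch").getOrCreate()

// A streaming DataFrame from the built-in rate source.
val events = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load()

// Register it as a view and query it with plain SQL; the result is itself streaming.
events.createOrReplaceTempView("events")
val counts = spark.sql(
  "SELECT value % 10 AS bucket, count(*) AS cnt FROM events GROUP BY value % 10")

counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
  .awaitTermination()
{code}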






[jira] [Commented] (SPARK-24630) SPIP: Support SQLStreaming in Spark

2019-04-15 Thread Kevin Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16818598#comment-16818598
 ] 

Kevin Zhang commented on SPARK-24630:
-

Thanks [~Jackey Lee].
So I'm wondering what's blocking the PR for this issue from being merged; is it related to DataSourceV2?

> SPIP: Support SQLStreaming in Spark
> ---
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0, 2.2.1
>Reporter: Jackey Lee
>Priority: Minor
>  Labels: SQLStreaming
> Attachments: SQLStreaming SPIP V2.pdf
>
>
> At present, KafkaSQL, Flink SQL (which is actually based on Calcite), SQLStream and StormSQL all provide a stream-type SQL interface, with which users with little knowledge about streaming can easily develop a stream processing model. In Spark, we can also support a SQL API based on Structured Streaming.
> To support SQL streaming, there are two key points:
> 1. The analysis should be able to parse streaming-type SQL.
> 2. The analyzer should be able to map metadata information to the corresponding Relation.






[jira] [Updated] (SPARK-27340) Alias on TimeWindow expression may cause watermark metadata loss

2019-04-01 Thread Kevin Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Zhang updated SPARK-27340:

Description: 
When we use the DataFrame API to write a structured streaming query we usually specify a watermark on the event time column. If we define a window on the event time column, the delayKey metadata of the event time column is supposed to be propagated to the new column generated by the time window expression. But if we add an additional alias on the time window column, the delayKey metadata is lost.

Currently I have only found that the bug affects stream-stream joins with equal window join keys. For aggregations, the grouping expression can be trimmed (in the CleanupAliases rule), so the additional aliases are removed and the metadata is kept.

Here is an example:
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.streaming.Trigger

val sparkSession = SparkSession
  .builder()
  .master("local")
  .getOrCreate()
import sparkSession.implicits._

val rateStream = sparkSession.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load()

val fooStream = rateStream
  .select(
    col("value").as("fooId"),
    col("timestamp").as("fooTime")
  )
  .withWatermark("fooTime", "2 seconds")
  .select($"fooId", $"fooTime", window($"fooTime", "2 seconds").alias("fooWindow"))

val barStream = rateStream
  .where(col("value") % 2 === 0)
  .select(
    col("value").as("barId"),
    col("timestamp").as("barTime")
  )
  .withWatermark("barTime", "2 seconds")
  .select($"barId", $"barTime", window($"barTime", "2 seconds").alias("barWindow"))

val joinedDf = fooStream
  .join(
    barStream,
    $"fooId" === $"barId" &&
      fooStream.col("fooWindow") === barStream.col("barWindow"),
    joinType = "LeftOuter"
  )

val query = joinedDf
  .writeStream
  .format("console")
  .option("truncate", 100)
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

query.awaitTermination()
{code}

This program ends with the exception below, and from the analyzed plan we can see there is no delayKey metadata on 'fooWindow':

{code:java}
org.apache.spark.sql.AnalysisException: Stream-stream outer join between two 
streaming DataFrame/Datasets is not supported without a watermark in the join 
keys, or a watermark on the nullable side and an appropriate range condition;;
Join LeftOuter, ((fooId#4L = barId#14L) && (fooWindow#9 = barWindow#19))
:- Project [fooId#4L, fooTime#5-T2000ms, window#10-T2000ms AS fooWindow#9]
:  +- Filter isnotnull(fooTime#5-T2000ms)
: +- Project [named_struct(start, precisetimestampconversion(CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
LongType) - 0) as double) / cast(200 as double))) as double) = 
(cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 
0) as double) / cast(200 as double))) THEN 
(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
LongType) - 0) as double) / cast(200 as double))) + cast(1 as bigint)) ELSE 
CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
LongType) - 0) as double) / cast(200 as double))) END + cast(0 as bigint)) 
- cast(1 as bigint)) * 200) + 0), LongType, TimestampType), end, 
precisetimestampconversion((CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
LongType) - 0) as double) / cast(200 as double))) as double) = 
(cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 
0) as double) / cast(200 as double))) THEN 
(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
LongType) - 0) as double) / cast(200 as double))) + cast(1 as bigint)) ELSE 
CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
LongType) - 0) as double) / cast(200 as double))) END + cast(0 as bigint)) 
- cast(1 as bigint)) * 200) + 0) + 200), LongType, TimestampType)) AS 
window#10-T2000ms, fooId#4L, fooTime#5-T2000ms]
:+- EventTimeWatermark fooTime#5: timestamp, interval 2 seconds
:   +- Project [value#1L AS fooId#4L, timestamp#0 AS fooTime#5]
:  +- StreamingRelationV2 
org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@2cae5fa7, 
rate, Map(numPartitions -> 1, rowsPerSecond -> 1), [timestamp#0, value#1L]
+- Project [barId#14L, barTime#15-T2000ms, window#20-T2000ms AS barWindow#19]
   +- Filter isnotnull(barTime#15-T2000ms)
  +- Project [named_struct(start, precisetimestampconversion(CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, 
LongType) - 0) as double) / cast(200 as double))) as double) = 
(cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, LongType) 
- 0) as double) / cast(200 as double))) THEN 
(CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, 

[jira] [Updated] (SPARK-27340) Alias on TimeWindow expression may cause watermark metadata loss

2019-04-01 Thread Kevin Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-27340?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Zhang updated SPARK-27340:

Description: 
When we use the DataFrame API to write a structured streaming query we usually specify a watermark on the event time column. If we define a window on the event time column, the delayKey metadata of the event time column is supposed to be propagated to the new column generated by the time window expression. But if we add an additional alias on the time window column, the delayKey metadata is lost.

Currently I have only found that the bug affects stream-stream joins. For aggregations, the grouping expression can be trimmed (in the CleanupAliases rule), so the additional aliases are removed and the metadata is kept.

Here is an example:
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.streaming.Trigger

val sparkSession = SparkSession
  .builder()
  .master("local")
  .getOrCreate()
import sparkSession.implicits._

val rateStream = sparkSession.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load()

val fooStream = rateStream
  .select(
    col("value").as("fooId"),
    col("timestamp").as("fooTime")
  )
  .withWatermark("fooTime", "2 seconds")
  .select($"fooId", $"fooTime", window($"fooTime", "2 seconds").alias("fooWindow"))

val barStream = rateStream
  .where(col("value") % 2 === 0)
  .select(
    col("value").as("barId"),
    col("timestamp").as("barTime")
  )
  .withWatermark("barTime", "2 seconds")
  .select($"barId", $"barTime", window($"barTime", "2 seconds").alias("barWindow"))

val joinedDf = fooStream
  .join(
    barStream,
    $"fooId" === $"barId" &&
      fooStream.col("fooWindow") === barStream.col("barWindow"),
    joinType = "LeftOuter"
  )

val query = joinedDf
  .writeStream
  .format("console")
  .option("truncate", 100)
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

query.awaitTermination()
{code}

This program ends with the exception below, and from the analyzed plan we can see there is no delayKey metadata on 'fooWindow':

{code:java}
org.apache.spark.sql.AnalysisException: Stream-stream outer join between two 
streaming DataFrame/Datasets is not supported without a watermark in the join 
keys, or a watermark on the nullable side and an appropriate range condition;;
Join LeftOuter, ((fooId#4L = barId#14L) && (fooWindow#9 = barWindow#19))
:- Project [fooId#4L, fooTime#5-T2000ms, window#10-T2000ms AS fooWindow#9]
:  +- Filter isnotnull(fooTime#5-T2000ms)
: +- Project [named_struct(start, precisetimestampconversion(CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
LongType) - 0) as double) / cast(200 as double))) as double) = 
(cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 
0) as double) / cast(200 as double))) THEN 
(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
LongType) - 0) as double) / cast(200 as double))) + cast(1 as bigint)) ELSE 
CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
LongType) - 0) as double) / cast(200 as double))) END + cast(0 as bigint)) 
- cast(1 as bigint)) * 200) + 0), LongType, TimestampType), end, 
precisetimestampconversion((CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
LongType) - 0) as double) / cast(200 as double))) as double) = 
(cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 
0) as double) / cast(200 as double))) THEN 
(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
LongType) - 0) as double) / cast(200 as double))) + cast(1 as bigint)) ELSE 
CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
LongType) - 0) as double) / cast(200 as double))) END + cast(0 as bigint)) 
- cast(1 as bigint)) * 200) + 0) + 200), LongType, TimestampType)) AS 
window#10-T2000ms, fooId#4L, fooTime#5-T2000ms]
:+- EventTimeWatermark fooTime#5: timestamp, interval 2 seconds
:   +- Project [value#1L AS fooId#4L, timestamp#0 AS fooTime#5]
:  +- StreamingRelationV2 
org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@2cae5fa7, 
rate, Map(numPartitions -> 1, rowsPerSecond -> 1), [timestamp#0, value#1L]
+- Project [barId#14L, barTime#15-T2000ms, window#20-T2000ms AS barWindow#19]
   +- Filter isnotnull(barTime#15-T2000ms)
  +- Project [named_struct(start, precisetimestampconversion(CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, 
LongType) - 0) as double) / cast(200 as double))) as double) = 
(cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, LongType) 
- 0) as double) / cast(200 as double))) THEN 
(CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, 
LongType) - 0) as double) / 

[jira] [Created] (SPARK-27340) Alias on TimeWindow expression may cause watermark metadata loss

2019-04-01 Thread Kevin Zhang (JIRA)
Kevin Zhang created SPARK-27340:
---

 Summary: Alias on TimeWindow expression may cause watermark metadata loss
 Key: SPARK-27340
 URL: https://issues.apache.org/jira/browse/SPARK-27340
 Project: Spark
  Issue Type: Bug
  Components: SQL, Structured Streaming
Affects Versions: 2.4.0
Reporter: Kevin Zhang


When we use the DataFrame API to write a structured streaming query we usually specify a watermark on the event time column. If we define a window on the event time column, the delayKey metadata of the event time column is supposed to be propagated to the new column generated by the time window expression. But if we add an additional alias on the time window column, the delayKey metadata is lost.

Currently I have only found that the bug affects stream-stream joins. For aggregations, the grouping expression can be trimmed, so the additional aliases are removed and the metadata is kept.

Here is an example:
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.streaming.Trigger

val sparkSession = SparkSession
  .builder()
  .master("local")
  .getOrCreate()
import sparkSession.implicits._

val rateStream = sparkSession.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load()

val fooStream = rateStream
  .select(
    col("value").as("fooId"),
    col("timestamp").as("fooTime")
  )
  .withWatermark("fooTime", "2 seconds")
  .select($"fooId", $"fooTime", window($"fooTime", "2 seconds").alias("fooWindow"))

val barStream = rateStream
  // Introduce misses for ease of debugging
  .where(col("value") % 2 === 0)
  .select(
    col("value").as("barId"),
    col("timestamp").as("barTime")
  )
  .withWatermark("barTime", "2 seconds")
  .select($"barId", $"barTime", window($"barTime", "2 seconds").alias("barWindow"))

val joinedDf = fooStream
  .join(
    barStream,
    $"fooId" === $"barId" &&
      fooStream.col("fooWindow") === barStream.col("barWindow"),
    joinType = "LeftOuter"
  )

val query = joinedDf
  .writeStream
  .format("console")
  .option("truncate", 100)
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

query.awaitTermination()
{code}

This program ends with the exception below, and from the analyzed plan we can see there is no delayKey metadata on 'fooWindow':

{code:java}
org.apache.spark.sql.AnalysisException: Stream-stream outer join between two 
streaming DataFrame/Datasets is not supported without a watermark in the join 
keys, or a watermark on the nullable side and an appropriate range condition;;
Join LeftOuter, ((fooId#4L = barId#14L) && (fooWindow#9 = barWindow#19))
:- Project [fooId#4L, fooTime#5-T2000ms, window#10-T2000ms AS fooWindow#9]
:  +- Filter isnotnull(fooTime#5-T2000ms)
: +- Project [named_struct(start, precisetimestampconversion(CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
LongType) - 0) as double) / cast(200 as double))) as double) = 
(cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 
0) as double) / cast(200 as double))) THEN 
(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
LongType) - 0) as double) / cast(200 as double))) + cast(1 as bigint)) ELSE 
CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
LongType) - 0) as double) / cast(200 as double))) END + cast(0 as bigint)) 
- cast(1 as bigint)) * 200) + 0), LongType, TimestampType), end, 
precisetimestampconversion((CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
LongType) - 0) as double) / cast(200 as double))) as double) = 
(cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, LongType) - 
0) as double) / cast(200 as double))) THEN 
(CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
LongType) - 0) as double) / cast(200 as double))) + cast(1 as bigint)) ELSE 
CEIL((cast((precisetimestampconversion(fooTime#5-T2000ms, TimestampType, 
LongType) - 0) as double) / cast(200 as double))) END + cast(0 as bigint)) 
- cast(1 as bigint)) * 200) + 0) + 200), LongType, TimestampType)) AS 
window#10-T2000ms, fooId#4L, fooTime#5-T2000ms]
:+- EventTimeWatermark fooTime#5: timestamp, interval 2 seconds
:   +- Project [value#1L AS fooId#4L, timestamp#0 AS fooTime#5]
:  +- StreamingRelationV2 
org.apache.spark.sql.execution.streaming.sources.RateStreamProvider@2cae5fa7, 
rate, Map(numPartitions -> 1, rowsPerSecond -> 1), [timestamp#0, value#1L]
+- Project [barId#14L, barTime#15-T2000ms, window#20-T2000ms AS barWindow#19]
   +- Filter isnotnull(barTime#15-T2000ms)
  +- Project [named_struct(start, precisetimestampconversion(CASE WHEN 
(cast(CEIL((cast((precisetimestampconversion(barTime#15-T2000ms, TimestampType, 
LongType) - 0) as double) / cast(200 as double))) as double) = 
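
Not part of the original report, but a direct way to check whether the watermark delay metadata survives the extra alias is to inspect the Metadata attached to the window column's StructField. A minimal sketch, continuing from the rateStream/col/window/$ definitions in the example above:

{code:scala}
// Same select chain as above, once with and once without the extra alias.
val withAlias = rateStream
  .select(col("value").as("fooId"), col("timestamp").as("fooTime"))
  .withWatermark("fooTime", "2 seconds")
  .select($"fooId", $"fooTime", window($"fooTime", "2 seconds").alias("fooWindow"))

val withoutAlias = rateStream
  .select(col("value").as("fooId"), col("timestamp").as("fooTime"))
  .withWatermark("fooTime", "2 seconds")
  .select($"fooId", $"fooTime", window($"fooTime", "2 seconds"))

// If the report is correct, only the un-aliased "window" column still carries the
// event-time delay metadata, while the aliased "fooWindow" column prints empty metadata.
println(withAlias.schema("fooWindow").metadata)
println(withoutAlias.schema("window").metadata)
{code}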

[jira] [Commented] (SPARK-24630) SPIP: Support SQLStreaming in Spark

2019-03-28 Thread Kevin Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804535#comment-16804535
 ] 

Kevin Zhang commented on SPARK-24630:
-

Hi, is there any update on the progress?

> SPIP: Support SQLStreaming in Spark
> ---
>
> Key: SPARK-24630
> URL: https://issues.apache.org/jira/browse/SPARK-24630
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.2.0, 2.2.1
>Reporter: Jackey Lee
>Priority: Minor
>  Labels: SQLStreaming
> Attachments: SQLStreaming SPIP V2.pdf
>
>
> At present, KafkaSQL, Flink SQL (which is actually based on Calcite), SQLStream and StormSQL all provide a stream-type SQL interface, with which users with little knowledge about streaming can easily develop a stream processing model. In Spark, we can also support a SQL API based on Structured Streaming.
> To support SQL streaming, there are two key points:
> 1. The analysis should be able to parse streaming-type SQL.
> 2. The analyzer should be able to map metadata information to the corresponding Relation.






[jira] [Updated] (SPARK-26350) Allow the user to override the group id of the Kafka's consumer

2018-12-13 Thread Kevin Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Zhang updated SPARK-26350:

Attachment: Permalink.url

> Allow the user to override the group id of the Kafka's consumer
> ---
>
> Key: SPARK-26350
> URL: https://issues.apache.org/jira/browse/SPARK-26350
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.4.0
>Reporter: Shixiong Zhu
>Priority: Major
> Attachments: Permalink.url
>
>
> Sometimes the group id is used to identify the stream for "security". We 
> should give a flag that lets you override it.






[jira] [Commented] (SPARK-20295) when spark.sql.adaptive.enabled is enabled, it conflicts with Exchange Reuse

2018-06-21 Thread Kevin Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-20295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519994#comment-16519994
 ] 

Kevin Zhang commented on SPARK-20295:
-

I still hit this bug in Spark 2.3.0.

> when spark.sql.adaptive.enabled is enabled, it conflicts with Exchange Reuse
> --
>
> Key: SPARK-20295
> URL: https://issues.apache.org/jira/browse/SPARK-20295
> Project: Spark
>  Issue Type: Bug
>  Components: Shuffle, SQL
>Affects Versions: 2.1.0
>Reporter: Ruhui Wang
>Priority: Major
>
> When running tpcds-q95 with spark.sql.adaptive.enabled = true, the physical plan is initially:
> Sort
> :  +- Exchange(coordinator id: 1)
> : +- Project***
> ::-Sort **
> ::  +- Exchange(coordinator id: 2)
> :: :- Project ***
> :+- Sort
> ::  +- Exchange(coordinator id: 3)
> When spark.sql.exchange.reuse is enabled, the physical plan becomes:
> Sort
> :  +- Exchange(coordinator id: 1)
> : +- Project***
> ::-Sort **
> ::  +- Exchange(coordinator id: 2)
> :: :- Project ***
> :+- Sort
> ::  +- ReusedExchange  Exchange(coordinator id: 2)
> If spark.sql.adaptive.enabled = true, the call stack is: ShuffleExchange#doExecute --> postShuffleRDD --> doEstimationIfNecessary. In this function, the assertion assert(exchanges.length == numExchanges) fails, as the left side has only one element while the right side is equal to 2.
> Is this a bug in the interaction between spark.sql.adaptive.enabled and exchange reuse?
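
A hedged workaround sketch only, not a fix and not taken from the ticket: since the assertion arises from the interaction of adaptive execution and exchange reuse, disabling either of the two configurations named in the report avoids it:

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("adaptive-vs-reuse").getOrCreate()

// Either turn adaptive execution off ...
spark.conf.set("spark.sql.adaptive.enabled", "false")

// ... or keep it on and turn off exchange reuse instead (use one of the two).
// spark.conf.set("spark.sql.exchange.reuse", "false")
{code}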






[jira] [Commented] (SPARK-14974) spark sql job create too many files in HDFS when doing insert overwrite hive table

2018-02-28 Thread Kevin Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380126#comment-16380126
 ] 

Kevin Zhang commented on SPARK-14974:
-

I encountered the same problem as [~ussraf] in Spark 2.2 and 2.3, and I'm not quite sure how to fix it. Is there any plan to reopen the issue?

> spark sql job create too many files in HDFS when doing insert overwrite hive 
> table
> --
>
> Key: SPARK-14974
> URL: https://issues.apache.org/jira/browse/SPARK-14974
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 1.5.2
>Reporter: zenglinxi
>Priority: Minor
>
> Recently, we often encounter problems using Spark SQL to insert data into a partitioned table (e.g.: insert overwrite table $output_table partition(dt) select xxx from tmp_table).
> After the Spark job starts running on YARN, the app will create too many files (e.g. 2,000,000 or even 10,000,000), which puts HDFS under enormous pressure.
> We found that the number of files created by the Spark job depends on the number of partitions of the Hive table being inserted into and the number of Spark SQL partitions:
> files_num = hive_table_partitions_num * spark_sql_partitions_num.
> We often set spark_sql_partitions_num (spark.sql.shuffle.partitions) >= 1000, and hive_table_partitions_num is very small under normal circumstances, but it can turn out to be more than 2000 when we accidentally use a wrong field as the partition field, which makes files_num >= 1000 * 2000 = 2,000,000.
> There is a configuration parameter in Hive that can limit the maximum number of dynamic partitions allowed to be created in each mapper/reducer, named hive.exec.max.dynamic.partitions.pernode, but this parameter didn't work when we used HiveContext.
> Reducing spark_sql_partitions_num (spark.sql.shuffle.partitions) can make files_num smaller, but it affects the concurrency.
> Can we add configuration parameters to limit the maximum number of files allowed to be created by each task, or limit spark_sql_partitions_num, without affecting the concurrency?
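
The file explosion follows from every shuffle partition being able to write into every dynamic partition. A common mitigation, not proposed in the ticket itself and shown here only as a hedged sketch (table names taken from the example above), is to repartition by the partition column before the insert so each dynamic partition is written by roughly one task:

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// May be required for dynamic-partition inserts into a Hive table.
spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

spark.table("tmp_table")
  .repartition(col("dt"))   // group rows by the dynamic partition column first
  .write
  .mode("overwrite")
  .insertInto("output_table")
{code}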






[jira] [Commented] (SPARK-23498) Accuracy problem in comparison with string and integer

2018-02-27 Thread Kevin Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16379753#comment-16379753
 ] 

Kevin Zhang commented on SPARK-23498:
-

Yes, thanks. But when we use Spark SQL to run existing Hive scripts we expect Spark SQL to produce the same results as Hive, and that's why I opened this JIRA. Now that [~q79969786] has marked this as a duplicate of [SPARK-21646|https://issues.apache.org/jira/browse/SPARK-21646], I will patch my own branch first.

> Accuracy problem in comparison with string and integer
> --
>
> Key: SPARK-23498
> URL: https://issues.apache.org/jira/browse/SPARK-23498
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.2.1, 2.3.0
>Reporter: Kevin Zhang
>Priority: Major
>
> While comparing a string column with an integer value, Spark SQL automatically casts the string operand to int; the following SQL returns true in Hive but false in Spark:
>  
> {code:java}
> select '1000.1'>1000
> {code}
>  
> From the physical plan we can see the string operand was cast to int, which causes the accuracy loss:
> {code:java}
> *Project [false AS (CAST(1000.1 AS INT) > 1000)#4]
> +- Scan OneRowRelation[]
> {code}
> To solve it, casting both operands of a binary operator to a wider common type like double may be safe.






[jira] [Updated] (SPARK-23498) Accuracy problem in comparison with string and integer

2018-02-23 Thread Kevin Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Zhang updated SPARK-23498:

Description: 
While comparing a string column with an integer value, Spark SQL automatically casts the string operand to int; the following SQL returns true in Hive but false in Spark:

 
{code:java}
select '1000.1'>1000
{code}
 

From the physical plan we can see the string operand was cast to int, which causes the accuracy loss:
{code:java}
*Project [false AS (CAST(1000.1 AS INT) > 1000)#4]

+- Scan OneRowRelation[]
{code}
To solve it, casting both operands of a binary operator to a wider common type like double may be safe.

  was:
While comparing a string column with an integer value, Spark SQL automatically casts the string operand to int; the following SQL returns true in Hive but false in Spark:

 
{code:java}
select '1000.1'>1000
{code}
 

From the physical plan we can see the string operand was cast to int, which causes the accuracy loss:
{code:java}
*Project [false AS (CAST(1000.1 AS INT) > 1000)#4]

+- Scan OneRowRelation[]
{code}
Similar to SPARK-22469, I think it's safe to use double as a common type to cast both operands to.


> Accuracy problem in comparison with string and integer
> --
>
> Key: SPARK-23498
> URL: https://issues.apache.org/jira/browse/SPARK-23498
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0, 2.2.1, 2.3.0
>Reporter: Kevin Zhang
>Priority: Major
>
> While comparing a string column with an integer value, Spark SQL automatically casts the string operand to int; the following SQL returns true in Hive but false in Spark:
>  
> {code:java}
> select '1000.1'>1000
> {code}
>  
> From the physical plan we can see the string operand was cast to int, which causes the accuracy loss:
> {code:java}
> *Project [false AS (CAST(1000.1 AS INT) > 1000)#4]
> +- Scan OneRowRelation[]
> {code}
> To solve it, casting both operands of a binary operator to a wider common type like double may be safe.






[jira] [Created] (SPARK-23498) Accuracy problem in comparison with string and integer

2018-02-23 Thread Kevin Zhang (JIRA)
Kevin Zhang created SPARK-23498:
---

 Summary: Accuracy problem in comparison with string and integer
 Key: SPARK-23498
 URL: https://issues.apache.org/jira/browse/SPARK-23498
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.1, 2.2.0, 2.3.0
Reporter: Kevin Zhang


While comparing a string column with an integer value, Spark SQL automatically casts the string operand to int; the following SQL returns true in Hive but false in Spark:

 
{code:java}
select '1000.1'>1000
{code}
 

From the physical plan we can see the string operand was cast to int, which causes the accuracy loss:
{code:java}
*Project [false AS (CAST(1000.1 AS INT) > 1000)#4]

+- Scan OneRowRelation[]
{code}
Similar to SPARK-22469, I think it's safe to use double as a common type to cast both operands to.
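
For illustration, the reported behaviour and the suggested direction (widening both operands) can be compared directly; a minimal sketch, not taken from the ticket:

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("string-int-compare").getOrCreate()

// Implicit coercion: the string operand is cast to int (see the plan above) and the
// comparison returns false, unlike Hive.
spark.sql("SELECT '1000.1' > 1000").show()

// Explicit workaround: cast both sides to double, which matches Hive's answer (true).
spark.sql("SELECT CAST('1000.1' AS DOUBLE) > CAST(1000 AS DOUBLE)").show()
{code}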






[jira] [Commented] (SPARK-22755) Expression (946-885)*1.0/946 < 0.1 and (946-885)*1.000/946 < 0.1 return different results

2017-12-12 Thread Kevin Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16288608#comment-16288608
 ] 

Kevin Zhang commented on SPARK-22755:
-

[~ksunitha] Thanks, it helps a lot

> Expression (946-885)*1.0/946 < 0.1 and (946-885)*1.000/946 < 0.1 return 
> different results
> -
>
> Key: SPARK-22755
> URL: https://issues.apache.org/jira/browse/SPARK-22755
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Kevin Zhang
> Fix For: 2.2.1
>
>
> both of the following sql statements
> {code:sql}
> select ((946-885)*1.000/946 < 0.1)
> {code}
> and
> {code:sql}
> select ((946-885)*1.0/946 < 0.100)
> {code}
> return true, while the following statement 
> {code:sql}
> select ((946-885)*1.0/946 < 0.1)
> {code}
> returns false






[jira] [Comment Edited] (SPARK-22755) Expression (946-885)*1.0/946 < 0.1 and (946-885)*1.000/946 < 0.1 return different results

2017-12-11 Thread Kevin Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286961#comment-16286961
 ] 

Kevin Zhang edited comment on SPARK-22755 at 12/12/17 2:03 AM:
---

Thanks for the reply. In Hive and Presto the result is supposed to be true.

I tried again in both the Spark SQL REPL and the Thrift server; the statement `select ((946-885)*1.0/946 < 0.1)` still returns false, and I used Spark 2.2 instead of the trunk codeline.


{code:sql}
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(CAST((CAST((CAST(CAST((946 - 885) AS DECIMAL(10,0)) AS DECIMAL(11,1)) * CAST(1.0 AS DECIMAL(11,1))) AS DECIMAL(13,1)) / CAST(CAST(946 AS DECIMAL(13,1)) AS DECIMAL(13,1))) AS DECIMAL(13,1)) < CAST(0.1 AS DECIMAL(13,1)))|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|false|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
{code}

I also found that my physical plan differs from yours in the decimal precision and scale. Is there any configuration that influences this, or has the bug been fixed by some other issue?
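
To compare the decimal precision and scale chosen in different environments, the plans can be printed directly; a minimal sketch using only the public explain API (not tied to any particular Spark version):

{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("decimal-compare-plans").getOrCreate()

// extended = true prints the parsed, analyzed, optimized and physical plans, making the
// DECIMAL(precision, scale) picked for each cast visible so two setups can be compared.
spark.sql("SELECT ((946 - 885) * 1.0 / 946 < 0.1)").explain(true)
spark.sql("SELECT ((946 - 885) * 1.000 / 946 < 0.1)").explain(true)
{code}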



was (Author: kevinzwx):
Thanks for the reply. In Hive and Presto the result is supposed to be true.

I tried again in both the Spark SQL REPL and the Thrift server; the statement `select ((946-885)*1.0/946 < 0.1)` still returns false, and I used Spark 2.2. Which version are you using?


{code:sql}
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(CAST((CAST((CAST(CAST((946 - 885) AS DECIMAL(10,0)) AS DECIMAL(11,1)) * CAST(1.0 AS DECIMAL(11,1))) AS DECIMAL(13,1)) / CAST(CAST(946 AS DECIMAL(13,1)) AS DECIMAL(13,1))) AS DECIMAL(13,1)) < CAST(0.1 AS DECIMAL(13,1)))|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|false|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
{code}

I also found that my physical plan differs from yours in the decimal precision and scale. Is there any configuration that influences this?


> Expression (946-885)*1.0/946 < 0.1 and (946-885)*1.000/946 < 0.1 return 
> different results
> -
>
> Key: SPARK-22755
> URL: https://issues.apache.org/jira/browse/SPARK-22755
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Kevin Zhang
>
> both of the following sql statements
> {code:sql}
> select ((946-885)*1.000/946 < 0.1)
> {code}
> and
> {code:sql}
> select ((946-885)*1.0/946 < 0.100)
> {code}
> return true, while the following statement 
> {code:sql}
> select ((946-885)*1.0/946 < 0.1)
> {code}
> returns false






[jira] [Comment Edited] (SPARK-22755) Expression (946-885)*1.0/946 < 0.1 and (946-885)*1.000/946 < 0.1 return different results

2017-12-11 Thread Kevin Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286961#comment-16286961
 ] 

Kevin Zhang edited comment on SPARK-22755 at 12/12/17 1:58 AM:
---

Thanks for the reply. In Hive and Presto the result is supposed to be true.

I tried again in both the Spark SQL REPL and the Thrift server; the statement `select ((946-885)*1.0/946 < 0.1)` still returns false, and I used Spark 2.2. Which version are you using?


{code:sql}
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(CAST((CAST((CAST(CAST((946 - 885) AS DECIMAL(10,0)) AS DECIMAL(11,1)) * CAST(1.0 AS DECIMAL(11,1))) AS DECIMAL(13,1)) / CAST(CAST(946 AS DECIMAL(13,1)) AS DECIMAL(13,1))) AS DECIMAL(13,1)) < CAST(0.1 AS DECIMAL(13,1)))|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|false|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
{code}

I also found that my physical plan differs from yours in the decimal precision and scale. Is there any configuration that influences this?



was (Author: kevinzwx):
Thanks for the reply. In Hive and Presto the result is supposed to be true.

I tried again in both the Spark SQL REPL and the Thrift server; the statement `select ((946-885)*1.0/946 < 0.1)` still returns false, and I used Spark 2.2. Which version are you using?


{code:sql}
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(CAST((CAST((CAST(CAST((946 - 885) AS DECIMAL(10,0)) AS DECIMAL(11,1)) * CAST(1.0 AS DECIMAL(11,1))) AS DECIMAL(13,1)) / CAST(CAST(946 AS DECIMAL(13,1)) AS DECIMAL(13,1))) AS DECIMAL(13,1)) < CAST(0.1 AS DECIMAL(13,1)))|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|false|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
{code}


> Expression (946-885)*1.0/946 < 0.1 and (946-885)*1.000/946 < 0.1 return 
> different results
> -
>
> Key: SPARK-22755
> URL: https://issues.apache.org/jira/browse/SPARK-22755
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Kevin Zhang
>
> both of the following sql statements
> {code:sql}
> select ((946-885)*1.000/946 < 0.1)
> {code}
> and
> {code:sql}
> select ((946-885)*1.0/946 < 0.100)
> {code}
> return true, while the following statement 
> {code:sql}
> select ((946-885)*1.0/946 < 0.1)
> {code}
> returns false






[jira] [Commented] (SPARK-22755) Expression (946-885)*1.0/946 < 0.1 and (946-885)*1.000/946 < 0.1 return different results

2017-12-11 Thread Kevin Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16286961#comment-16286961
 ] 

Kevin Zhang commented on SPARK-22755:
-

Thanks for the reply. In Hive and Presto the result is supposed to be true.

I tried again in both the Spark SQL REPL and the Thrift server; the statement `select ((946-885)*1.0/946 < 0.1)` still returns false, and I used Spark 2.2. Which version are you using?


{code:sql}
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(CAST((CAST((CAST(CAST((946 - 885) AS DECIMAL(10,0)) AS DECIMAL(11,1)) * CAST(1.0 AS DECIMAL(11,1))) AS DECIMAL(13,1)) / CAST(CAST(946 AS DECIMAL(13,1)) AS DECIMAL(13,1))) AS DECIMAL(13,1)) < CAST(0.1 AS DECIMAL(13,1)))|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|false|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
{code}


> Expression (946-885)*1.0/946 < 0.1 and (946-885)*1.000/946 < 0.1 return 
> different results
> -
>
> Key: SPARK-22755
> URL: https://issues.apache.org/jira/browse/SPARK-22755
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Kevin Zhang
>
> both of the following sql statements
> {code:sql}
> select ((946-885)*1.000/946 < 0.1)
> {code}
> and
> {code:sql}
> select ((946-885)*1.0/946 < 0.100)
> {code}
> return true, while the following statement 
> {code:sql}
> select ((946-885)*1.0/946 < 0.1)
> {code}
> returns false






[jira] [Created] (SPARK-22755) Expression (946-885)*1.0/946 < 0.1 and (946-885)*1.000/946 < 0.1 return different results

2017-12-11 Thread Kevin Zhang (JIRA)
Kevin Zhang created SPARK-22755:
---

 Summary: Expression (946-885)*1.0/946 < 0.1 and 
(946-885)*1.000/946 < 0.1 return different results
 Key: SPARK-22755
 URL: https://issues.apache.org/jira/browse/SPARK-22755
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.2.0
Reporter: Kevin Zhang


both of the following sql statements
{code:sql}
select ((946-885)*1.000/946 < 0.1)
{code}
and
{code:sql}
select ((946-885)*1.0/946 < 0.100)
{code}
return true, while the following statement 

{code:sql}
select ((946-885)*1.0/946 < 0.1)
{code}
returns false










[jira] [Commented] (SPARK-21944) Watermark on window column is wrong

2017-09-08 Thread Kevin Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16158948#comment-16158948
 ] 

Kevin Zhang commented on SPARK-21944:
-

[~mgaido] Do you mean the following by saying "define the watermark on the column 'time'"?

{code:java}
val counts = events.select(window($"time", "5 seconds"), $"time", $"id")
  .withWatermark("time", "10 seconds")
  .dropDuplicates("id", "window")
  .groupBy("window")
  .count
{code}
I don't know whether this is right, because the documentation indicates we should use the same column as is used in the watermark, that is the "time" column (which is not what I want). I tried this way and the application doesn't throw any exception, but it didn't drop events older than the watermark as expected. In the following example, after the batch containing an event with time=1504774540 (2017/9/7 16:55:40 CST) is processed (the watermark should be adjusted to 2017/9/7 16:55:30 CST), I send an event with time=1504745724 (2017/9/7 8:55:24 CST), and this event is processed instead of being dropped as expected.

{code:java}
+-+-+   
|window   |count|
+-+-+
|[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1|
|[2017-09-07 08:55:20.0,2017-09-07 08:55:25.0]|1|
|[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3|
+-+-+

{min=2017-09-07T00:55:24.000Z, avg=2017-09-07T00:55:24.000Z, 
watermark=2017-09-07T08:55:30.000Z, max=2017-09-07T00:55:24.000Z}
{code}

One important thing I have to mention is that my time zone is CST, not UTC. The start and end times in the window are right, but the watermark is reported in UTC. I don't know whether this has an influence.

If I didn't make everything clear, please point it out and I will explain. Thanks.




> Watermark on window column is wrong
> ---
>
> Key: SPARK-21944
> URL: https://issues.apache.org/jira/browse/SPARK-21944
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Kevin Zhang
>
> When I use a watermark with dropDuplicates in the following way, the 
> watermark is calculated wrong
> {code:java}
> val counts = events.select(window($"time", "5 seconds"), $"time", $"id")
>   .withWatermark("window", "10 seconds")
>   .dropDuplicates("id", "window")
>   .groupBy("window")
>   .count
> {code}
> where events is a dataframe with a timestamp column "time" and long column 
> "id".
> I registered a listener to print the event time stats in each batch, and the 
> results is like the following
> {code:shell}
> ---
> Batch: 0
> ---
> +-+-+ 
>   
> |window   |count|
> +-+-+
> |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3|
> +-+-+
> {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, 
> watermark=1970-01-01T00:00:00.000Z, max=1970-01-01T19:05:19.476Z}
> {watermark=1970-01-01T00:00:00.000Z}
> {watermark=1970-01-01T00:00:00.000Z}
> {watermark=1970-01-01T00:00:00.000Z}
> {watermark=1970-01-01T00:00:00.000Z}
> ---
> Batch: 1
> ---
> +-+-+ 
>   
> |window   |count|
> +-+-+
> |[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1|
> |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3|
> +-+-+
> {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, 
> watermark=1970-01-01T19:05:09.476Z, max=1970-01-01T19:05:19.476Z}
> {watermark=1970-01-01T19:05:09.476Z}
> ---
> Batch: 2
> ---
> +-+-+ 
>   
> |window   |count|
> +-+-+
> |[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1|
> |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|4|
> +-+-+
> {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, 
> watermark=1970-01-01T19:05:09.476Z, max=1970-01-01T19:05:19.476Z}
> {watermark=1970-01-01T19:05:09.476Z}
> {code}
> As can be seen, the event time stats are wrong (they are always in 1970-01-01), so the watermark is calculated wrong.

[jira] [Comment Edited] (SPARK-21944) Watermark on window column is wrong

2017-09-07 Thread Kevin Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16158055#comment-16158055
 ] 

Kevin Zhang edited comment on SPARK-21944 at 9/8/17 2:51 AM:
-

I found the problem when using the Kafka source consuming our internal topics, but it is hard to verify what the problem is there. So I then used the socket source and produced some data myself, like (1504774520 1), (1504774521 2), (1504774540 1), (1504774520 4)..., where the first element is the timestamp and the second element is the id, so it's easy for you to produce sample data and reproduce the problem. The following is one group of data I used:

{code:xml}
1504774520 1
1504774521 2
1504774520 3
1504774540 1 
1504774520 4
1504774531 1
1504774532 1
1504774533 1
1504774520 1
1504774520 10
1504774526 11
{code}
 


was (Author: kevinzwx):
I found the problem when using the Kafka source consuming our internal topics, but it is hard to verify what the problem is there. So I then used the socket source and produced some data myself, like (1504774520 1), (1504774521 2), (1504774540 1), (1504774520 4)..., where the first element is the timestamp and the second element is the id, so it's easy for you to produce sample data and reproduce the problem. The following is one group of data I used:

{code:shell}
1504774520 1
1504774521 2
1504774520 3
1504774540 1 
1504774520 4
1504774531 1
1504774532 1
1504774533 1
1504774520 1
1504774520 10
1504774526 11
{code}
 

> Watermark on window column is wrong
> ---
>
> Key: SPARK-21944
> URL: https://issues.apache.org/jira/browse/SPARK-21944
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Kevin Zhang
>
> When I use a watermark with dropDuplicates in the following way, the 
> watermark is calculated wrong
> {code:java}
> val counts = events.select(window($"time", "5 seconds"), $"time", $"id")
>   .withWatermark("window", "10 seconds")
>   .dropDuplicates("id", "window")
>   .groupBy("window")
>   .count
> {code}
> where events is a dataframe with a timestamp column "time" and long column 
> "id".
> I registered a listener to print the event time stats in each batch, and the 
> results is like the following
> {code:shell}
> ---
> Batch: 0
> ---
> +-+-+ 
>   
> |window   |count|
> +-+-+
> |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3|
> +-+-+
> {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, 
> watermark=1970-01-01T00:00:00.000Z, max=1970-01-01T19:05:19.476Z}
> {watermark=1970-01-01T00:00:00.000Z}
> {watermark=1970-01-01T00:00:00.000Z}
> {watermark=1970-01-01T00:00:00.000Z}
> {watermark=1970-01-01T00:00:00.000Z}
> ---
> Batch: 1
> ---
> +-+-+ 
>   
> |window   |count|
> +-+-+
> |[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1|
> |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3|
> +-+-+
> {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, 
> watermark=1970-01-01T19:05:09.476Z, max=1970-01-01T19:05:19.476Z}
> {watermark=1970-01-01T19:05:09.476Z}
> ---
> Batch: 2
> ---
> +-+-+ 
>   
> |window   |count|
> +-+-+
> |[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1|
> |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|4|
> +-+-+
> {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, 
> watermark=1970-01-01T19:05:09.476Z, max=1970-01-01T19:05:19.476Z}
> {watermark=1970-01-01T19:05:09.476Z}
> {code}
> As can be seen, the event time stats are wrong which are always in 
> 1970-01-01, so the watermark is calculated wrong.






[jira] [Commented] (SPARK-21944) Watermark on window column is wrong

2017-09-07 Thread Kevin Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16158055#comment-16158055
 ] 

Kevin Zhang commented on SPARK-21944:
-

I found the problem when using the Kafka source consuming our internal topics, but it is hard to verify what the problem is there. So I then used the socket source and produced some data myself, like (1504774520 1), (1504774521 2), (1504774540 1), (1504774520 4)..., where the first element is the timestamp and the second element is the id, so it's easy for you to produce sample data and reproduce the problem. The following is one group of data I used:

{code:shell}
1504774520 1
1504774521 2
1504774520 3
1504774540 1 
1504774520 4
1504774531 1
1504774532 1
1504774533 1
1504774520 1
1504774520 10
1504774526 11
{code}
 

> Watermark on window column is wrong
> ---
>
> Key: SPARK-21944
> URL: https://issues.apache.org/jira/browse/SPARK-21944
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.0
>Reporter: Kevin Zhang
>
> When I use a watermark with dropDuplicates in the following way, the 
> watermark is calculated wrong
> {code:java}
> val counts = events.select(window($"time", "5 seconds"), $"time", $"id")
>   .withWatermark("window", "10 seconds")
>   .dropDuplicates("id", "window")
>   .groupBy("window")
>   .count
> {code}
> where events is a dataframe with a timestamp column "time" and long column 
> "id".
> I registered a listener to print the event time stats in each batch, and the 
> results is like the following
> {code:shell}
> ---
> Batch: 0
> ---
> +-+-+ 
>   
> |window   |count|
> +-+-+
> |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3|
> +-+-+
> {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, 
> watermark=1970-01-01T00:00:00.000Z, max=1970-01-01T19:05:19.476Z}
> {watermark=1970-01-01T00:00:00.000Z}
> {watermark=1970-01-01T00:00:00.000Z}
> {watermark=1970-01-01T00:00:00.000Z}
> {watermark=1970-01-01T00:00:00.000Z}
> ---
> Batch: 1
> ---
> +-+-+ 
>   
> |window   |count|
> +-+-+
> |[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1|
> |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3|
> +-+-+
> {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, 
> watermark=1970-01-01T19:05:09.476Z, max=1970-01-01T19:05:19.476Z}
> {watermark=1970-01-01T19:05:09.476Z}
> ---
> Batch: 2
> ---
> +-+-+ 
>   
> |window   |count|
> +-+-+
> |[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1|
> |[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|4|
> +-+-+
> {min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, 
> watermark=1970-01-01T19:05:09.476Z, max=1970-01-01T19:05:19.476Z}
> {watermark=1970-01-01T19:05:09.476Z}
> {code}
> As can be seen, the event time stats are wrong which are always in 
> 1970-01-01, so the watermark is calculated wrong.






[jira] [Created] (SPARK-21944) Watermark on window column is wrong

2017-09-07 Thread Kevin Zhang (JIRA)
Kevin Zhang created SPARK-21944:
---

 Summary: Watermark on window column is wrong
 Key: SPARK-21944
 URL: https://issues.apache.org/jira/browse/SPARK-21944
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.2.0
Reporter: Kevin Zhang


When I use a watermark with dropDuplicates in the following way, the watermark 
is calculated wrong

{code:java}
val counts = events.select(window($"time", "5 seconds"), $"time", $"id")
  .withWatermark("window", "10 seconds")
  .dropDuplicates("id", "window")
  .groupBy("window")
  .count
{code}

where events is a DataFrame with a timestamp column "time" and a long column "id".
I registered a listener to print the event time stats in each batch, and the results are like the following:

{code:shell}
---
Batch: 0
---
+-+-+   
|window   |count|
+-+-+
|[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3|
+-+-+

{min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, 
watermark=1970-01-01T00:00:00.000Z, max=1970-01-01T19:05:19.476Z}
{watermark=1970-01-01T00:00:00.000Z}
{watermark=1970-01-01T00:00:00.000Z}
{watermark=1970-01-01T00:00:00.000Z}
{watermark=1970-01-01T00:00:00.000Z}
---
Batch: 1
---
+-+-+   
|window   |count|
+-+-+
|[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1|
|[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|3|
+-+-+

{min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, 
watermark=1970-01-01T19:05:09.476Z, max=1970-01-01T19:05:19.476Z}
{watermark=1970-01-01T19:05:09.476Z}
---
Batch: 2
---
+-+-+   
|window   |count|
+-+-+
|[2017-09-07 16:55:40.0,2017-09-07 16:55:45.0]|1|
|[2017-09-07 16:55:20.0,2017-09-07 16:55:25.0]|4|
+-+-+

{min=1970-01-01T19:05:19.476Z, avg=1970-01-01T19:05:19.476Z, 
watermark=1970-01-01T19:05:09.476Z, max=1970-01-01T19:05:19.476Z}
{watermark=1970-01-01T19:05:09.476Z}
{code}

As can be seen, the event time stats are wrong (they are always in 1970-01-01), so the watermark is calculated wrong.
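
For reference, the per-batch event time stats above can be printed with a StreamingQueryListener; a minimal sketch of such a listener (the streaming query itself is omitted and assumed to be started elsewhere):

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

val spark = SparkSession.builder().appName("event-time-stats").getOrCreate()

// Prints the eventTime map (min/avg/max/watermark) reported after every micro-batch.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(event.progress.eventTime)
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})
{code}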






[jira] [Commented] (SPARK-21590) Structured Streaming window start time should support negative values to adjust time zone

2017-08-09 Thread Kevin Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16120924#comment-16120924
 ] 

Kevin Zhang commented on SPARK-21590:
-

Thanks, I'd like to work on this.

I agree with the requirement that the absolute value of the start offset should be less than the slide interval, but I don't understand why we should add the slide interval to the start offset when it's negative. I've verified in my local environment that the calculation currently supports negative start offsets; it's the non-negative check on the window parameters that blocks them. So I suggest only changing the check function. What do you think?

> Structured Streaming window start time should support negative values to 
> adjust time zone
> -
>
> Key: SPARK-21590
> URL: https://issues.apache.org/jira/browse/SPARK-21590
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0
> Environment: spark 2.2.0
>Reporter: Kevin Zhang
>  Labels: spark-sql, spark2.2, streaming, structured, timezone, 
> window
>
> I want to calculate (unique) daily access count using structured streaming 
> (2.2.0). 
> Now Structured Streaming's window with a 1-day duration starts at 00:00:00 UTC and ends at 23:59:59 UTC each day, but my local time zone is CST (UTC + 8 hours) and I want the date boundaries to be 00:00:00 CST (that is, 00:00:00 UTC - 8 hours).
> In Flink I can set the window offset to -8 hours to achieve this, but here in Structured Streaming, if I set the start time (the same as the offset in Flink) to -8 or any other negative value, I will get the following error:
> {code:java}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot 
> resolve 'timewindow(timestamp, 864, 864, -288)' due 
> to data type mismatch: The start time (-288) must be greater than or 
> equal to 0.;;
> {code}
> because the time window checks the input parameters to guarantee each value 
> is greater than or equal to 0.
> So I'm thinking about whether we can remove the limit that the start time 
> cannot be negative?
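
For reference, the start time discussed here is the fourth argument of the window function. A minimal sketch (column name taken from the rate source, not from the ticket) of both the rejected negative offset and the positive "+16 hours" equivalent discussed later in this thread:

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

val spark = SparkSession.builder().appName("window-offset").getOrCreate()
import spark.implicits._

val events = spark.readStream.format("rate").load()  // provides a 'timestamp' column

// What the reporter asks for: shift the 1-day window by -8 hours so it aligns with
// CST midnight. Today this fails the non-negative check on startTime.
// val byDayCst = events.groupBy(window($"timestamp", "1 day", "1 day", "-8 hours")).count()

// The positive equivalent of the same offset: start the daily window at +16 hours UTC.
val byDayCstWorkaround =
  events.groupBy(window($"timestamp", "1 day", "1 day", "16 hours")).count()
{code}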






[jira] [Comment Edited] (SPARK-21590) Structured Streaming window start time should support negative values to adjust time zone

2017-08-08 Thread Kevin Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117900#comment-16117900
 ] 

Kevin Zhang edited comment on SPARK-21590 at 8/8/17 6:15 AM:
-

[~brkyvz] Thanks for your advice, but I believe there is still something unclear 
about what I described. Let me explain what I found unreasonable and why I 
opened this issue. 

Before digging into adjusting the start offset of the window, I tried using 
from_utc_timestamp (and even to_utc_timestamp) and setting the JVM timezone to 
CST, but none of these gave the right result, because all the timestamps of my 
events are generated in CST. If I use from_utc_timestamp to convert the event 
timestamps to UTC, then taking today (2017-08-08) as an example, I end up with a 
window of *yesterday* starting at 8 o'clock, like [2017-08-07 
08:00:00.0,2017-08-08 08:00:00.0], instead of the window of *today* starting at 
00:00. 

The other approaches were also incorrect and won't be covered again here. So the 
problem is that I have the correct timestamps and can get the right window date; 
the only weird thing is the 8-hour start offset. That's why I want to set the 
start offset to -8 hours to adjust the window, and I think it's reasonable 
because Flink 
[https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html] 
works exactly this way.

As for the "+16" offset, as I said it works well: like in your example, for 
something that happened at 2017-03-14 17:30:00 CST I get a window of 2017-03-14 
00:00 CST - 2017-03-15 00:00 CST. 

But the confusing thing is: with "+16", why don't I get a window of 2017-03-15 
00:00 CST - 2017-03-16 00:00 CST? Also, when I tried "+15" I got a window of 
2017-03-13 23:00 CST - 2017-03-14 23:00 CST, so the window start falls back to 
the previous day. Don't you think that's confusing for users?
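
For what it's worth, the "-8 hours" vs "+16 hours" coincidence seems to come 
from the window origin only mattering modulo the slide interval: -8 hours and 
+16 hours differ by exactly one day, so for a 1-day slide they describe the same 
window boundaries. A minimal Scala sketch that demonstrates this (the 
windowStart helper is illustrative, not the actual TimeWindow implementation):

{code:scala}
import java.util.concurrent.TimeUnit

// Sketch only: bucket an event timestamp (in microseconds) into the window
// whose origin is shifted by startTime; floorMod keeps negative offsets sane.
def windowStart(eventUs: Long, slideUs: Long, startTimeUs: Long): Long =
  eventUs - Math.floorMod(eventUs - startTimeUs, slideUs)

val day = TimeUnit.DAYS.toMicros(1)
val event = TimeUnit.HOURS.toMicros(10) // an event at 10:00 UTC (18:00 CST)

val withMinus8 = windowStart(event, day, TimeUnit.HOURS.toMicros(-8))
val withPlus16 = windowStart(event, day, TimeUnit.HOURS.toMicros(16))
// withMinus8 == withPlus16: both windows start at 16:00 UTC, i.e. 00:00 CST.
{code}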


was (Author: kevinzwx):
[~brkyvz] Thanks for your advice, but I believe you don't completely understand 
the problem. Let me explain what I found unreasonable and why I opened this 
issue. 

Before digging into adjusting the start offset of the window, I tried using 
from_utc_timestamp (and even to_utc_timestamp) and setting the JVM timezone to 
CST, but none of these gave the right result, because all the timestamps of my 
events are generated in CST. If I use from_utc_timestamp to convert the event 
timestamps to UTC, then taking today (2017-08-08) as an example, I end up with a 
window of *yesterday* starting at 8 o'clock, like [2017-08-07 
08:00:00.0,2017-08-08 08:00:00.0], instead of the window of *today* starting at 
00:00. 

The other approaches were also incorrect and won't be covered again here. So the 
problem is that I have the correct timestamps and can get the right window date; 
the only weird thing is the 8-hour start offset. That's why I want to set the 
start offset to -8 hours to adjust the window, and I think it's reasonable 
because Flink 
[https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html] 
works exactly this way.

As for the "+16" offset, as I said it works well: like in your example, for 
something that happened at 2017-03-14 17:30:00 CST I get a window of 2017-03-14 
00:00 CST - 2017-03-15 00:00 CST. 

But the confusing thing is: with "+16", why don't I get a window of 2017-03-15 
00:00 CST - 2017-03-16 00:00 CST? Also, when I tried "+15" I got a window of 
2017-03-13 23:00 CST - 2017-03-14 23:00 CST, so the window start falls back to 
the previous day. Don't you think that's confusing for users?

> Structured Streaming window start time should support negative values to 
> adjust time zone
> -
>
> Key: SPARK-21590
> URL: https://issues.apache.org/jira/browse/SPARK-21590
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0
> Environment: spark 2.2.0
>Reporter: Kevin Zhang
>  Labels: spark-sql, spark2.2, streaming, structured, timezone, 
> window
>
> I want to calculate (unique) daily access count using structured streaming 
> (2.2.0). 
> Now Structured Streaming's window with 1 day duration starts at 
> 00:00:00 UTC and ends at 23:59:59 UTC each day, but my local timezone is CST 
> (UTC + 8 hours) and I
> want date boundaries to be 00:00:00 CST (that is 00:00:00 UTC - 8). 
> In Flink I can set the window offset to -8 hours to achieve this, but here in 
> Structured Streaming if I set the start time (same as the offset in Flink) to -8 
> or any other negative values, I will get the following error:
> {code:java}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot 
> resolve 

[jira] [Comment Edited] (SPARK-21590) Structured Streaming window start time should support negative values to adjust time zone

2017-08-08 Thread Kevin Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117900#comment-16117900
 ] 

Kevin Zhang edited comment on SPARK-21590 at 8/8/17 6:05 AM:
-

[~brkyvz] Thanks for your advice, but I believe you don't completely understand 
the problem. Let me explain what I found unreasonable and why I opened this 
issue. 

Before digging into adjusting the start offset of the window, I tried using 
from_utc_timestamp (and even to_utc_timestamp) and setting the JVM timezone to 
CST, but none of these gave the right result, because all the timestamps of my 
events are generated in CST. If I use from_utc_timestamp to convert the event 
timestamps to UTC, then taking today (2017-08-08) as an example, I end up with a 
window of *yesterday* starting at 8 o'clock, like [2017-08-07 
08:00:00.0,2017-08-08 08:00:00.0], instead of the window of *today* starting at 
00:00. 

The other approaches were also incorrect and won't be covered again here. So the 
problem is that I have the correct timestamps and can get the right window date; 
the only weird thing is the 8-hour start offset. That's why I want to set the 
start offset to -8 hours to adjust the window, and I think it's reasonable 
because Flink 
[https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html] 
works exactly this way.

As for the "+16" offset, as I said it works well: like in your example, for 
something that happened at 2017-03-14 17:30:00 CST I get a window of 2017-03-14 
00:00 CST - 2017-03-15 00:00 CST. 

But the confusing thing is: with "+16", why don't I get a window of 2017-03-15 
00:00 CST - 2017-03-16 00:00 CST? Also, when I tried "+15" I got a window of 
2017-03-13 23:00 CST - 2017-03-14 23:00 CST, so the window start falls back to 
the previous day. Don't you think that's confusing for users?


was (Author: kevinzwx):
[~brkyvz] Thanks for your advice, but I believe you don't completely understand 
the problem. Let me explain what I found unreasonable and why I opened this 
issue. 

Before digging into adjusting the start offset of the window, I tried using 
from_utc_timestamp (and even to_utc_timestamp) and setting the JVM timezone to 
CST, but none of these gave the right result, because all the timestamps of my 
events are generated in CST. If I use from_utc_timestamp to convert the event 
timestamps to UTC, then taking today (2017-08-08) as an example, I end up with a 
window of *yesterday* starting at 8 o'clock, like [2017-08-07 
08:00:00.0,2017-08-08 08:00:00.0], instead of the window of *today* starting at 
00:00. The other approaches were also incorrect and won't be covered again here. 
So the problem is that I have the correct timestamps and can get the right 
window date; the only weird thing is the 8-hour start offset. That's why I want 
to set the start offset to -8 hours to adjust the window, and I think it's 
reasonable because Flink 
[https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html] 
works exactly this way.

As for the "+16" offset, as I said it works well: like in your example, for 
something that happened at 2017-03-14 17:30:00 CST I get a window of 2017-03-14 
00:00 CST - 2017-03-15 00:00 CST. But the confusing thing is: with "+16", why 
don't I get a window of 2017-03-15 00:00 CST - 2017-03-16 00:00 CST? Also, when 
I tried "+15" I got a window of 2017-03-13 23:00 CST - 2017-03-14 23:00 CST, so 
the window start falls back to the previous day. Don't you think that's 
confusing for users?

> Structured Streaming window start time should support negative values to 
> adjust time zone
> -
>
> Key: SPARK-21590
> URL: https://issues.apache.org/jira/browse/SPARK-21590
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0
> Environment: spark 2.2.0
>Reporter: Kevin Zhang
>  Labels: spark-sql, spark2.2, streaming, structured, timezone, 
> window
>
> I want to calculate (unique) daily access count using structured streaming 
> (2.2.0). 
> Now Structured Streaming's window with 1 day duration starts at 
> 00:00:00 UTC and ends at 23:59:59 UTC each day, but my local timezone is CST 
> (UTC + 8 hours) and I
> want date boundaries to be 00:00:00 CST (that is 00:00:00 UTC - 8). 
> In Flink I can set the window offset to -8 hours to achieve this, but here in 
> Structured Streaming if I set the start time (same as the offset in Flink) to -8 
> or any other negative values, I will get the following error:
> {code:java}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot 
> resolve 'timewindow(timestamp, 864, 

[jira] [Commented] (SPARK-21590) Structured Streaming window start time should support negative values to adjust time zone

2017-08-08 Thread Kevin Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16117900#comment-16117900
 ] 

Kevin Zhang commented on SPARK-21590:
-

[~brkyvz] Thanks for your advice, but I believe you don't completely understand 
the problem. Let me explain what I found unreasonable and why I opened this 
issue. 

Before digging into adjusting the start offset of the window, I tried using 
from_utc_timestamp (and even to_utc_timestamp) and setting the JVM timezone to 
CST, but none of these gave the right result, because all the timestamps of my 
events are generated in CST. If I use from_utc_timestamp to convert the event 
timestamps to UTC, then taking today (2017-08-08) as an example, I end up with a 
window of *yesterday* starting at 8 o'clock, like [2017-08-07 
08:00:00.0,2017-08-08 08:00:00.0], instead of the window of *today* starting at 
00:00. The other approaches were also incorrect and won't be covered again here. 
So the problem is that I have the correct timestamps and can get the right 
window date; the only weird thing is the 8-hour start offset. That's why I want 
to set the start offset to -8 hours to adjust the window, and I think it's 
reasonable because Flink 
[https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html] 
works exactly this way.

As for the "+16" offset, as I said it works well: like in your example, for 
something that happened at 2017-03-14 17:30:00 CST I get a window of 2017-03-14 
00:00 CST - 2017-03-15 00:00 CST. But the confusing thing is: with "+16", why 
don't I get a window of 2017-03-15 00:00 CST - 2017-03-16 00:00 CST? Also, when 
I tried "+15" I got a window of 2017-03-13 23:00 CST - 2017-03-14 23:00 CST, so 
the window start falls back to the previous day. Don't you think that's 
confusing for users?

> Structured Streaming window start time should support negative values to 
> adjust time zone
> -
>
> Key: SPARK-21590
> URL: https://issues.apache.org/jira/browse/SPARK-21590
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0
> Environment: spark 2.2.0
>Reporter: Kevin Zhang
>  Labels: spark-sql, spark2.2, streaming, structured, timezone, 
> window
>
> I want to calculate (unique) daily access count using structured streaming 
> (2.2.0). 
> Now Structured Streaming's window with 1 day duration starts at 
> 00:00:00 UTC and ends at 23:59:59 UTC each day, but my local timezone is CST 
> (UTC + 8 hours) and I
> want date boundaries to be 00:00:00 CST (that is 00:00:00 UTC - 8). 
> In Flink I can set the window offset to -8 hours to achieve this, but here in 
> Structured Streaming if I set the start time (same as the offset in Flink) to -8 
> or any other negative values, I will get the following error:
> {code:java}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot 
> resolve 'timewindow(timestamp, 864, 864, -288)' due 
> to data type mismatch: The start time (-288) must be greater than or 
> equal to 0.;;
> {code}
> because the time window checks the input parameters to guarantee each value 
> is greater than or equal to 0.
> So I'm thinking about whether we can remove the limit that the start time 
> cannot be negative?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21590) Structured Streaming window start time should support negative values to adjust time zone

2017-08-03 Thread Kevin Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112274#comment-16112274
 ] 

Kevin Zhang commented on SPARK-21590:
-

[~brkyvz] I don't know about the test you illustrated, but according to the 
source code below (in org.apache.spark.sql.catalyst.expressions.TimeWindow) and 
the logs in the description, which are produced when I use a window like 
_window($"timestamp", "1 day", "1 day", "-8 hours")_, the check rejects it:

{code:java}
 if (startTime < 0) {
return TypeCheckFailure(s"The start time ($startTime) must be greater 
than or equal to 0.")
  }
{code}

As for using `+16 hours` instead of `-8`, I tried it and everything seems OK. 
But I'm a little confused about the range of the window. For example, when I use 
window("1 day") I get a window like 2017-08-03 08:00 - 2017-08-04 08:00. 
According to my understanding, providing a '+16' start time should give me a 
window like 2017-08-04 00:00 - 2017-08-05 00:00 (that is, the window of the next 
day), but instead I get a window like 2017-08-03 00:00 - 2017-08-04 00:00, which 
is exactly the same as providing '-8'. Is there something wrong?
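
In the meantime, until negative start times are accepted, the equivalent 
positive offset (the negative value plus one slide interval) can be passed to 
window(). A hedged usage sketch for the daily-count case, assuming an input 
Dataset named events with a timestamp column (both names are illustrative):

{code:scala}
import org.apache.spark.sql.functions.{col, window}

// Sketch only: "16 hours" is the positive equivalent of "-8 hours" for a
// 1-day slide, so each window covers one CST calendar day (00:00-24:00 CST).
val dailyCounts = events
  .groupBy(window(col("timestamp"), "1 day", "1 day", "16 hours"))
  .count()
{code}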

> Structured Streaming window start time should support negative values to 
> adjust time zone
> -
>
> Key: SPARK-21590
> URL: https://issues.apache.org/jira/browse/SPARK-21590
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0
> Environment: spark 2.2.0
>Reporter: Kevin Zhang
>  Labels: spark-sql, spark2.2, streaming, structured, timezone, 
> window
>
> I want to calculate (unique) daily access count using structured streaming 
> (2.2.0). 
> Now Structured Streaming's window with 1 day duration starts at 
> 00:00:00 UTC and ends at 23:59:59 UTC each day, but my local timezone is CST 
> (UTC + 8 hours) and I
> want date boundaries to be 00:00:00 CST (that is 00:00:00 UTC - 8). 
> In Flink I can set the window offset to -8 hours to achieve this, but here in 
> Structured Streaming if I set the start time (same as the offset in Flink) to -8 
> or any other negative values, I will get the following error:
> {code:java}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot 
> resolve 'timewindow(timestamp, 864, 864, -288)' due 
> to data type mismatch: The start time (-288) must be greater than or 
> equal to 0.;;
> {code}
> because the time window checks the input parameters to guarantee each value 
> is greater than or equal to 0.
> So I'm thinking about whether we can remove the limit that the start time 
> cannot be negative?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-21546) dropDuplicates with watermark yields RuntimeException due to binding failure

2017-08-02 Thread Kevin Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112146#comment-16112146
 ] 

Kevin Zhang commented on SPARK-21546:
-

[~zsxwing] In my case I want to use a watermark to expire the state but not use 
the watermark column to filter duplicate elements. For example, I want to count 
the unique accesses to my website per day, so the dropDuplicates state should 
only be kept for one day and dropped the next day. Meanwhile, I want to use uuid 
alone as the key for dropping duplicates rather than (uuid, eventTime) together, 
but dropDuplicates behaves like the latter, right? If so, how can I get the 
results I expect?
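
One alternative I'm considering is sketched below, assuming an input stream 
named events with uuid and eventtime columns (names are illustrative). It trades 
exactness for bounded state by using an approximate distinct count per 1-day 
event-time window instead of dropDuplicates:

{code:scala}
import org.apache.spark.sql.functions.{approx_count_distinct, col, window}

// Sketch only: the watermark lets old window state be dropped, and the
// approximate distinct count avoids keeping one dedup entry per uuid forever.
val dailyUnique = events
  .withWatermark("eventtime", "1 day")
  .groupBy(window(col("eventtime"), "1 day"))
  .agg(approx_count_distinct(col("uuid")).as("unique_visitors"))
{code}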

> dropDuplicates with watermark yields RuntimeException due to binding failure
> 
>
> Key: SPARK-21546
> URL: https://issues.apache.org/jira/browse/SPARK-21546
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Jacek Laskowski
>Assignee: Shixiong Zhu
> Fix For: 2.2.1, 2.3.0
>
>
> With today's master...
> The following streaming query with watermark and {{dropDuplicates}} yields 
> {{RuntimeException}} due to failure in binding.
> {code}
> val topic1 = spark.
>   readStream.
>   format("kafka").
>   option("subscribe", "topic1").
>   option("kafka.bootstrap.servers", "localhost:9092").
>   option("startingoffsets", "earliest").
>   load
> val records = topic1.
>   withColumn("eventtime", 'timestamp).  // <-- just to put the right name 
> given the purpose
>   withWatermark(eventTime = "eventtime", delayThreshold = "30 seconds"). // 
> <-- use the renamed eventtime column
>   dropDuplicates("value").  // dropDuplicates will use watermark
> // only when eventTime column exists
>   // include the watermark column => internal design leak?
>   select('key cast "string", 'value cast "string", 'eventtime).
>   as[(String, String, java.sql.Timestamp)]
> scala> records.explain
> == Physical Plan ==
> *Project [cast(key#0 as string) AS key#169, cast(value#1 as string) AS 
> value#170, eventtime#157-T3ms]
> +- StreamingDeduplicate [value#1], 
> StatefulOperatorStateInfo(,93c3de98-3f85-41a4-8aef-d09caf8ea693,0,0),
>  0
>+- Exchange hashpartitioning(value#1, 200)
>   +- EventTimeWatermark eventtime#157: timestamp, interval 30 seconds
>  +- *Project [key#0, value#1, timestamp#5 AS eventtime#157]
> +- StreamingRelation kafka, [key#0, value#1, topic#2, 
> partition#3, offset#4L, timestamp#5, timestampType#6]
> import org.apache.spark.sql.streaming.{OutputMode, Trigger}
> val sq = records.
>   writeStream.
>   format("console").
>   option("truncate", false).
>   trigger(Trigger.ProcessingTime("10 seconds")).
>   queryName("from-kafka-topic1-to-console").
>   outputMode(OutputMode.Update).
>   start
> {code}
> {code}
> ---
> Batch: 0
> ---
> 17/07/27 10:28:58 ERROR Executor: Exception in task 3.0 in stage 13.0 (TID 
> 438)
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding 
> attribute, tree: eventtime#157-T3ms
>   at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
>   at 
> org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
>   at 
> org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
>   at 
> org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
>   at 
> 

[jira] [Commented] (SPARK-21590) Structured Streaming window start time should support negative values to adjust time zone

2017-08-01 Thread Kevin Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110209#comment-16110209
 ] 

Kevin Zhang commented on SPARK-21590:
-

I think the only problem is the non-negative check on the window start time; the 
other checks are correct. So I suggest simply removing the non-negative check on 
the start time to solve the problem. If you think that's OK, I will work on this.

> Structured Streaming window start time should support negative values to 
> adjust time zone
> -
>
> Key: SPARK-21590
> URL: https://issues.apache.org/jira/browse/SPARK-21590
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0
> Environment: spark 2.2.0
>Reporter: Kevin Zhang
>  Labels: spark-sql, spark2.2, streaming, structured, timezone, 
> window
>
> I want to calculate (unique) daily access count using structured streaming 
> (2.2.0). 
> Now Structured Streaming's window with 1 day duration starts at 
> 00:00:00 UTC and ends at 23:59:59 UTC each day, but my local timezone is CST 
> (UTC + 8 hours) and I
> want date boundaries to be 00:00:00 CST (that is 00:00:00 UTC - 8). 
> In Flink I can set the window offset to -8 hours to achieve this, but here in 
> Structured Streaming if I set the start time (same as the offset in Flink) to -8 
> or any other negative values, I will get the following error:
> {code:java}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot 
> resolve 'timewindow(timestamp, 864, 864, -288)' due 
> to data type mismatch: The start time (-288) must be greater than or 
> equal to 0.;;
> {code}
> because the time window checks the input parameters to guarantee each value 
> is greater than or equal to 0.
> So I'm thinking about whether we can remove the limit that the start time 
> cannot be negative?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-21590) Structured Streaming window start time should support negative values to adjust time zone

2017-08-01 Thread Kevin Zhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Zhang updated SPARK-21590:

Description: 
I want to calculate (unique) daily access count using structured streaming 
(2.2.0). 
Now Structured Streaming's window with 1 day duration starts at 
00:00:00 UTC and ends at 23:59:59 UTC each day, but my local timezone is CST 
(UTC + 8 hours) and I
want date boundaries to be 00:00:00 CST (that is 00:00:00 UTC - 8). 

In Flink I can set the window offset to -8 hours to achieve this, but here in 
Structured Streaming if I set the start time (same as the offset in Flink) to -8 or any 
other negative values, I will get the following error:

{code:java}
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot 
resolve 'timewindow(timestamp, 864, 864, -288)' due to 
data type mismatch: The start time (-288) must be greater than or equal 
to 0.;;
{code}

because the time window checks the input parameters to guarantee each value is 
greater than or equal to 0.

So I'm thinking about whether we can remove the limit that the start time 
cannot be negative?

  was:
I want to calculate (unique) daily access count using structured streaming 
(2.2.0). 
Now Structured Streaming's window with 1 day duration starts at 
00:00:00 UTC and ends at 23:59:59 UTC each day, but my local timezone is CST 
(UTC + 8 hours) and I
want date boundaries to be 00:00:00 CST (that is 00:00:00 UTC - 8). 

In Flink I can set the window offset to -8 hours to achieve this, but here in 
Structured Streaming if I set the start time (same as the offset in Flink) to -8 or any 
other negative values, I will get the following error:

{code:shell}
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot 
resolve 'timewindow(timestamp, 864, 864, -288)' due to 
data type mismatch: The start time (-288) must be greater than or equal 
to 0.;;
{code}

because the time window checks the input parameters to guarantee each value is 
greater than or equal to 0.

So I'm thinking about whether we can remove the limit that the start time 
cannot be negative?


> Structured Streaming window start time should support negative values to 
> adjust time zone
> -
>
> Key: SPARK-21590
> URL: https://issues.apache.org/jira/browse/SPARK-21590
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0
> Environment: spark 2.2.0
>Reporter: Kevin Zhang
>  Labels: spark-sql, spark2.2, streaming, structured, timezone, 
> window
>
> I want to calculate (unique) daily access count using structured streaming 
> (2.2.0). 
> Now Structured Streaming's window with 1 day duration starts at 
> 00:00:00 UTC and ends at 23:59:59 UTC each day, but my local timezone is CST 
> (UTC + 8 hours) and I
> want date boundaries to be 00:00:00 CST (that is 00:00:00 UTC - 8). 
> In Flink I can set the window offset to -8 hours to achieve this, but here in 
> Structured Streaming if I set the start time (same as the offset in Flink) to -8 
> or any other negative values, I will get the following error:
> {code:java}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot 
> resolve 'timewindow(timestamp, 864, 864, -288)' due 
> to data type mismatch: The start time (-288) must be greater than or 
> equal to 0.;;
> {code}
> because the time window checks the input parameters to guarantee each value 
> is greater than or equal to 0.
> So I'm thinking about whether we can remove the limit that the start time 
> cannot be negative?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-21590) Structured Streaming window start time should support negative values to adjust time zone

2017-08-01 Thread Kevin Zhang (JIRA)
Kevin Zhang created SPARK-21590:
---

 Summary: Structured Streaming window start time should support 
negative values to adjust time zone
 Key: SPARK-21590
 URL: https://issues.apache.org/jira/browse/SPARK-21590
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.2.0, 2.1.0, 2.0.1, 2.0.0
 Environment: spark 2.2.0
Reporter: Kevin Zhang


I want to calculate (unique) daily access count using structured streaming 
(2.2.0). 
Now Structured Streaming's window with 1 day duration starts at 
00:00:00 UTC and ends at 23:59:59 UTC each day, but my local timezone is CST 
(UTC + 8 hours) and I
want date boundaries to be 00:00:00 CST (that is 00:00:00 UTC - 8). 

In Flink I can set the window offset to -8 hours to achieve this, but here in 
Structured Streaming if I set the start time (same as the offset in Flink) to -8 or any 
other negative values, I will get the following error:

{code:shell}
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot 
resolve 'timewindow(timestamp, 864, 864, -288)' due to 
data type mismatch: The start time (-288) must be greater than or equal 
to 0.;;
{code}

because the time window checks the input parameters to guarantee each value is 
greater than or equal to 0.

So I'm thinking about whether we can remove the limit that the start time 
cannot be negative?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-12221) Add CPU time metric to TaskMetrics

2016-09-18 Thread Kevin Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15500750#comment-15500750
 ] 

Kevin Zhang edited comment on SPARK-12221 at 9/18/16 10:35 AM:
---

Hi, is there any plan to put this into trunk?


was (Author: kevinzwx):
hi, is there any plan to put this to trunk??

> Add CPU time metric to TaskMetrics
> --
>
> Key: SPARK-12221
> URL: https://issues.apache.org/jira/browse/SPARK-12221
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, Web UI
>Affects Versions: 1.5.2
>Reporter: Jisoo Kim
>
> Currently TaskMetrics doesn't support executor CPU time. I'd like to have one 
> so I can retrieve the metric from History Server.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-12221) Add CPU time metric to TaskMetrics

2016-09-18 Thread Kevin Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15500750#comment-15500750
 ] 

Kevin Zhang commented on SPARK-12221:
-

hi, is there any plan to put this to trunk??

> Add CPU time metric to TaskMetrics
> --
>
> Key: SPARK-12221
> URL: https://issues.apache.org/jira/browse/SPARK-12221
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, Web UI
>Affects Versions: 1.5.2
>Reporter: Jisoo Kim
>
> Currently TaskMetrics doesn't support executor CPU time. I'd like to have one 
> so I can retrieve the metric from History Server.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-13514) Spark Shuffle Service 1.6.0 issue in Yarn

2016-07-18 Thread Kevin Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15383437#comment-15383437
 ] 

Kevin Zhang commented on SPARK-13514:
-

Yes, file:// does exist, and that is definitely a workaround. But what I mean 
is: are there other ways to solve the problem besides removing the file:// 
prefix? In our environment the prefix is needed for some reason.

> Spark Shuffle Service 1.6.0 issue in Yarn 
> --
>
> Key: SPARK-13514
> URL: https://issues.apache.org/jira/browse/SPARK-13514
> Project: Spark
>  Issue Type: Bug
>Reporter: Satish Kolli
>
> Spark shuffle service 1.6.0 in Yarn fails with an unknown exception. When I 
> replace the spark shuffle jar with version 1.5.2 jar file, the following 
> succeeds with out any issues.
> Hadoop Version: 2.5.1 (Kerberos Enabled)
> Spark Version: 1.6.0
> Java Version: 1.7.0_79
> {code}
> $SPARK_HOME/bin/spark-shell \
> --master yarn \
> --deploy-mode client \
> --conf spark.dynamicAllocation.enabled=true \
> --conf spark.dynamicAllocation.minExecutors=5 \
> --conf spark.yarn.executor.memoryOverhead=2048 \
> --conf spark.shuffle.service.enabled=true \
> --conf spark.scheduler.mode=FAIR \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> --executor-memory 6G \
> --driver-memory 8G
> {code}
> {code}
> scala> val df = sc.parallelize(1 to 50).toDF
> df: org.apache.spark.sql.DataFrame = [_1: int]
> scala> df.show(50)
> {code}
> {code}
> 16/02/26 08:20:53 INFO spark.SparkContext: Starting job: show at :30
> 16/02/26 08:20:53 INFO scheduler.DAGScheduler: Got job 0 (show at 
> :30) with 1 output partitions
> 16/02/26 08:20:53 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 
> (show at :30)
> 16/02/26 08:20:53 INFO scheduler.DAGScheduler: Parents of final stage: List()
> 16/02/26 08:20:53 INFO scheduler.DAGScheduler: Missing parents: List()
> 16/02/26 08:20:53 INFO scheduler.DAGScheduler: Submitting ResultStage 0 
> (MapPartitionsRDD[2] at show at :30), which has no missing parents
> 16/02/26 08:20:53 INFO storage.MemoryStore: Block broadcast_0 stored as 
> values in memory (estimated size 2.2 KB, free 2.2 KB)
> 16/02/26 08:20:53 INFO storage.MemoryStore: Block broadcast_0_piece0 stored 
> as bytes in memory (estimated size 1411.0 B, free 3.6 KB)
> 16/02/26 08:20:53 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in 
> memory on 10.5.76.106:46683 (size: 1411.0 B, free: 5.5 GB)
> 16/02/26 08:20:53 INFO spark.SparkContext: Created broadcast 0 from broadcast 
> at DAGScheduler.scala:1006
> 16/02/26 08:20:53 INFO scheduler.DAGScheduler: Submitting 1 missing tasks 
> from ResultStage 0 (MapPartitionsRDD[2] at show at :30)
> 16/02/26 08:20:53 INFO cluster.YarnScheduler: Adding task set 0.0 with 1 tasks
> 16/02/26 08:20:53 INFO scheduler.FairSchedulableBuilder: Added task set 
> TaskSet_0 tasks to pool default
> 16/02/26 08:20:53 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 
> 0.0 (TID 0, , partition 0,PROCESS_LOCAL, 2031 bytes)
> 16/02/26 08:20:53 INFO cluster.YarnClientSchedulerBackend: Disabling executor 
> 2.
> 16/02/26 08:20:54 INFO scheduler.DAGScheduler: Executor lost: 2 (epoch 0)
> 16/02/26 08:20:54 INFO storage.BlockManagerMasterEndpoint: Trying to remove 
> executor 2 from BlockManagerMaster.
> 16/02/26 08:20:54 INFO storage.BlockManagerMasterEndpoint: Removing block 
> manager BlockManagerId(2, , 48113)
> 16/02/26 08:20:54 INFO storage.BlockManagerMaster: Removed 2 successfully in 
> removeExecutor
> 16/02/26 08:20:54 ERROR cluster.YarnScheduler: Lost executor 2 on 
> : Container marked as failed: 
> container_1456492687549_0001_01_03 on host: . 
> Exit status: 1. Diagnostics: Exception from container-launch: 
> ExitCodeException exitCode=1:
> ExitCodeException exitCode=1:
> at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
> at org.apache.hadoop.util.Shell.run(Shell.java:455)
> at 
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:702)
> at 
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:195)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:300)
> at 
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:81)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Container exited with a