[jira] [Commented] (SPARK-23337) withWatermark raises an exception on struct objects

2018-08-14 Thread Aydin Kocas (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16580440#comment-16580440
 ] 

Aydin Kocas commented on SPARK-23337:
-

[~marmbrus] Can you give a hint on how to do this with {{withColumn}} in Java?

 
Dataset jsonRow = spark.readStream()
    .schema(...)
    .json(..)
    .withColumn("createTime", ?? )
    .withWatermark("createTime", "10 minutes");

> withWatermark raises an exception on struct objects
> ---
>
> Key: SPARK-23337
> URL: https://issues.apache.org/jira/browse/SPARK-23337
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.1
> Environment: Linux Ubuntu, Spark on standalone mode
>Reporter: Aydin Kocas
>Priority: Major
>
> Hi,
>  
> when using a nested field (a field inside a struct, here concretely 
> _source.createTime) from a JSON file as the parameter to the 
> withWatermark method, I get an exception (see below).
> Everything else works flawlessly with the nested field.
>  
> +*{color:#14892c}works:{color}*+ 
> {code:java}
> Dataset jsonRow = 
> spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("myTime",
>  "10 seconds").toDF();{code}
>  
> json structure:
> {code:java}
> root
>  |-- _id: string (nullable = true)
>  |-- _index: string (nullable = true)
>  |-- _score: long (nullable = true)
>  |-- myTime: timestamp (nullable = true)
> ..{code}
> +*{color:#d04437}does not work - nested json{color}:*+
> {code:java}
> Dataset jsonRow = 
> spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("_source.createTime",
>  "10 seconds").toDF();{code}
>  
> json structure:
>  
> {code:java}
> root
>  |-- _id: string (nullable = true)
>  |-- _index: string (nullable = true)
>  |-- _score: long (nullable = true)
>  |-- _source: struct (nullable = true)
>  | |-- createTime: timestamp (nullable = true)
> ..{code}
>  
> Exception in thread "main" 
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, 
> tree:
> 'EventTimeWatermark '_source.createTime, interval 10 seconds
> +- Deduplicate [_id#0], true
>  +- StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true),
>  StructField(_index,StringType,true), StructField(_score,LongType,true), 
> StructField(_source,StructType(StructField(additionalData,StringType,true), 
> StructField(client,StringType,true), 
> StructField(clientDomain,BooleanType,true), 
> StructField(clientVersion,StringType,true), 
> StructField(country,StringType,true), 
> StructField(countryName,StringType,true), 
> StructField(createTime,TimestampType,true), 
> StructField(externalIP,StringType,true), 
> StructField(hostname,StringType,true), 
> StructField(internalIP,StringType,true), 
> StructField(location,StringType,true), 
> StructField(locationDestination,StringType,true), 
> StructField(login,StringType,true), 
> StructField(originalRequestString,StringType,true), 
> StructField(password,StringType,true), 
> StructField(peerIdent,StringType,true), 
> StructField(peerType,StringType,true), 
> StructField(recievedTime,TimestampType,true), 
> StructField(sessionEnd,StringType,true), 
> StructField(sessionStart,StringType,true), 
> StructField(sourceEntryAS,StringType,true), 
> StructField(sourceEntryIp,StringType,true), 
> StructField(sourceEntryPort,StringType,true), 
> StructField(targetCountry,StringType,true), 
> StructField(targetCountryName,StringType,true), 
> StructField(targetEntryAS,StringType,true), 
> StructField(targetEntryIp,StringType,true), 
> StructField(targetEntryPort,StringType,true), 
> StructField(targetport,StringType,true), 
> StructField(username,StringType,true), 
> StructField(vulnid,StringType,true)),true), 
> StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), 
> FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4]
> at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:385)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:300)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
>  at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:854)
>  at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:796)
>  at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
>  at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
> 

[jira] [Commented] (SPARK-23337) withWatermark raises an exception on struct objects

2018-04-10 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433222#comment-16433222
 ] 

Michael Armbrust commented on SPARK-23337:
--

The checkpoint will only grow if you are doing an aggregation; otherwise, the 
watermark will not affect the computation.

You can set a watermark on the nested column; you just need to project it to a 
top-level column using {{withColumn}}.
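
A sketch of that projection in Java (untested; {{col}} is the standard helper 
from {{org.apache.spark.sql.functions}}, while {{spark}}, {{getJSONSchema()}} 
and {{file}} are the reporter's own names from the snippets above):

{code:java}
import static org.apache.spark.sql.functions.col;

// Pull the nested field up to a top-level column, then set the
// watermark on that column (watermark before dropDuplicates so the
// deduplication state is bounded by event time).
Dataset<Row> jsonRow = spark.readStream()
    .schema(getJSONSchema())
    .json(file)
    .withColumn("createTime", col("_source.createTime"))
    .withWatermark("createTime", "10 seconds")
    .dropDuplicates("_id");
{code}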


[jira] [Commented] (SPARK-23337) withWatermark raises an exception on struct objects

2018-04-10 Thread Aydin Kocas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16432200#comment-16432200
 ] 

Aydin Kocas commented on SPARK-23337:
-

Hi Michael, in my case this is a blocking issue and unfortunately not just an 
annoyance, because I can't use the watermarking functionality when doing a 
readStream on a JSON file. Without the time bound that watermarking provides, 
I am concerned that my checkpoint directory will grow over time, since there 
are no time boundaries.


[jira] [Commented] (SPARK-23337) withWatermark raises an exception on struct objects

2018-02-16 Thread Michael Armbrust (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16367933#comment-16367933
 ] 

Michael Armbrust commented on SPARK-23337:
--

This is essentially the same issue as SPARK-18084. We are taking a column name 
here, not an expression. As such, you can only reference top-level columns. I 
agree this is an annoying aspect of the API, but changing it might have to 
wait for a major release, since it would be a change in behavior.
