[jira] [Commented] (SPARK-23337) withWatermark raises an exception on struct objects
[ https://issues.apache.org/jira/browse/SPARK-23337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16580440#comment-16580440 ] Aydin Kocas commented on SPARK-23337:

[~marmbrus] Can you give a hint on how to do this with {{withColumn}} in Java?

{code:java}
Dataset<Row> jsonRow = spark.readStream()
    .schema(...)
    .json(...)
    .withColumn("createTime", ?? )
    .withWatermark("createTime", "10 minutes");
{code}

> withWatermark raises an exception on struct objects
> ---------------------------------------------------
>
> Key: SPARK-23337
> URL: https://issues.apache.org/jira/browse/SPARK-23337
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.2.1
> Environment: Linux Ubuntu, Spark on standalone mode
> Reporter: Aydin Kocas
> Priority: Major
>
> Hi,
>
> When using a nested object (that is, a field inside a struct, here concretely _source.createTime) from a JSON file as the parameter to the withWatermark method, I get the exception below. Everything else works flawlessly with the nested object.
>
> +*{color:#14892c}works:{color}*+
> {code:java}
> Dataset jsonRow = spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("myTime", "10 seconds").toDF();
> {code}
> JSON structure:
> {code:java}
> root
>  |-- _id: string (nullable = true)
>  |-- _index: string (nullable = true)
>  |-- _score: long (nullable = true)
>  |-- myTime: timestamp (nullable = true)
>  ..
> {code}
> +*{color:#d04437}does not work - nested JSON{color}:*+
> {code:java}
> Dataset jsonRow = spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("_source.createTime", "10 seconds").toDF();
> {code}
> JSON structure:
> {code:java}
> root
>  |-- _id: string (nullable = true)
>  |-- _index: string (nullable = true)
>  |-- _score: long (nullable = true)
>  |-- _source: struct (nullable = true)
>  |    |-- createTime: timestamp (nullable = true)
>  ..
> {code}
> Exception:
> {code:java}
> Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
> 'EventTimeWatermark '_source.createTime, interval 10 seconds
> +- Deduplicate [_id#0], true
>    +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true), StructField(_index,StringType,true), StructField(_score,LongType,true), StructField(_source,StructType(StructField(additionalData,StringType,true), StructField(client,StringType,true), StructField(clientDomain,BooleanType,true), StructField(clientVersion,StringType,true), StructField(country,StringType,true), StructField(countryName,StringType,true), StructField(createTime,TimestampType,true), StructField(externalIP,StringType,true), StructField(hostname,StringType,true), StructField(internalIP,StringType,true), StructField(location,StringType,true), StructField(locationDestination,StringType,true), StructField(login,StringType,true), StructField(originalRequestString,StringType,true), StructField(password,StringType,true), StructField(peerIdent,StringType,true), StructField(peerType,StringType,true), StructField(recievedTime,TimestampType,true), StructField(sessionEnd,StringType,true), StructField(sessionStart,StringType,true), StructField(sourceEntryAS,StringType,true), StructField(sourceEntryIp,StringType,true), StructField(sourceEntryPort,StringType,true), StructField(targetCountry,StringType,true), StructField(targetCountryName,StringType,true), StructField(targetEntryAS,StringType,true), StructField(targetEntryIp,StringType,true), StructField(targetEntryPort,StringType,true), StructField(targetport,StringType,true), StructField(username,StringType,true), StructField(vulnid,StringType,true)),true), StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4]
> at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
> at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:385)
> at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:300)
> at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
> at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:854)
> at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:796)
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
> ...
> {code}
[jira] [Commented] (SPARK-23337) withWatermark raises an exception on struct objects
[ https://issues.apache.org/jira/browse/SPARK-23337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16433222#comment-16433222 ] Michael Armbrust commented on SPARK-23337:

The checkpoint will only grow if you are doing an aggregation; otherwise the watermark will not affect the computation. You can set a watermark on the nested column, you just need to project it to a top-level column first using {{withColumn}}.
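A minimal Java sketch of that workaround, using the schema from the report (the {{getJSONSchema()}} helper and class name here are placeholders, not code from the issue): lift the nested field to a top-level column with {{withColumn}}, then declare the watermark on the new column.

```java
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

public class NestedWatermarkSketch {
    public static Dataset<Row> readWithWatermark(SparkSession spark, StructType schema) {
        // Sketch only: "./input/" is the path that appears in the reporter's stack trace.
        return spark.readStream()
                .schema(schema)
                .json("./input/")
                // Project the nested struct field to a top-level column ...
                .withColumn("createTime", col("_source.createTime"))
                // ... so withWatermark can resolve it as a plain column name.
                .withWatermark("createTime", "10 seconds")
                .dropDuplicates("_id");
    }
}
```

Note that {{withWatermark}} is placed before {{dropDuplicates}} here; with that ordering, Spark can use the watermark to eventually purge old deduplication state rather than keeping it indefinitely.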
[jira] [Commented] (SPARK-23337) withWatermark raises an exception on struct objects
[ https://issues.apache.org/jira/browse/SPARK-23337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16432200#comment-16432200 ] Aydin Kocas commented on SPARK-23337:

Hi Michael, in my case it is a blocking issue and unfortunately not just an annoyance, because I cannot use the watermarking functionality when reading a stream from a JSON file. Without the time bound that the watermark provides, I am concerned that my checkpoint directory will keep growing over time.
[jira] [Commented] (SPARK-23337) withWatermark raises an exception on struct objects
[ https://issues.apache.org/jira/browse/SPARK-23337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16367933#comment-16367933 ] Michael Armbrust commented on SPARK-23337:

This is essentially the same issue as SPARK-18084. We are taking a column name here, not an expression, so you can only reference top-level columns. I agree this is an annoying aspect of the API, but changing it would probably have to wait for a major release, since it would be a change in behavior.