Re: Joining multiple temporal tables
Hi Fabian,

Thanks for confirming the issue and suggesting a workaround - I'll give that a try. I've created a JIRA issue as you suggested: https://issues.apache.org/jira/browse/FLINK-15112

Many thanks,
Chris

-- Original Message --
From: "Fabian Hueske"
To: "Chris Miller"
Cc: "user@flink.apache.org"
Sent: 06/12/2019 14:52:16
Subject: Re: Joining multiple temporal tables

Hi Chris,

Your query looks OK to me. Moreover, you would get a SqlParseException (or something similar) if it weren't valid SQL. Hence, I assume you are running into a bug in one of the optimizer rules.

I tried to reproduce the problem in the SQL training environment and couldn't write a query that joins two temporal tables. What worked, though, was to first create a view of a query that joins the stream with one temporal table, and then join that view with the second one. Maybe that workaround also works for you?

It would be great if you could open a Jira issue for this bug, including your program to reproduce it.

Thank you,
Fabian

On Thu, 5 Dec 2019 at 16:47, Chris Miller wrote:

I want to decorate/enrich a stream by joining it with "lookups" to the most recent data available in other streams. For example, suppose I have a stream of product orders. For each order, I want to add price and FX rate information based on the order's product ID and order currency.

Is it possible to join a table with two other temporal tables to achieve this? I'm trying, but I'm getting a NullPointerException deep inside Flink's Calcite code. I've attached some sample code that demonstrates the problem. Is my SQL incorrect/invalid (in which case Flink ideally should detect the problem and provide a better error message), or is the SQL correct and this is a bug/limitation in Flink? If it's the latter, how do I achieve a similar result?
The SQL I'm trying to run:

SELECT o.id AS orderId, o.productId, o.currency, o.quantity, f.rate, p.price
FROM Orders AS o,
  LATERAL TABLE (FxRateLookup(o.rowtime)) AS f,
  LATERAL TABLE (PriceLookup(o.rowtime)) AS p
WHERE o.currency = f.currency
  AND o.productId = p.productId

The exception I get:

Exception in thread "main" java.lang.NullPointerException
  at org.apache.flink.table.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:129)
  at org.apache.flink.table.plan.rules.logical.LogicalCorrelateToTemporalTableJoinRule.onMatch(LogicalCorrelateToTemporalTableJoinRule.scala:91)
  at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
  at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
  at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
  at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:284)
  at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
  at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
  at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
  at org.apache.flink.table.plan.Optimizer.runHepPlanner(Optimizer.scala:228)
  at org.apache.flink.table.plan.Optimizer.runHepPlannerSimultaneously(Optimizer.scala:212)
  at org.apache.flink.table.plan.Optimizer.optimizeExpandPlan(Optimizer.scala:138)
  at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:61)
  at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
  at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:187)
  at org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:127)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
  at scala.collection.Iterator.foreach(Iterator.scala:937)
  at scala.collection.Iterator.foreach$(Iterator.scala:937)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
  at scala.collection.IterableLike.foreach(IterableLike.scala:70)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike.map(TraversableLike.scala:233)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
  at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:319)
  at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:227)
  at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:218)
  at test.PojoTest.run(PojoTest.java:96)
  at test.PojoTest.main(PojoTest.java:23)
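For what it's worth, Fabian's suggested workaround can be sketched in SQL against the tables from this thread. This is only an illustration: the view name OrdersWithFx is made up, and whether the rowtime attribute survives through the view may depend on the exact query.

```sql
-- Hypothetical sketch of the workaround: join one temporal table in a view
-- first, then join that view with the second temporal table.
CREATE VIEW OrdersWithFx AS
SELECT o.id, o.productId, o.currency, o.quantity, o.rowtime, f.rate
FROM Orders AS o,
  LATERAL TABLE (FxRateLookup(o.rowtime)) AS f
WHERE o.currency = f.currency;

SELECT v.id AS orderId, v.productId, v.currency, v.quantity, v.rate, p.price
FROM OrdersWithFx AS v,
  LATERAL TABLE (PriceLookup(v.rowtime)) AS p
WHERE v.productId = p.productId;
```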
Joining multiple temporal tables
I want to decorate/enrich a stream by joining it with "lookups" to the most recent data available in other streams. For example, suppose I have a stream of product orders. For each order, I want to add price and FX rate information based on the order's product ID and order currency.

Is it possible to join a table with two other temporal tables to achieve this? I'm trying, but I'm getting a NullPointerException deep inside Flink's Calcite code. I've attached some sample code that demonstrates the problem. Is my SQL incorrect/invalid (in which case Flink ideally should detect the problem and provide a better error message), or is the SQL correct and this is a bug/limitation in Flink? If it's the latter, how do I achieve a similar result?

The SQL I'm trying to run:

SELECT o.id AS orderId, o.productId, o.currency, o.quantity, f.rate, p.price
FROM Orders AS o,
  LATERAL TABLE (FxRateLookup(o.rowtime)) AS f,
  LATERAL TABLE (PriceLookup(o.rowtime)) AS p
WHERE o.currency = f.currency
  AND o.productId = p.productId

The exception I get:

Exception in thread "main" java.lang.NullPointerException
  at org.apache.flink.table.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:129)
  at org.apache.flink.table.plan.rules.logical.LogicalCorrelateToTemporalTableJoinRule.onMatch(LogicalCorrelateToTemporalTableJoinRule.scala:91)
  at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
  at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
  at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
  at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:284)
  at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
  at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
  at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
  at org.apache.flink.table.plan.Optimizer.runHepPlanner(Optimizer.scala:228)
  at org.apache.flink.table.plan.Optimizer.runHepPlannerSimultaneously(Optimizer.scala:212)
  at org.apache.flink.table.plan.Optimizer.optimizeExpandPlan(Optimizer.scala:138)
  at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:61)
  at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
  at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:187)
  at org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:127)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
  at scala.collection.Iterator.foreach(Iterator.scala:937)
  at scala.collection.Iterator.foreach$(Iterator.scala:937)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
  at scala.collection.IterableLike.foreach(IterableLike.scala:70)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike.map(TraversableLike.scala:233)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
  at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:319)
  at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:227)
  at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:218)
  at test.PojoTest.run(PojoTest.java:96)
  at test.PojoTest.main(PojoTest.java:23)

Attachment: FlinkTest.java
Re: Need help using AggregateFunction instead of FoldFunction
I hit the same problem. As far as I can tell it should be fixed in Pulsar 2.4.2. The release has already passed voting, so I hope it should be available in a day or two. https://github.com/apache/pulsar/pull/5068

-- Original Message --
From: "devinbost"
To: user@flink.apache.org
Sent: 05/12/2019 04:35:05
Subject: Re: Need help using AggregateFunction instead of FoldFunction

It turns out that the exception I was getting is actually related to Pulsar, since I'm using the Pulsar Flink connector. I found the exact issue reported here: https://github.com/apache/pulsar/issues/4721

devinbost wrote:

I was able to make more progress (based on the documentation you provided), but now I'm getting this exception:

org.apache.pulsar.client.impl.DefaultBatcherBuilder@3b5fad2d is not serializable. The object probably contains or references non serializable fields.
  org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
  org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
  org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
  org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
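The mechanism behind this error can be sketched without Flink or Pulsar: Flink serializes user functions with Java serialization before shipping them to the cluster, so every field a function references must itself be Serializable. The classes below are hypothetical stand-ins (not the actual Pulsar or Flink types), showing both the failing pattern and the usual transient-field fix.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializabilityCheck {

    // Stand-in for a non-serializable dependency such as a client/builder object.
    static class NonSerializableClient {}

    // BAD: holds a direct reference to a non-serializable object, so Java
    // serialization of the function fails.
    static class BadFunction implements Serializable {
        final NonSerializableClient client = new NonSerializableClient();
    }

    // BETTER: mark the field transient and (re)create it lazily on the worker.
    static class GoodFunction implements Serializable {
        transient NonSerializableClient client;

        NonSerializableClient client() {
            if (client == null) client = new NonSerializableClient();
            return client;
        }
    }

    /** Returns true if the object survives Java serialization. */
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(new BadFunction()));   // false
        System.out.println(isSerializable(new GoodFunction()));  // true
    }
}
```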
Re: Accessing fields in a POJO stream
Thank you, adding the missing constructor has done the trick! (FWIW: my 'real' code is in Kotlin and I had a data class with no @JvmOverloads or empty secondary constructor.)

I haven't seen the fx.get('currency') field access syntax anywhere in the documentation; do you happen to know where I can read about it? The only thing I found was https://ci.apache.org/projects/flink/flink-docs-stable/dev/api_concepts.html#define-keys-using-field-expressions which is why I was trying the "fx.currency" style syntax instead.

Many thanks for your help,
Chris

-- Original Message --
From: "Jingsong Lee"
To: "Chris Miller"
Cc: "user"
Sent: 04/12/2019 03:41:05
Subject: Re: Add time attribute column to POJO stream

Hi Chris,

First, FxRate is not a POJO: a POJO must have a constructor without arguments. Once it does, you can read from a POJO DataStream directly.

Second, if you want to get a field from the POJO, use a get expression like fx.get('currency'); if you have a nested POJO field, you can use the same approach to access its fields.

Best,
Jingsong Lee

On Wed, Dec 4, 2019 at 12:33 AM Chris Miller wrote:

I'm having trouble dealing with a DataStream of POJOs. In particular, when I perform SQL operations on it I can't figure out the syntax for referring to individual fields within the POJO. Below is an example that illustrates the problem and the various approaches I've tried. Can anyone please point me in the right direction?
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class PojoTest {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

    // Using tuples, this works as expected
    List<Tuple2<String, Double>> tupleData = Arrays.asList(
        new Tuple2<>("USD", 1.0),
        new Tuple2<>("GBP", 1.3),
        new Tuple2<>("EUR", 1.11));
    DataStreamSource<Tuple2<String, Double>> tupleStream = streamEnv.fromCollection(tupleData);
    tableEnv.fromDataStream(tupleStream, "currency, rate").printSchema();

    // Using a DataStream of POJOs, how do I obtain an equivalent table to the above?
    List<FxRate> pojoData = Arrays.asList(
        new FxRate("USD", 1.0),
        new FxRate("GBP", 1.3),
        new FxRate("EUR", 1.11));
    DataStreamSource<FxRate> pojoStream = streamEnv.fromCollection(pojoData);

    Table pojoTable = tableEnv.fromDataStream(pojoStream, "fx");
    pojoTable.printSchema();

    // This fails with "ValidationException: Cannot resolve field [currency], input field list:[fx]"
    pojoTable.select("currency, rate").printSchema();

    // This fails with "ValidationException: Undefined function: currency"
    pojoTable.select("fx.currency AS currency, fx.rate AS rate").printSchema();

    // This fails with "ValidationException: Too many fields referenced from an atomic type"
    tableEnv.fromDataStream(pojoStream, "currency, rate").printSchema();

    // This fails with "ValidationException: Field reference expression expected"
    tableEnv.fromDataStream(pojoStream, "fx.currency, fx.rate").printSchema();

    streamEnv.execute();
  }

  public static class FxRate {
    public String currency;
    public double rate;

    public FxRate(String currency, double rate) {
      this.currency = currency;
      this.rate = rate;
    }

    @Override
    public String toString() {
      return "FxRate{currency='" + currency + '\'' + ", rate=" + rate + '}';
    }
  }
}

--
Best, Jingsong Lee
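Jingsong's first point (a POJO needs a public no-argument constructor) can be sketched as follows. The class name FxRatePojo is made up for illustration; the rest mirrors the FxRate class from the thread.

```java
// Sketch of an FxRate class that satisfies Flink's POJO rules: public type,
// public no-arg constructor, and public fields (or getters/setters). With the
// no-arg constructor present, fromDataStream can expose "currency" and "rate"
// as columns directly instead of a single atomic "fx" field.
public class FxRatePojo {
    public String currency;
    public double rate;

    // The no-arg constructor is what was missing in the original FxRate.
    public FxRatePojo() {}

    public FxRatePojo(String currency, double rate) {
        this.currency = currency;
        this.rate = rate;
    }

    @Override
    public String toString() {
        return "FxRatePojo{currency='" + currency + "', rate=" + rate + '}';
    }
}
```

As noted above for Kotlin, a data class only gets such a constructor if every property has a default value (optionally combined with @JvmOverloads) or if an explicit empty secondary constructor is added.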
Add time attribute column to POJO stream
I'm having trouble dealing with a DataStream of POJOs. In particular, when I perform SQL operations on it I can't figure out the syntax for referring to individual fields within the POJO. Below is an example that illustrates the problem and the various approaches I've tried. Can anyone please point me in the right direction?

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class PojoTest {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

    // Using tuples, this works as expected
    List<Tuple2<String, Double>> tupleData = Arrays.asList(
        new Tuple2<>("USD", 1.0),
        new Tuple2<>("GBP", 1.3),
        new Tuple2<>("EUR", 1.11));
    DataStreamSource<Tuple2<String, Double>> tupleStream = streamEnv.fromCollection(tupleData);
    tableEnv.fromDataStream(tupleStream, "currency, rate").printSchema();

    // Using a DataStream of POJOs, how do I obtain an equivalent table to the above?
    List<FxRate> pojoData = Arrays.asList(
        new FxRate("USD", 1.0),
        new FxRate("GBP", 1.3),
        new FxRate("EUR", 1.11));
    DataStreamSource<FxRate> pojoStream = streamEnv.fromCollection(pojoData);

    Table pojoTable = tableEnv.fromDataStream(pojoStream, "fx");
    pojoTable.printSchema();

    // This fails with "ValidationException: Cannot resolve field [currency], input field list:[fx]"
    pojoTable.select("currency, rate").printSchema();

    // This fails with "ValidationException: Undefined function: currency"
    pojoTable.select("fx.currency AS currency, fx.rate AS rate").printSchema();

    // This fails with "ValidationException: Too many fields referenced from an atomic type"
    tableEnv.fromDataStream(pojoStream, "currency, rate").printSchema();

    // This fails with "ValidationException: Field reference expression expected"
    tableEnv.fromDataStream(pojoStream, "fx.currency, fx.rate").printSchema();

    streamEnv.execute();
  }

  public static class FxRate {
    public String currency;
    public double rate;

    public FxRate(String currency, double rate) {
      this.currency = currency;
      this.rate = rate;
    }

    @Override
    public String toString() {
      return "FxRate{currency='" + currency + '\'' + ", rate=" + rate + '}';
    }
  }
}
Re: low performance in running queries
I haven't run any benchmarks with Flink or even used it enough to directly help with your question, but I suspect the following article might be relevant: http://dsrg.pdos.csail.mit.edu/2016/06/26/scalability-cost/

Given that the computation you're performing is trivial, it's possible that the additional overhead of serialisation, interprocess communication, state management etc. that distributed systems like Flink require is dominating the runtime here. Two hours (or even 25 minutes) still seems too long to me, however, so hopefully it really is just a configuration issue of some sort. Either way, if you do figure this out, or if anyone with good knowledge of the article above in relation to Flink can give their thoughts, I'd be very interested in hearing more.

Regards,
Chris

-- Original Message --
From: "Habib Mostafaei"
To: "Zhenghua Gao"
Cc: "user" ; "Georgios Smaragdakis" ; "Niklas Semmler"
Sent: 30/10/2019 12:25:28
Subject: Re: low performance in running queries

Thanks Gao for the reply. I used the parallelism parameter with different values like 6 and 8, but the execution time is still not comparable with a single-threaded Python script. What would be a reasonable value for the parallelism?

Best,
Habib

On 10/30/2019 1:17 PM, Zhenghua Gao wrote:

The reason might be that the parallelism of your task is only 1, which is too low. See [1] to specify a proper parallelism for your job, and the execution time should be reduced significantly.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html

Best Regards,
Zhenghua Gao

On Tue, Oct 29, 2019 at 9:27 PM Habib Mostafaei wrote:

Hi all,

I am running Flink on a standalone cluster and getting very long execution times for streaming queries like WordCount on a fixed text file. My VM runs Debian 10 with 16 CPU cores and 32 GB of RAM, and I have a text file of 2 GB. When I run Flink on a standalone cluster, i.e. one JobManager and one TaskManager with 25 GB of heap, it takes around two hours to finish counting this file, while a simple Python script can do it in around 7 minutes. Just wondering what is wrong with my setup. I also ran the experiments on a cluster with six TaskManagers, but I still get very long execution times, like 25 minutes or so. I tried increasing the JVM heap size to lower the execution time, but it did not help. I attached the log file and the Flink configuration file to this email.

Best,
Habib

--
Habib Mostafaei, Ph.D.
Postdoctoral researcher
TU Berlin, FG INET, MAR 4.003
Marchstraße 23, 10587 Berlin
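The single-threaded baseline being compared against can be as simple as the sketch below (a hypothetical stand-in for the Python script mentioned in the thread). The point of the linked article is that a distributed system's runtime should be judged against such a competent one-core baseline before adding parallelism.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal single-threaded word-count baseline, in the spirit of the
// "scalability, but at what COST?" comparison.
public class WordCountBaseline {
    public static Map<String, Long> count(Iterable<String> lines) {
        Map<String, Long> counts = new HashMap<>();
        for (String line : lines) {
            // Split on non-word characters; skip empty tokens.
            for (String word : line.toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // For a 2 GB file, stream it line by line rather than loading it into
        // memory (e.g. with Files.lines(Paths.get(...))).
        Map<String, Long> c = count(java.util.Arrays.asList("to be or not to be"));
        System.out.println(c.get("be")); // 2
    }
}
```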
Re: AssertionError: mismatched type $5 TIMESTAMP(3)
Hi Timo,

Thanks for the pointers, bug reports and slides, much appreciated. I'll read up to get a better understanding of the issue and hopefully figure out a more appropriate solution for what I'm trying to achieve. I'll report back if I come up with anything that others might find useful.

Regards,
Chris

-- Original Message --
From: "Timo Walther"
To: user@flink.apache.org
Sent: 06/02/2019 16:45:26
Subject: Re: AssertionError: mismatched type $5 TIMESTAMP(3)

Hi Chris,

the error that you've observed is a bug that might be related to another bug that is not easily solvable. I created an issue for it nevertheless: https://issues.apache.org/jira/browse/FLINK-11543

In general, I think you need to adapt your program in any case. Because you are aggregating on a rowtime attribute, it will lose its time attribute property and become a regular timestamp. Thus, you can't use it for a temporal table join. Maybe the following training from the last Flink Forward conference might help you. I would recommend the slide set there to understand the difference between streaming operations and what we call "materializing" operations: https://github.com/dataArtisans/sql-training/wiki/SQL-on-streams

I hope this helps. Feel free to ask further questions.

Regards,
Timo

On 05.02.19 at 11:30, Chris Miller wrote:

Exception in thread "main" java.lang.AssertionError: mismatched type $5 TIMESTAMP(3)
  at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2481)
  at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2459)
  at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
  at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:151)
  at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:100)
  at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:34)
  at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
  at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:279)
  at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:241)
  at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:259)
  at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1605)
  at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:230)
  at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:344)
  at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
  at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
  at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
  at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:374)
  at org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
  at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
  at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
  at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:340)
  at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:272)
  at test.Test.main(Test.java:78)
Re: AssertionError: mismatched type $5 TIMESTAMP(3)
Sorry to reply to my own post, but I wasn't able to figure out a solution for this. Does anyone have any suggestions I could try?

-- Original Message --
From: "Chris Miller"
To: "Timo Walther" ; "user"
Sent: 29/01/2019 10:06:47
Subject: Re: AssertionError: mismatched type $5 TIMESTAMP(3)

Thanks Timo, I didn't realise supplying Row could automatically apply the correct types. In this case your suggestion doesn't solve the problem though: I still get the exact same error. I assume that's because there isn't a time attribute type on the tradesByInstr table itself, but rather on the groupedTrades table that it joins with.

System.out.println(tradesByInstr.getSchema().toRowType()) outputs:
-> Row(InstrumentId: Integer, Name: String, ClosePrice: Double, TradeCount: Long, Quantity: Double, Cost: Double)

System.out.println(groupedTrades.getSchema().toRowType()) outputs:
-> Row(t_InstrumentId: Integer, t_CounterpartyId: Integer, TradeCount: Long, Quantity: Double, Cost: Double, LastTrade_EventTime: TimeIndicatorTypeInfo(rowtime))

Looking at the stack trace, it seems the query optimiser is tripping up on the LastTrade_EventTime column, but that is required for the temporal table join. Any other ideas on how I can work around this problem?

Many thanks,
Chris

-- Original Message --
From: "Timo Walther"
To: "Chris Miller" ; "user"
Sent: 29/01/2019 09:44:14
Subject: Re: AssertionError: mismatched type $5 TIMESTAMP(3)

Hi Chris,

the exception message is a bit misleading. The time attribute (time indicator) type is an internal type and should not be used by users. The following line should solve your issue.

Instead of:

DataStream<Tuple2<Boolean, Row>> tradesByInstrStream = tableEnv.toRetractStream(tradesByInstr, typeInfo);

You can do:

DataStream<Tuple2<Boolean, Row>> tradesByInstrStream = tableEnv.toRetractStream(tradesByInstr, Row.class);

The API will automatically insert the right types for the table passed when using a plain `Row.class`.

I hope this helps.

Regards,
Timo

On 25.01.19 at 20:14, Chris Miller wrote:

I'm trying to group some data and then enrich it by joining with a temporal table function, however my test code (attached) is failing with the error shown below. Can someone please give me a clue as to what I'm doing wrong?

Exception in thread "main" java.lang.AssertionError: mismatched type $5 TIMESTAMP(3)
  at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2481)
  at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2459)
  at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
  at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:151)
  at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:100)
  at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:34)
  at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
  at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:279)
  at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:241)
  at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:259)
  at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1605)
  at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:230)
  at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:344)
  at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
  at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
  at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
  at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:374)
  at org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
  at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
  at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
  at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:340)
  at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:272)
  at test.Test.main(Test.java:78)
Re: AssertionError: mismatched type $5 TIMESTAMP(3)
Thanks Timo, I didn't realise supplying Row could automatically apply the correct types. In this case your suggestion doesn't solve the problem though: I still get the exact same error. I assume that's because there isn't a time attribute type on the tradesByInstr table itself, but rather on the groupedTrades table that it joins with.

System.out.println(tradesByInstr.getSchema().toRowType()) outputs:
-> Row(InstrumentId: Integer, Name: String, ClosePrice: Double, TradeCount: Long, Quantity: Double, Cost: Double)

System.out.println(groupedTrades.getSchema().toRowType()) outputs:
-> Row(t_InstrumentId: Integer, t_CounterpartyId: Integer, TradeCount: Long, Quantity: Double, Cost: Double, LastTrade_EventTime: TimeIndicatorTypeInfo(rowtime))

Looking at the stack trace, it seems the query optimiser is tripping up on the LastTrade_EventTime column, but that is required for the temporal table join. Any other ideas on how I can work around this problem?

Many thanks,
Chris

-- Original Message --
From: "Timo Walther"
To: "Chris Miller" ; "user"
Sent: 29/01/2019 09:44:14
Subject: Re: AssertionError: mismatched type $5 TIMESTAMP(3)

Hi Chris,

the exception message is a bit misleading. The time attribute (time indicator) type is an internal type and should not be used by users. The following line should solve your issue.

Instead of:

DataStream<Tuple2<Boolean, Row>> tradesByInstrStream = tableEnv.toRetractStream(tradesByInstr, typeInfo);

You can do:

DataStream<Tuple2<Boolean, Row>> tradesByInstrStream = tableEnv.toRetractStream(tradesByInstr, Row.class);

The API will automatically insert the right types for the table passed when using a plain `Row.class`.

I hope this helps.

Regards,
Timo

On 25.01.19 at 20:14, Chris Miller wrote:

I'm trying to group some data and then enrich it by joining with a temporal table function, however my test code (attached) is failing with the error shown below. Can someone please give me a clue as to what I'm doing wrong?

Exception in thread "main" java.lang.AssertionError: mismatched type $5 TIMESTAMP(3)
  at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2481)
  at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2459)
  at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
  at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:151)
  at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:100)
  at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:34)
  at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
  at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:279)
  at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:241)
  at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:259)
  at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1605)
  at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:230)
  at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:344)
  at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
  at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
  at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
  at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:374)
  at org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
  at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
  at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
  at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:340)
  at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:272)
  at test.Test.main(Test.java:78)
AssertionError: mismatched type $5 TIMESTAMP(3)
I'm trying to group some data and then enrich it by joining with a temporal table function, however my test code (attached) is failing with the error shown below. Can someone please give me a clue as to what I'm doing wrong?

Exception in thread "main" java.lang.AssertionError: mismatched type $5 TIMESTAMP(3)
  at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2481)
  at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2459)
  at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
  at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:151)
  at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:100)
  at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:34)
  at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
  at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:279)
  at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:241)
  at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:259)
  at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1605)
  at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:230)
  at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:344)
  at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
  at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
  at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
  at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:374)
  at org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
  at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
  at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
  at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:340)
  at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:272)
  at test.Test.main(Test.java:78)

Attachment: Test.java
Re: Temporal tables not behaving as expected
Hi Fabian,

I was investigating this further today and was just coming to the same conclusion! Thanks for the confirmation, I'll make the suggested changes and see where that gets me. Originally I had assumed the processing time for the rates table would be set when the rates are first produced by DelayedSource, but I guess that concept is closer to ingestion time than processing time. My takeaway from this is that it's never a good idea to compare two different processing-time fields, since it's not clear which of the two gets assigned first.

Regards,
Chris

-- Original Message --
From: "Fabian Hueske"
To: "Chris Miller"
Cc: "user"
Sent: 22/01/2019 11:23:23
Subject: Re: Temporal tables not behaving as expected

Hi,

The problem is that you are using processing time, which is non-deterministic. Both inputs are consumed at the same time and joined based on which record arrived first, so the result depends on a race condition. If you change the input tables to have event-time attributes and use these to register the time-versioned table, the results should become stable.

Best, Fabian

On Mon, 21 Jan 2019 at 15:45, Chris Miller wrote:

Hi all, I'm new to Flink so am probably missing something simple. I'm using Flink 1.7.1 and am trying to use temporal table functions, but I'm not getting the results I expect. With the example code below, I would expect 4 records to be output (one for each order), but instead I'm only seeing a (random) subset of these records (it varies on each run). To compound my confusion further, the CSV output often shows a different subset of results than those written to the console. I assume there's a race condition of some sort but I can't figure out where it is. Any ideas what I'm doing wrong?
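Fabian's point about event time can be illustrated without Flink. If each rate update carries its own event timestamp, then a lookup "as of" the order's event time always resolves to the same rate version, no matter in which order the records happen to arrive. This is a minimal plain-Java sketch of that idea (the class and method names are invented for illustration, not Flink API):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class EventTimeRates {
    // Per-currency rate history, keyed by event timestamp.
    private final Map<String, TreeMap<Long, Double>> history = new HashMap<>();

    public void update(String currency, long eventTime, double rate) {
        history.computeIfAbsent(currency, c -> new TreeMap<>()).put(eventTime, rate);
    }

    // Latest rate whose event timestamp is <= the order's event time,
    // or null if no such version exists yet.
    public Double rateAsOf(String currency, long orderTime) {
        TreeMap<Long, Double> versions = history.get(currency);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, Double> e = versions.floorEntry(orderTime);
        return e == null ? null : e.getValue();
    }
}
```

Because the lookup depends only on the timestamps carried by the data, inserting the two GBP updates in either arrival order yields the same answer for a given order time, which is why event-time-versioned temporal tables give stable results.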
Temporal tables not behaving as expected
Hi all, I'm new to Flink so am probably missing something simple. I'm using Flink 1.7.1 and am trying to use temporal table functions, but I'm not getting the results I expect. With the example code below, I would expect 4 records to be output (one for each order), but instead I'm only seeing a (random) subset of these records (it varies on each run). To compound my confusion further, the CSV output often shows a different subset of results than those written to the console. I assume there's a race condition of some sort but I can't figure out where it is. Any ideas what I'm doing wrong?

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.types.Row;

public class Test {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        List<Tuple2<String, Double>> rateData = Arrays.asList(
            new Tuple2<>("GBP", 1.29),
            new Tuple2<>("EUR", 1.14),
            new Tuple2<>("EUR", 1.15),
            new Tuple2<>("GBP", 1.30));
        DataStreamSource<Tuple2<String, Double>> rateStream =
            env.addSource(new DelayedSource<>(rateData, 1L));
        rateStream.returns(new TypeHint<Tuple2<String, Double>>() {});
        Table rateHistory = tableEnv.fromDataStream(rateStream,
            "Currency, Rate, FxRates_ProcTime.proctime");
        TemporalTableFunction rates =
            rateHistory.createTemporalTableFunction("FxRates_ProcTime", "Currency");
        tableEnv.registerFunction("FxRates", rates);

        List<Tuple3<Integer, String, Double>> orderData = Arrays.asList(
            new Tuple3<>(1, "GBP", 4.51),
            new Tuple3<>(2, "GBP", 23.68),
            new Tuple3<>(3, "EUR", 2.99),
            new Tuple3<>(4, "EUR", 14.76));
        DataStreamSource<Tuple3<Integer, String, Double>> orderStream =
            env.addSource(new DelayedSource<>(orderData, 100L));
        orderStream.returns(new TypeHint<Tuple3<Integer, String, Double>>() {});
        Table orders = tableEnv.fromDataStream(orderStream,
            "OrderId, o_Currency, Amount, Order_ProcTime.proctime");

        Table usdOrders = orders
            .join(new Table(tableEnv, "FxRates(Order_ProcTime)"), "o_Currency = Currency")
            .select("OrderId, Amount, Currency, Rate, (Amount * Rate) as UsdAmount");

        String[] fields = usdOrders.getSchema().getFieldNames();
        TypeInformation<?>[] types = usdOrders.getSchema().getFieldTypes();
        DataStream<Row> usdStream =
            tableEnv.toAppendStream(usdOrders, usdOrders.getSchema().toRowType());

        CsvTableSink csvTableSink =
            new CsvTableSink("C:\\tmp\\test.csv", ",", 1, FileSystem.WriteMode.OVERWRITE);
        tableEnv.registerTableSink("csvSink", fields, types, csvTableSink);
        usdOrders.insertInto("csvSink");
        usdStream.addSink(new PrintSink());
        env.execute();
        System.out.println("Test completed at " + time());
    }

    public static String time() {
        return LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
    }

    private static class DelayedSource<T> extends RichSourceFunction<T> {
        private final List<T> data;
        private final long initialDelay;
        private volatile boolean shutdown;

        private DelayedSource(List<T> data, long initialDelay) {
            this.data = data;
            this.initialDelay = initialDelay;
        }

        @Override
        public void run(SourceContext<T> ctx) throws Exception {
            Iterator<T> iterator = data.iterator();
            Thread.sleep(initialDelay);
            while (!shutdown && iterator.hasNext()) {
                T next = iterator.next();
                System.out.println(time() + " - producing " + next);
                ctx.collect(next);
            }
        }

        @Override
        public void cancel() {
            shutdown = true;
        }
    }

    private static class PrintSink extends RichSinkFunction<Row> {
        @Override
        public void invoke(Row value, Context context) {
            Integer orderId = (Integer) value.getField(0);
            Double amount = (Double) value.getField(1);
            String currency = (String) value.getField(2);
            Double rate = (Double) value.getField(3);
            Double usdAmount = (Double) value.getField(4);
            System.out.println(time() + " - order " + orderId + " was "
                + amount + " " + currency + " at rate " + rate + " = " + usdAmount + " USD");
        }
    }
}
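The nondeterminism in the program above can be reproduced in plain Java. With processing-time semantics, the joined rate is simply whatever has happened to arrive by the moment the order is processed, so two different interleavings of the same records produce different join results. This sketch uses invented names (not Flink API) to demonstrate the race:

```java
import java.util.HashMap;
import java.util.Map;

public class ProcTimeJoin {
    // "Temporal table" with processing-time semantics: only the most
    // recently arrived rate per currency is visible.
    private final Map<String, Double> current = new HashMap<>();

    public void onRate(String currency, double rate) {
        current.put(currency, rate);
    }

    // Join an order against whatever rate has arrived so far; returns
    // null (i.e. no join match, order dropped) if no rate is known yet.
    public Double onOrder(String currency) {
        return current.get(currency);
    }
}
```

If the GBP rate arrives before the order, the join emits a result; if the order races ahead of the rate, the lookup returns null and the order produces no output. That matches the symptom in the test above, where a different subset of the 4 orders appears on each run.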
Where are the TaskManagers IPs and Ports stored?
Hi, I'm looking for the properties file where the IPs and ports of the TaskManagers are stored in Flink. Does anyone know where it is located and when it's updated? (And if no such file exists, where do the JobManager and TaskManagers get this information from?) Thanks. Chris
Partitions vs. Subpartitions
Hi, what's the difference between partitions and subpartitions? Thanks. CM