Hi Fabian,
Thanks for confirming the issue and suggesting a workaround - I'll give
that a try. I've created a JIRA issue as you suggested:
https://issues.apache.org/jira/browse/FLINK-15112
Many thanks,
Chris
------ Original Message ------
From: "Fabian Hueske" <[email protected]>
To: "Chris Miller" <[email protected]>
Cc: "[email protected]" <[email protected]>
Sent: 06/12/2019 14:52:16
Subject: Re: Joining multiple temporal tables
Hi Chris,
Your query looks OK to me.
Moreover, you should get a SQLParseException (or something similar) if
it weren't valid SQL.
Hence, I assume you are running into a bug in one of the optimizer rules.
I tried to reproduce the problem on the SQL training environment and
also couldn't get a query that joins two temporal tables to work.
What worked though was to first create a view of a query that joins the
stream with one temporal table and then join the view with the second
one.
Maybe that workaround also works for you?
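A minimal sketch of that two-step workaround, written against the table and function names from the query quoted below. The view name OrdersWithFx is hypothetical, and the sketch assumes that o.rowtime is still usable as a time attribute after the first join. It has not been verified against the failing program.

```sql
-- Step 1: join the stream with the first temporal table and expose the
-- result as a view. "OrdersWithFx" is a hypothetical name.
CREATE VIEW OrdersWithFx AS
SELECT o.id, o.productId, o.currency, o.quantity, o.rowtime, f.rate
FROM Orders AS o,
LATERAL TABLE (FxRateLookup(o.rowtime)) AS f
WHERE o.currency = f.currency;

-- Step 2: join the view with the second temporal table.
-- Assumes v.rowtime is still a time attribute at this point.
SELECT v.id AS orderId, v.productId, v.currency, v.quantity, v.rate, p.price
FROM OrdersWithFx AS v,
LATERAL TABLE (PriceLookup(v.rowtime)) AS p
WHERE v.productId = p.productId;
```

In a Table API program rather than the SQL client, the first query would probably have to be registered as a table under that name instead of using CREATE VIEW.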
It would be great if you could open a Jira issue for this bug including
your program to reproduce the bug.
Thank you,
Fabian
On Thu, 5 Dec 2019 at 16:47, Chris Miller
<[email protected]> wrote:
I want to decorate/enrich a stream by joining it with "lookups" to the
most recent data available in other streams. For example, suppose I
have a stream of product orders. For each order, I want to add price
and FX rate information based on the order's product ID and order
currency.
Is it possible to join a table with two other temporal tables to
achieve this? I'm trying but getting a NullPointerException deep
inside Flink's Calcite code. I've attached some sample code that
demonstrates the problem. Is my SQL incorrect/invalid (in which case
Flink ideally should detect the problem and provide a better error
message), or is the SQL correct and this is a bug/limitation in Flink? If
it's the latter, how do I achieve a similar result?
The SQL I'm trying to run:
SELECT o.id AS orderId, o.productId, o.currency, o.quantity, f.rate, p.price
FROM Orders AS o,
LATERAL TABLE (FxRateLookup(o.rowtime)) AS f,
LATERAL TABLE (PriceLookup(o.rowtime)) AS p
WHERE o.currency = f.currency
AND o.productId = p.productId
The exception I get:
Exception in thread "main" java.lang.NullPointerException
at org.apache.flink.table.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:129)
at org.apache.flink.table.plan.rules.logical.LogicalCorrelateToTemporalTableJoinRule.onMatch(LogicalCorrelateToTemporalTableJoinRule.scala:91)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:284)
at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
at org.apache.flink.table.plan.Optimizer.runHepPlanner(Optimizer.scala:228)
at org.apache.flink.table.plan.Optimizer.runHepPlannerSimultaneously(Optimizer.scala:212)
at org.apache.flink.table.plan.Optimizer.optimizeExpandPlan(Optimizer.scala:138)
at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:61)
at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:187)
at org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:127)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:319)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:227)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:218)
at test.PojoTest.run(PojoTest.java:96)
at test.PojoTest.main(PojoTest.java:23)