Re: Joining multiple temporal tables

2019-12-06 Thread Benoît Paris
Hi all!

I believe this is a duplicate of another JIRA issue,
https://issues.apache.org/jira/browse/FLINK-14200, where the query side
does not accept a Table, only a TableSource (or has planner rule issues).
I think that in this case the LogicalCorrelate extracted from the temporal
table join turns one join into a computed Table, which cannot then be fed
into another temporal table join.

What leads me to believe this is the stack trace. Chris Miller is using the
legacy planner, IMO (please correct me if I'm wrong), and it throws here:
<https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToTemporalTableJoinRule.scala#L91>.
In Blink the files have changed a bit, but I believe this is the equivalent
line:
<https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala#L98>,
which is *also* where FLINK-14200 goes through:
  val relBuilder = FlinkRelBuilder.of(cluster, leftNode.getTable)

Accounting for changes over time, the same lines in FlinkRelBuilder are hit
in both cases, and both problems seem to come from the same NPE:
leftNode.getTable is null when getRelOptSchema is called on it here:
  def of(cluster: RelOptCluster, relTable: RelOptTable): FlinkRelBuilder = {
    val clusterContext = cluster.getPlanner.getContext
    new FlinkRelBuilder(
      clusterContext,
      cluster,
      relTable.getRelOptSchema)
  }

The other parts of the stack traces are also very similar.
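A minimal plain-Java sketch of the failure mode described above, under stated assumptions: it does not use Flink or Calcite, and `PlanShapeSketch`, `Table`, `Node`, `ScanNode`, `ComputedNode`, `of`, and `ofChecked` are all hypothetical stand-ins. It assumes, mirroring Calcite's default `RelNode.getTable()` behaviour as I understand it, that only a scan of a registered table has a table, while a node computed by an earlier join returns null, so an unguarded dereference throws the NPE discussed above. The `ofChecked` variant only illustrates how a null check would turn this into a clearer error.

```java
// Plain-Java model of the suspected failure; NOT Flink or Calcite code.
// All types and methods below are hypothetical stand-ins.
class PlanShapeSketch {
    // Stand-in for RelOptTable: something that knows its schema.
    static final class Table {
        private final String schema;
        Table(String schema) { this.schema = schema; }
        String getSchema() { return schema; }
    }

    // Stand-in for RelNode: getTable() is null unless the node scans a table.
    interface Node {
        Table getTable();
    }

    // A scan of a registered table (e.g. the Orders source).
    static final class ScanNode implements Node {
        private final Table table;
        ScanNode(Table table) { this.table = table; }
        public Table getTable() { return table; }
    }

    // The output of an earlier join: a computed table with no underlying table.
    static final class ComputedNode implements Node {
        public Table getTable() { return null; }
    }

    // Same shape as the rule calling FlinkRelBuilder.of(cluster, leftNode.getTable):
    // the table is dereferenced without a null check.
    static String of(Node leftNode) {
        return leftNode.getTable().getSchema();
    }

    // Guarded variant: reports the unsupported plan shape instead of an NPE.
    static String ofChecked(Node leftNode) {
        Table table = leftNode.getTable();
        if (table == null) {
            throw new UnsupportedOperationException(
                "left input of the temporal join is a computed table, not a table scan");
        }
        return table.getSchema();
    }

    public static void main(String[] args) {
        System.out.println(of(new ScanNode(new Table("orders"))));
        try {
            of(new ComputedNode());
        } catch (NullPointerException e) {
            System.out.println("NullPointerException, as in the reported stack trace");
        }
        try {
            ofChecked(new ComputedNode());
        } catch (UnsupportedOperationException e) {
            System.out.println("Clearer error: " + e.getMessage());
        }
    }
}
```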

I hope this helps!
Cheers

Re: Joining multiple temporal tables

2019-12-06 Thread Kurt Young
Hi Chris,

If you are only interested in the latest data of the dimension table, you
could try the temporal table join:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#operations
(see "Join with Temporal Table").
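A rough plain-Java model of the semantics of the lookup-style temporal table join linked above, assuming (as I read the 1.9 documentation) that each incoming row is joined against the dimension table's current content at processing time. This is not Flink code; `LatestLookupSketch`, `onRateUpdate`, and `enrich` are hypothetical names, and a `HashMap` stands in for the dimension table.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of processing-time lookup-join semantics; NOT Flink code.
// All names here (LatestLookupSketch, onRateUpdate, enrich) are hypothetical.
class LatestLookupSketch {
    // Latest known FX rate per currency; each update overwrites the previous version.
    private final Map<String, Double> latestRate = new HashMap<>();

    void onRateUpdate(String currency, double rate) {
        latestRate.put(currency, rate);
    }

    // Joins one order against whatever rate is current when the order arrives.
    // Returns null when no rate exists yet (inner-join semantics: the order is dropped).
    String enrich(String orderId, String currency) {
        Double rate = latestRate.get(currency);
        return rate == null ? null : orderId + " " + currency + " rate=" + rate;
    }

    public static void main(String[] args) {
        LatestLookupSketch join = new LatestLookupSketch();
        join.onRateUpdate("EUR", 1.10);
        System.out.println(join.enrich("o1", "EUR")); // joined with the first EUR rate
        join.onRateUpdate("EUR", 1.12);
        System.out.println(join.enrich("o2", "EUR")); // joined with the updated EUR rate
        System.out.println(join.enrich("o3", "GBP")); // no GBP rate yet, so null
    }
}
```

Each order only sees rates that arrived before it was processed, so replaying the same orders later could give different results. Versioning by the order's event time, as the `LATERAL TABLE (FxRateLookup(o.rowtime))` functions in the original query do, would instead require keeping the history of rates.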

Best,
Kurt


Re: Joining multiple temporal tables

2019-12-06 Thread Fabian Hueske
Thank you!
Please let me know if the workaround works for you.

Best, Fabian

Re: Joining multiple temporal tables

2019-12-06 Thread Chris Miller

Hi Fabian,

Thanks for confirming the issue and suggesting a workaround; I'll give
that a try. I've created a JIRA issue as you suggested:
https://issues.apache.org/jira/browse/FLINK-15112


Many thanks,
Chris


Re: Joining multiple temporal tables

2019-12-06 Thread Fabian Hueske
Hi Chris,

Your query looks OK to me.
Moreover, if it weren't valid SQL, you should get a SQLParseException (or
something similar).

Hence, I assume you are running into a bug in one of the optimizer rules.
I tried to reproduce the problem in the SQL training environment and
couldn't write a query that joins two temporal tables.
What did work, though, was to first create a view from a query that joins
the stream with one temporal table and then join that view with the second
one. Maybe that workaround also works for you?
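The workaround can be modelled in plain Java as two chained steps, each joining with a single temporal table, where the first step's output plays the role of the view. This is a sketch under assumptions, not Flink code: it models event-time temporal lookups as "latest version whose timestamp is at or before the order's rowtime" (via `TreeMap.floorEntry`), uses inner-join semantics, and all names (`TwoStepEnrichmentSketch`, `Versioned`, `asOf`, `joinFx`, `joinPrice`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the two-step (view-based) workaround; NOT Flink code.
// All names are hypothetical.
class TwoStepEnrichmentSketch {
    // A versioned table: key -> (validFrom timestamp -> value).
    static final class Versioned {
        private final Map<String, TreeMap<Long, Double>> versions = new HashMap<>();

        void put(String key, long validFrom, double value) {
            versions.computeIfAbsent(key, k -> new TreeMap<>()).put(validFrom, value);
        }

        // Value valid at time t: the latest version with validFrom <= t, or null if none.
        Double asOf(String key, long t) {
            TreeMap<Long, Double> history = versions.get(key);
            if (history == null) return null;
            Map.Entry<Long, Double> e = history.floorEntry(t);
            return e == null ? null : e.getValue();
        }
    }

    static final class Order {
        final String id, productId, currency;
        final long rowtime;
        Order(String id, String productId, String currency, long rowtime) {
            this.id = id; this.productId = productId; this.currency = currency; this.rowtime = rowtime;
        }
    }

    // Output of step one, i.e. the rows of the intermediate view.
    static final class OrderWithFx {
        final Order order;
        final double rate;
        OrderWithFx(Order order, double rate) { this.order = order; this.rate = rate; }
    }

    // Step one: join the order stream with the first temporal table (FX rates).
    static List<OrderWithFx> joinFx(List<Order> orders, Versioned fx) {
        List<OrderWithFx> view = new ArrayList<>();
        for (Order o : orders) {
            Double rate = fx.asOf(o.currency, o.rowtime);
            if (rate != null) view.add(new OrderWithFx(o, rate));
        }
        return view;
    }

    // Step two: join the intermediate view with the second temporal table (prices).
    static List<String> joinPrice(List<OrderWithFx> view, Versioned prices) {
        List<String> out = new ArrayList<>();
        for (OrderWithFx v : view) {
            Double price = prices.asOf(v.order.productId, v.order.rowtime);
            if (price != null) out.add(v.order.id + " rate=" + v.rate + " price=" + price);
        }
        return out;
    }

    public static void main(String[] args) {
        Versioned fx = new Versioned();
        fx.put("EUR", 0L, 1.10);
        fx.put("EUR", 100L, 1.12);
        Versioned prices = new Versioned();
        prices.put("p1", 0L, 9.5);
        List<Order> orders = List.of(
            new Order("o1", "p1", "EUR", 50L),
            new Order("o2", "p1", "EUR", 150L));
        joinPrice(joinFx(orders, fx), prices).forEach(System.out::println);
    }
}
```

In Flink the same split would be expressed as a view over the first join plus a second query over that view; the Java code only illustrates the data flow, not the planner's behaviour.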

It would be great if you could open a Jira issue for this bug and include
your program to reproduce it.

Thank you,
Fabian


Joining multiple temporal tables

2019-12-05 Thread Chris Miller
I want to decorate/enrich a stream by joining it with "lookups" to the 
most recent data available in other streams. For example, suppose I have 
a stream of product orders. For each order, I want to add price and FX 
rate information based on the order's product ID and order currency.


Is it possible to join a table with two other temporal tables to achieve
this? I'm trying, but I'm getting a NullPointerException deep inside
Flink's Calcite code. I've attached some sample code that demonstrates the
problem. Is my SQL incorrect/invalid (in which case Flink ideally should
detect the problem and provide a better error message), or is the SQL
correct and this is a bug/limitation in Flink? If it's the latter, how do
I achieve a similar result?


The SQL I'm trying to run:

SELECT o.id AS orderId, o.productId, o.currency, o.quantity, f.rate, p.price
  FROM Orders AS o,
  LATERAL TABLE (FxRateLookup(o.rowtime)) AS f,
  LATERAL TABLE (PriceLookup(o.rowtime)) AS p
  WHERE o.currency = f.currency
  AND o.productId = p.productId
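To make the intended semantics of this query concrete: as I understand Flink's temporal table functions, `FxRateLookup(o.rowtime)` yields the FX-rate table as it was at the order's rowtime, and the predicate `o.currency = f.currency` then picks the matching row (likewise for `PriceLookup` and `o.productId`). The plain-Java sketch below models one such lookup; it is not Flink code, and `TemporalSnapshotSketch`, `update`, and `snapshotAt` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of what a temporal table function such as FxRateLookup(t)
// is meant to return: the table as it looked at time t. NOT Flink code;
// all names are hypothetical.
class TemporalSnapshotSketch {
    // currency -> (update time -> rate)
    private final Map<String, TreeMap<Long, Double>> history = new HashMap<>();

    void update(String currency, long time, double rate) {
        history.computeIfAbsent(currency, c -> new TreeMap<>()).put(time, rate);
    }

    // Snapshot at time t: for every key, the latest version whose time is <= t.
    Map<String, Double> snapshotAt(long t) {
        Map<String, Double> snapshot = new HashMap<>();
        for (Map.Entry<String, TreeMap<Long, Double>> e : history.entrySet()) {
            Map.Entry<Long, Double> version = e.getValue().floorEntry(t);
            if (version != null) {
                snapshot.put(e.getKey(), version.getValue());
            }
        }
        return snapshot;
    }

    public static void main(String[] args) {
        TemporalSnapshotSketch fxRates = new TemporalSnapshotSketch();
        fxRates.update("EUR", 10L, 1.10);
        fxRates.update("EUR", 20L, 1.12);
        fxRates.update("GBP", 15L, 1.30);
        // For an EUR order with rowtime 17, the WHERE predicate
        // o.currency = f.currency selects one row of this snapshot.
        Map<String, Double> f = fxRates.snapshotAt(17L);
        System.out.println("EUR rate at 17: " + f.get("EUR"));
    }
}
```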

The exception I get:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.flink.table.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:129)
    at org.apache.flink.table.plan.rules.logical.LogicalCorrelateToTemporalTableJoinRule.onMatch(LogicalCorrelateToTemporalTableJoinRule.scala:91)
    at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
    at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
    at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
    at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:284)
    at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
    at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
    at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
    at org.apache.flink.table.plan.Optimizer.runHepPlanner(Optimizer.scala:228)
    at org.apache.flink.table.plan.Optimizer.runHepPlannerSimultaneously(Optimizer.scala:212)
    at org.apache.flink.table.plan.Optimizer.optimizeExpandPlan(Optimizer.scala:138)
    at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:61)
    at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
    at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:187)
    at org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:127)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:319)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:227)
    at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:218)
    at test.PojoTest.run(PojoTest.java:96)
    at test.PojoTest.main(PojoTest.java:23)

Attachment: FlinkTest.java (binary data)