Re: Joining multiple temporal tables

2019-12-06 Thread Chris Miller

Hi Fabian,

Thanks for confirming the issue and suggesting a workaround - I'll give 
that a try. I've created a JIRA issue as you suggested, 
https://issues.apache.org/jira/browse/FLINK-15112


Many thanks,
Chris


-- Original Message --
From: "Fabian Hueske" 
To: "Chris Miller" 
Cc: "user@flink.apache.org" 
Sent: 06/12/2019 14:52:16
Subject: Re: Joining multiple temporal tables


Hi Chris,

Your query looks OK to me.
Moreover, you would get a SQLParseException (or something similar) if it weren't valid SQL.


Hence, I assume you are running into a bug in one of the optimizer rules.
I tried to reproduce the problem in the SQL training environment and likewise couldn't write a query that joins two temporal tables.
What did work was to first create a view from a query that joins the stream with one temporal table, and then join that view with the second one.

Maybe that workaround also works for you?
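In code, the workaround might look roughly like this (a sketch reusing the names from your query, untested; whether o.rowtime keeps its time attribute through the first join may depend on the planner):

// First join: enrich orders with the FX rate via one temporal table function,
// then register the result so it can be referenced like a view.
Table ordersWithRate = tableEnv.sqlQuery(
    "SELECT o.id, o.productId, o.currency, o.quantity, o.rowtime, f.rate " +
    "FROM Orders AS o, LATERAL TABLE (FxRateLookup(o.rowtime)) AS f " +
    "WHERE o.currency = f.currency");
tableEnv.registerTable("OrdersWithRate", ordersWithRate);

// Second join: enrich the view with the price via the other temporal table function.
Table result = tableEnv.sqlQuery(
    "SELECT v.id AS orderId, v.productId, v.currency, v.quantity, v.rate, p.price " +
    "FROM OrdersWithRate AS v, LATERAL TABLE (PriceLookup(v.rowtime)) AS p " +
    "WHERE v.productId = p.productId");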

It would be great if you could open a Jira issue for this bug, including your program to reproduce it.


Thank you,
Fabian

On Thu, 5 Dec 2019 at 16:47, Chris Miller wrote:
I want to decorate/enrich a stream by joining it with "lookups" to the 
most recent data available in other streams. For example, suppose I 
have a stream of product orders. For each order, I want to add price 
and FX rate information based on the order's product ID and order 
currency.


Is it possible to join a table with two other temporal tables to 
achieve this? I'm trying but getting a NullPointerException deep 
inside Flink's Calcite code. I've attached some sample code that 
demonstrates the problem. Is my SQL incorrect/invalid (in which case 
Flink ideally should detect the problem and provide a better error 
message), or is the SQL correct and this a bug/limitation in Flink? If 
it's the latter, how do I achieve a similar result?


The SQL I'm trying to run:
SELECT o.id AS orderId, o.productId, o.currency, o.quantity, f.rate, p.price
FROM Orders AS o,
LATERAL TABLE (FxRateLookup(o.rowtime)) AS f,
LATERAL TABLE (PriceLookup(o.rowtime)) AS p
WHERE o.currency = f.currency
AND o.productId = p.productId
The exception I get:

Exception in thread "main" java.lang.NullPointerException
at org.apache.flink.table.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:129)
at org.apache.flink.table.plan.rules.logical.LogicalCorrelateToTemporalTableJoinRule.onMatch(LogicalCorrelateToTemporalTableJoinRule.scala:91)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:284)
at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
at org.apache.flink.table.plan.Optimizer.runHepPlanner(Optimizer.scala:228)
at org.apache.flink.table.plan.Optimizer.runHepPlannerSimultaneously(Optimizer.scala:212)
at org.apache.flink.table.plan.Optimizer.optimizeExpandPlan(Optimizer.scala:138)
at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:61)
at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:187)
at org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:127)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:319)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:227)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:218)
at test.PojoTest.run(PojoTest.java:96)
at test.PojoTest.main(PojoTest.java:23)

Joining multiple temporal tables

2019-12-05 Thread Chris Miller
I want to decorate/enrich a stream by joining it with "lookups" to the 
most recent data available in other streams. For example, suppose I have 
a stream of product orders. For each order, I want to add price and FX 
rate information based on the order's product ID and order currency.


Is it possible to join a table with two other temporal tables to achieve 
this? I'm trying but getting a NullPointerException deep inside Flink's 
Calcite code. I've attached some sample code that demonstrates the 
problem. Is my SQL incorrect/invalid (in which case Flink ideally should 
detect the problem and provide a better error message), or is the SQL 
correct and this a bug/limitation in Flink? If it's the latter, how do I 
achieve a similar result?


The SQL I'm trying to run:
SELECT o.id AS orderId, o.productId, o.currency, o.quantity, f.rate, p.price
FROM Orders AS o,
LATERAL TABLE (FxRateLookup(o.rowtime)) AS f,
LATERAL TABLE (PriceLookup(o.rowtime)) AS p
WHERE o.currency = f.currency
AND o.productId = p.productId
The exception I get:

Exception in thread "main" java.lang.NullPointerException
at org.apache.flink.table.calcite.FlinkRelBuilder$.of(FlinkRelBuilder.scala:129)
at org.apache.flink.table.plan.rules.logical.LogicalCorrelateToTemporalTableJoinRule.onMatch(LogicalCorrelateToTemporalTableJoinRule.scala:91)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:284)
at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
at org.apache.flink.table.plan.Optimizer.runHepPlanner(Optimizer.scala:228)
at org.apache.flink.table.plan.Optimizer.runHepPlannerSimultaneously(Optimizer.scala:212)
at org.apache.flink.table.plan.Optimizer.optimizeExpandPlan(Optimizer.scala:138)
at org.apache.flink.table.plan.StreamOptimizer.optimize(StreamOptimizer.scala:61)
at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:410)
at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:187)
at org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:127)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:127)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:319)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:227)
at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:218)
at test.PojoTest.run(PojoTest.java:96)
at test.PojoTest.main(PojoTest.java:23)

FlinkTest.java
Description: Binary data


Re: Need help using AggregateFunction instead of FoldFunction

2019-12-05 Thread Chris Miller
I hit the same problem; as far as I can tell it should be fixed in 
Pulsar 2.4.2. The release has already passed voting, so I hope it will 
be available in a day or two.


https://github.com/apache/pulsar/pull/5068
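
For reference, when the non-serializable object lives in your own function rather than inside a connector, the usual fix is to mark the field transient and create it in open() so it never goes through Flink's closure serialization. A rough sketch (SomeClient and its methods are hypothetical):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class EnrichFunction extends RichMapFunction<String, String> {
  private transient SomeClient client;  // transient: never serialized with the job graph

  @Override
  public void open(Configuration parameters) {
    client = new SomeClient();  // hypothetical non-serializable client, built where the task runs
  }

  @Override
  public String map(String value) {
    return client.enrich(value);  // hypothetical call
  }
}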


-- Original Message --
From: "devinbost" 
To: user@flink.apache.org
Sent: 05/12/2019 04:35:05
Subject: Re: Need help using AggregateFunction instead of FoldFunction


It turns out that the exception that I was getting is actually related to
Pulsar since I'm using the Pulsar Flink connector. I found the exact issue
reported here: https://github.com/apache/pulsar/issues/4721


devinbost wrote

 I was able to make more progress (based on the documentation you provided), but now I'm getting this exception:

 org.apache.pulsar.client.impl.DefaultBatcherBuilder@3b5fad2d is not
 serializable. The object probably contains or references non serializable fields.
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)






--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Accessing fields in a POJO stream

2019-12-04 Thread Chris Miller
Thank you, adding the missing constructor has done the trick! (FWIW: my 
'real' code is in Kotlin and I had a data class with no @JvmOverloads or 
empty secondary constructor).


I haven't seen the fx.get('currency') field access syntax anywhere in 
the documentation; do you happen to know where I can read about it? 
The only thing I found was 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/api_concepts.html#define-keys-using-field-expressions 
which is why I was trying the "fx.currency" style syntax instead.


Many thanks for your help,
Chris


-- Original Message --
From: "Jingsong Lee" 
To: "Chris Miller" 
Cc: "user" 
Sent: 04/12/2019 03:41:05
Subject: Re: Add time attribute column to POJO stream


Hi Chris,

First, FxRate is not a POJO: a POJO must have a constructor without 
arguments. With that in place, you can read from a POJO DataStream 
directly.


Second, if you want to get a field out of the POJO, use the get function, 
e.g. fx.get('currency'). If you have a nested POJO field, you can use the 
same approach to access its fields.
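
For illustration, a minimal sketch of FxRate as a valid POJO (only the no-argument constructor is added; the fields are from your code):

public static class FxRate {
  public String currency;
  public double rate;

  public FxRate() {
    // no-argument constructor required by Flink's POJO type extraction
  }

  public FxRate(String currency, double rate) {
    this.currency = currency;
    this.rate = rate;
  }
}

With that in place, tableEnv.fromDataStream(pojoStream, "currency, rate") should be able to resolve the fields directly.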


Best,
Jingsong Lee


On Wed, Dec 4, 2019 at 12:33 AM Chris Miller wrote:
I'm having trouble dealing with a DataStream of POJOs. In particular, 
when I perform SQL operations on it I can't figure out the syntax for 
referring to individual fields within the POJO.


Below is an example that illustrates the problem and the various 
approaches I've tried. Can anyone please point me in the right 
direction?


import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class PojoTest {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

    // Using tuples, this works as expected
    List<Tuple2<String, Double>> tupleData = Arrays.asList(
        new Tuple2<>("USD", 1.0),
        new Tuple2<>("GBP", 1.3),
        new Tuple2<>("EUR", 1.11));
    DataStreamSource<Tuple2<String, Double>> tupleStream = streamEnv.fromCollection(tupleData);
    tableEnv.fromDataStream(tupleStream, "currency, rate").printSchema();

    // Using a DataStream of POJOs, how do I obtain an equivalent table to the above?
    List<FxRate> pojoData = Arrays.asList(
        new FxRate("USD", 1.0),
        new FxRate("GBP", 1.3),
        new FxRate("EUR", 1.11));
    DataStreamSource<FxRate> pojoStream = streamEnv.fromCollection(pojoData);

    Table pojoTable = tableEnv.fromDataStream(pojoStream, "fx");
    pojoTable.printSchema();

    // This fails with "ValidationException: Cannot resolve field [currency], input field list:[fx]"
    pojoTable.select("currency, rate").printSchema();

    // This fails with "ValidationException: Undefined function: currency"
    pojoTable.select("fx.currency AS currency, fx.rate AS rate").printSchema();

    // This fails with "ValidationException: Too many fields referenced from an atomic type"
    tableEnv.fromDataStream(pojoStream, "currency, rate").printSchema();

    // This fails with "ValidationException: Field reference expression expected"
    tableEnv.fromDataStream(pojoStream, "fx.currency, fx.rate").printSchema();

    streamEnv.execute();
  }

  public static class FxRate {
    public String currency;
    public double rate;

    public FxRate(String currency, double rate) {
      this.currency = currency;
      this.rate = rate;
    }

    @Override
    public String toString() {
      return "FxRate{currency='" + currency + '\'' + ", rate=" + rate + '}';
    }
  }
}



--
Best, Jingsong Lee

Add time attribute column to POJO stream

2019-12-03 Thread Chris Miller
I'm having trouble dealing with a DataStream of POJOs. In particular, 
when I perform SQL operations on it I can't figure out the syntax for 
referring to individual fields within the POJO.


Below is an example that illustrates the problem and the various 
approaches I've tried. Can anyone please point me in the right 
direction?


import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class PojoTest {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

    // Using tuples, this works as expected
    List<Tuple2<String, Double>> tupleData = Arrays.asList(
        new Tuple2<>("USD", 1.0),
        new Tuple2<>("GBP", 1.3),
        new Tuple2<>("EUR", 1.11));
    DataStreamSource<Tuple2<String, Double>> tupleStream = streamEnv.fromCollection(tupleData);
    tableEnv.fromDataStream(tupleStream, "currency, rate").printSchema();

    // Using a DataStream of POJOs, how do I obtain an equivalent table to the above?
    List<FxRate> pojoData = Arrays.asList(
        new FxRate("USD", 1.0),
        new FxRate("GBP", 1.3),
        new FxRate("EUR", 1.11));
    DataStreamSource<FxRate> pojoStream = streamEnv.fromCollection(pojoData);

    Table pojoTable = tableEnv.fromDataStream(pojoStream, "fx");
    pojoTable.printSchema();

    // This fails with "ValidationException: Cannot resolve field [currency], input field list:[fx]"
    pojoTable.select("currency, rate").printSchema();

    // This fails with "ValidationException: Undefined function: currency"
    pojoTable.select("fx.currency AS currency, fx.rate AS rate").printSchema();

    // This fails with "ValidationException: Too many fields referenced from an atomic type"
    tableEnv.fromDataStream(pojoStream, "currency, rate").printSchema();

    // This fails with "ValidationException: Field reference expression expected"
    tableEnv.fromDataStream(pojoStream, "fx.currency, fx.rate").printSchema();

    streamEnv.execute();
  }

  public static class FxRate {
    public String currency;
    public double rate;

    public FxRate(String currency, double rate) {
      this.currency = currency;
      this.rate = rate;
    }

    @Override
    public String toString() {
      return "FxRate{currency='" + currency + '\'' + ", rate=" + rate + '}';
    }
  }
}

Re: low performance in running queries

2019-10-30 Thread Chris Miller
I haven't run any benchmarks with Flink or even used it enough to 
directly help with your question; however, I suspect that the following 
article might be relevant:


http://dsrg.pdos.csail.mit.edu/2016/06/26/scalability-cost/

Given that the computation you're performing is trivial, it's possible that 
the additional overhead of serialisation, interprocess communication, 
state management etc. that distributed systems like Flink require is 
dominating the runtime here. Two hours (or even 25 minutes) still seems 
too long to me, however, so hopefully it really is just a configuration 
issue of some sort. Either way, if you do figure this out, or if anyone with 
good knowledge of the above article as it relates to Flink can give 
their thoughts, I'd be very interested in hearing more.


Regards,
Chris


-- Original Message --
From: "Habib Mostafaei" 
To: "Zhenghua Gao" 
Cc: "user" ; "Georgios Smaragdakis" 
; "Niklas Semmler" 


Sent: 30/10/2019 12:25:28
Subject: Re: low performance in running queries

Thanks Gao for the reply. I used the parallelism parameter with 
different values like 6 and 8, but the execution time is still not 
comparable with a single-threaded Python script. What would be a 
reasonable value for the parallelism?


Best,

Habib

On 10/30/2019 1:17 PM, Zhenghua Gao wrote:
The reason might be that the parallelism of your task is only 1, which is 
too low.
See [1] for how to specify a proper parallelism for your job; the execution 
time should then be reduced significantly.


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/parallel.html
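
For example, a minimal sketch of both options (the WordCount-style pipeline and the Tokenizer function are hypothetical placeholders):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(8);             // default parallelism for every operator in the job

env.readTextFile("input.txt")      // hypothetical WordCount pipeline
   .flatMap(new Tokenizer())       // Tokenizer: your own line-splitting FlatMapFunction
   .keyBy(0)
   .sum(1)
   .setParallelism(8);             // or override the setting per operator

The same default can also be set cluster-wide with parallelism.default in flink-conf.yaml, or per submission with bin/flink run -p 8.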


Best Regards,
Zhenghua Gao


On Tue, Oct 29, 2019 at 9:27 PM Habib Mostafaei wrote:

Hi all,

I am running Flink on a standalone cluster and getting very long 
execution times for streaming queries like WordCount on a fixed text 
file. My VM runs Debian 10 with 16 CPU cores and 32GB of RAM. I have a 
text file with a size of 2GB. When I run Flink on a standalone cluster, 
i.e., one JobManager and one TaskManager with 25GB of heap size, it took 
around two hours to finish counting this file, while a simple Python 
script can do it in around 7 minutes. I'm just wondering what is wrong 
with my setup. I ran the experiments on a cluster with six TaskManagers, 
but I still get very long execution times, 25 minutes or so. I tried to 
increase the JVM heap size to get a lower execution time, but it did not 
help. I attached the log file and the Flink configuration file to this 
email.

Best,

Habib


--
Habib Mostafaei, Ph.D.
Postdoctoral researcher
TU Berlin,
FG INET, MAR 4.003
Marchstraße 23, 10587 Berlin

Re: AssertionError: mismatched type $5 TIMESTAMP(3)

2019-02-06 Thread Chris Miller

Hi Timo,

Thanks for the pointers, bug reports and slides, much appreciated. I'll 
read up to get a better understanding of the issue and hopefully figure 
out a more appropriate solution for what I'm trying to achieve. I'll 
report back if I come up with anything that others might find useful.


Regards,
Chris


-- Original Message --
From: "Timo Walther" 
To: user@flink.apache.org
Sent: 06/02/2019 16:45:26
Subject: Re: AssertionError: mismatched type $5 TIMESTAMP(3)


Hi Chris,

the error that you've observed is a bug that might be related to another bug 
that is not easily solvable.

I created an issue for it nevertheless: 
https://issues.apache.org/jira/browse/FLINK-11543

In general, I think you need to adapt your program in any case. Because you are 
aggregating on a rowtime attribute, it will lose its time attribute property 
and become a regular timestamp. Thus, you can't use it for a temporal table 
join.

Maybe the following training from the last Flink Forward conference can help you. I 
would recommend the slide set there to understand the difference between streaming 
operations and what we call "materializing" operations:

https://github.com/dataArtisans/sql-training/wiki/SQL-on-streams

I hope this helps. Feel free to ask further questions.

Regards,
Timo

On 05.02.19 at 11:30, Chris Miller wrote:

Exception in thread "main" java.lang.AssertionError: mismatched type $5 TIMESTAMP(3)
at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2481)
at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2459)
at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:151)
at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:100)
at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:34)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:279)
at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:241)
at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:259)
at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1605)
at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:230)
at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:344)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:374)
at org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:340)
at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:272)
at test.Test.main(Test.java:78)







Re: AssertionError: mismatched type $5 TIMESTAMP(3)

2019-02-05 Thread Chris Miller
Sorry to reply to my own post but I wasn't able to figure out a solution 
for this. Does anyone have any suggestions I could try?



-- Original Message --
From: "Chris Miller" 
To: "Timo Walther" ; "user" 
Sent: 29/01/2019 10:06:47
Subject: Re: AssertionError: mismatched type $5 TIMESTAMP(3)

Thanks Timo, I didn't realise supplying Row could automatically apply 
the correct types. In this case your suggestion doesn't solve the 
problem though; I still get the exact same error. I assume that's 
because there isn't a time attribute type on the tradesByInstr table 
itself, but rather on the groupedTrades table that it joins with.


System.out.println(tradesByInstr.getSchema().toRowType()) outputs:
->  Row(InstrumentId: Integer, Name: String, ClosePrice: Double, 
TradeCount: Long, Quantity: Double, Cost: Double)


System.out.println(groupedTrades.getSchema().toRowType()) outputs:
->  Row(t_InstrumentId: Integer, t_CounterpartyId: Integer, TradeCount: 
Long, Quantity: Double, Cost: Double, LastTrade_EventTime: 
TimeIndicatorTypeInfo(rowtime))


Looking at the stack trace it seems the query optimiser is tripping up 
on the LastTrade_EventTime column, but that is required for the 
temporal table join.


Any other ideas on how I can work around this problem?

Many thanks,
Chris

-- Original Message --
From: "Timo Walther" 
To: "Chris Miller" ; "user" 
Sent: 29/01/2019 09:44:14
Subject: Re: AssertionError: mismatched type $5 TIMESTAMP(3)


Hi Chris,

the exception message is a bit misleading. The time attribute (time 
indicator) type is an internal type and should not be used by users.


The following line should solve your issue. Instead of:

DataStream<Tuple2<Boolean, Row>> tradesByInstrStream = tableEnv.toRetractStream(tradesByInstr, typeInfo);


You can do:

DataStream<Tuple2<Boolean, Row>> tradesByInstrStream = tableEnv.toRetractStream(tradesByInstr, Row.class);


The API will automatically insert the right types for the table passed 
when using a plain `Row.class`.


I hope this helps.

Regards,
Timo



On 25.01.19 at 20:14, Chris Miller wrote:
I'm trying to group some data and then enrich it by joining with a 
temporal table function, however my test code (attached) is failing 
with the error shown below. Can someone please give me a clue as to 
what I'm doing wrong?


Exception in thread "main" java.lang.AssertionError: mismatched type $5 TIMESTAMP(3)
at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2481)
at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2459)
at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:151)
at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:100)
at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:34)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:279)
at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:241)
at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:259)
at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1605)
at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:230)
at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:344)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:374)
at org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:340)
at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:272)
at test.Test.main(Test.java:78)




Re: AssertionError: mismatched type $5 TIMESTAMP(3)

2019-01-29 Thread Chris Miller
Thanks Timo, I didn't realise supplying Row could automatically apply 
the correct types. In this case your suggestion doesn't solve the 
problem though; I still get the exact same error. I assume that's 
because there isn't a time attribute type on the tradesByInstr table 
itself, but rather on the groupedTrades table that it joins with.


System.out.println(tradesByInstr.getSchema().toRowType()) outputs:
->  Row(InstrumentId: Integer, Name: String, ClosePrice: Double, 
TradeCount: Long, Quantity: Double, Cost: Double)


System.out.println(groupedTrades.getSchema().toRowType()) outputs:
->  Row(t_InstrumentId: Integer, t_CounterpartyId: Integer, TradeCount: 
Long, Quantity: Double, Cost: Double, LastTrade_EventTime: 
TimeIndicatorTypeInfo(rowtime))


Looking at the stack trace it seems the query optimiser is tripping up 
on the LastTrade_EventTime column, but that is required for the temporal 
table join.


Any other ideas on how I can work around this problem?

Many thanks,
Chris

-- Original Message --
From: "Timo Walther" 
To: "Chris Miller" ; "user" 
Sent: 29/01/2019 09:44:14
Subject: Re: AssertionError: mismatched type $5 TIMESTAMP(3)


Hi Chris,

the exception message is a bit misleading. The time attribute (time 
indicator) type is an internal type and should not be used by users.


The following line should solve your issue. Instead of:

DataStream<Tuple2<Boolean, Row>> tradesByInstrStream = tableEnv.toRetractStream(tradesByInstr, typeInfo);


You can do:

DataStream<Tuple2<Boolean, Row>> tradesByInstrStream = tableEnv.toRetractStream(tradesByInstr, Row.class);


The API will automatically insert the right types for the table passed 
when using a plain `Row.class`.


I hope this helps.

Regards,
Timo



On 25.01.19 at 20:14, Chris Miller wrote:
I'm trying to group some data and then enrich it by joining with a 
temporal table function, however my test code (attached) is failing 
with the error shown below. Can someone please give me a clue as to 
what I'm doing wrong?


Exception in thread "main" java.lang.AssertionError: mismatched type $5 TIMESTAMP(3)
at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2481)
at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2459)
at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:151)
at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:100)
at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:34)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:279)
at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:241)
at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:259)
at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1605)
at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:230)
at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:344)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:374)
at org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:340)
at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:272)
at test.Test.main(Test.java:78)




AssertionError: mismatched type $5 TIMESTAMP(3)

2019-01-25 Thread Chris Miller
I'm trying to group some data and then enrich it by joining with a 
temporal table function, however my test code (attached) is failing with 
the error shown below. Can someone please give me a clue as to what I'm 
doing wrong?


Exception in thread "main" java.lang.AssertionError: mismatched type $5 TIMESTAMP(3)
at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2481)
at org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2459)
at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:151)
at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:100)
at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:34)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:279)
at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:241)
at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:259)
at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1605)
at org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:230)
at org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:344)
at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:374)
at org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:340)
at org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:272)
at test.Test.main(Test.java:78)

Test.java
Description: Binary data


Re: Temporal tables not behaving as expected

2019-01-22 Thread Chris Miller

Hi Fabian,

I was investigating this further today and was just coming to the same 
conclusion! Thanks for the confirmation, I'll make the suggested changes 
and see where that gets me. Originally I had assumed processing time for 
the rates table would be set when the rates are first produced by 
DelayedSource, but I guess that concept is closer to ingestion time than 
processing time. My takeaway from this is that it's never a good idea to 
be comparing two different processing time fields since it's not clear 
which of the two gets assigned first.


Regards,
Chris

-- Original Message --
From: "Fabian Hueske" 
To: "Chris Miller" 
Cc: "user" 
Sent: 22/01/2019 11:23:23
Subject: Re: Temporal tables not behaving as expected


Hi,

The problem is that you are using processing time which is 
non-deterministic.
Both inputs are consumed at the same time and joined based on which 
record arrived first. The result depends on a race condition.


If you change the input table to have event time attributes and use 
these to register the time-versioned table, the results should become 
stable.
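
For illustration, a rough sketch of the event time variant (not from your code: it assumes a hypothetical rateTsStream whose records carry their event timestamp as a third Long field, and that env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) has been set):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<Tuple3<String, Double, Long>> ratesWithTs = rateTsStream
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Tuple3<String, Double, Long>>(Time.seconds(5)) {
          @Override
          public long extractTimestamp(Tuple3<String, Double, Long> rate) {
            return rate.f2;  // the event timestamp carried in the record
          }
        });

// ".rowtime" declares an event time attribute derived from the stream's timestamps
Table rateHistory = tableEnv.fromDataStream(ratesWithTs, "Currency, Rate, FxRates_RowTime.rowtime");
TemporalTableFunction rates = rateHistory.createTemporalTableFunction("FxRates_RowTime", "Currency");
tableEnv.registerFunction("FxRates", rates);

The order stream would need the same treatment, with its rowtime attribute used as the argument to FxRates(...).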


Best, Fabian

On Mon, 21 Jan 2019 at 15:45, Chris Miller wrote:

Hi all,

I'm new to Flink so am probably missing something simple. I'm using 
Flink 1.7.1 and am trying to use temporal table functions, but I'm not 
getting the results I expect. With the example code below, I would 
expect 4 records to be output (one for each order), but instead I'm 
only seeing a (random) subset of these records (it varies on each 
run). To compound my confusion further, the CSV output often shows a 
different subset of results than those written to the console. I 
assume there's a race condition of some sort but I can't figure out 
where it is. Any ideas what I'm doing wrong?



import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.types.Row;

public class Test {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    List<Tuple2<String, Double>> rateData = Arrays.asList(
        new Tuple2<>("GBP", 1.29),
        new Tuple2<>("EUR", 1.14),
        new Tuple2<>("EUR", 1.15),
        new Tuple2<>("GBP", 1.30));
    DataStreamSource<Tuple2<String, Double>> rateStream = env.addSource(new DelayedSource<>(rateData, 1L));
    rateStream.returns(new TypeHint<Tuple2<String, Double>>() {});

    Table rateHistory = tableEnv.fromDataStream(rateStream, "Currency, Rate, FxRates_ProcTime.proctime");
    TemporalTableFunction rates = rateHistory.createTemporalTableFunction("FxRates_ProcTime", "Currency");
    tableEnv.registerFunction("FxRates", rates);

    List<Tuple3<Integer, String, Double>> orderData = Arrays.asList(
        new Tuple3<>(1, "GBP", 4.51),
        new Tuple3<>(2, "GBP", 23.68),
        new Tuple3<>(3, "EUR", 2.99),
        new Tuple3<>(4, "EUR", 14.76));
    DataStreamSource<Tuple3<Integer, String, Double>> orderStream = env.addSource(new DelayedSource<>(orderData, 100L));
    orderStream.returns(new TypeHint<Tuple3<Integer, String, Double>>() {});

    Table orders = tableEnv.fromDataStream(orderStream, "OrderId, o_Currency, Amount, Order_ProcTime.proctime");
    Table usdOrders = orders.join(new Table(tableEnv, "FxRates(Order_ProcTime)"), "o_Currency = Currency")
        .select("OrderId, Amount, Currency, Rate, (Amount * Rate) as UsdAmount");

    String[] fields = usdOrders.getSchema().getFieldNames();
    TypeInformation<?>[] types = usdOrders.getSchema().getFieldTypes();
    DataStream<Row> usdStream = tableEnv.toAppendStream(usdOrders, usdOrders.getSchema().toRowType());
    CsvTableSink csvTableSink = new CsvTableSink("C:\\tmp\\test.csv", ",", 1, FileSystem.WriteMode.OVERWRITE);
    tableEnv.registerTableSink("csvSink", fields, types, csvTableSink);

Temporal tables not behaving as expected

2019-01-21 Thread Chris Miller

Hi all,

I'm new to Flink so am probably missing something simple. I'm using 
Flink 1.7.1 and am trying to use temporal table functions, but I'm not 
getting the results I expect. With the example code below, I would expect 
4 records to be output (one for each order), but instead I'm only seeing 
a (random) subset of these records (it varies on each run). To compound 
my confusion further, the CSV output often shows a different subset of 
results than those written to the console. I assume there's a race 
condition of some sort but I can't figure out where it is. Any ideas 
what I'm doing wrong?



import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.types.Row;

public class Test {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

    List<Tuple2<String, Double>> rateData = Arrays.asList(
        new Tuple2<>("GBP", 1.29),
        new Tuple2<>("EUR", 1.14),
        new Tuple2<>("EUR", 1.15),
        new Tuple2<>("GBP", 1.30));
    DataStreamSource<Tuple2<String, Double>> rateStream = env.addSource(new DelayedSource<>(rateData, 1L));
    rateStream.returns(new TypeHint<Tuple2<String, Double>>() {});

    Table rateHistory = tableEnv.fromDataStream(rateStream, "Currency, Rate, FxRates_ProcTime.proctime");
    TemporalTableFunction rates = rateHistory.createTemporalTableFunction("FxRates_ProcTime", "Currency");
    tableEnv.registerFunction("FxRates", rates);

    List<Tuple3<Integer, String, Double>> orderData = Arrays.asList(
        new Tuple3<>(1, "GBP", 4.51),
        new Tuple3<>(2, "GBP", 23.68),
        new Tuple3<>(3, "EUR", 2.99),
        new Tuple3<>(4, "EUR", 14.76));
    DataStreamSource<Tuple3<Integer, String, Double>> orderStream = env.addSource(new DelayedSource<>(orderData, 100L));
    orderStream.returns(new TypeHint<Tuple3<Integer, String, Double>>() {});

    Table orders = tableEnv.fromDataStream(orderStream, "OrderId, o_Currency, Amount, Order_ProcTime.proctime");
    Table usdOrders = orders.join(new Table(tableEnv, "FxRates(Order_ProcTime)"), "o_Currency = Currency")
        .select("OrderId, Amount, Currency, Rate, (Amount * Rate) as UsdAmount");

    String[] fields = usdOrders.getSchema().getFieldNames();
    TypeInformation<?>[] types = usdOrders.getSchema().getFieldTypes();
    DataStream<Row> usdStream = tableEnv.toAppendStream(usdOrders, usdOrders.getSchema().toRowType());
    CsvTableSink csvTableSink = new CsvTableSink("C:\\tmp\\test.csv", ",", 1, FileSystem.WriteMode.OVERWRITE);

    tableEnv.registerTableSink("csvSink", fields, types, csvTableSink);
    usdOrders.insertInto("csvSink");
    usdStream.addSink(new PrintSink());
    env.execute();
    System.out.println("Test completed at " + time());
  }

  public static String time() {
    return LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_TIME);
  }

  private static class DelayedSource<T> extends RichSourceFunction<T> {
    private final List<T> data;
    private final long initialDelay;
    private volatile boolean shutdown;

    private DelayedSource(List<T> data, long initialDelay) {
      this.data = data;
      this.initialDelay = initialDelay;
    }

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
      Iterator<T> iterator = data.iterator();
      Thread.sleep(initialDelay);
      while (!shutdown && iterator.hasNext()) {
        T next = iterator.next();
        System.out.println(time() + " - producing " + next);
        ctx.collect(next);
      }
    }

    @Override
    public void cancel() {
      shutdown = true;
    }
  }

  private static class PrintSink extends RichSinkFunction<Row> {
    @Override
    public void invoke(Row value, Context context) {
      Integer orderId = (Integer) value.getField(0);
      Double amount = (Double) value.getField(1);
      String currency = (String) value.getField(2);
      Double rate = (Double) value.getField(3);
      Double usdAmount = (Double) value.getField(4);
      System.out.println(time() + " - order " + orderId + " was 

Where are the TaskManagers IPs and Ports stored?

2018-10-15 Thread Chris Miller
 

Hi, 

I'm looking for the property file where the IPs and ports of the
TaskManagers are stored in Flink.
Does anyone know where it is located and when it is updated? 

(And in case there is no such file, where do the JobManager and
TaskManagers get this information from?) 

Thanks. 

Chris 
 

Partitions vs. Subpartitions

2018-10-10 Thread Chris Miller
 

Hi, 

what's the difference between partitions and subpartitions? 

Thanks. 

CM