Re: Allowed lateness in Flink SQL

2021-08-10 Thread Maciej Bryński
Thanks Ingo.
I will look into CURRENT_WATERMARK.
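
(For later readers: the kind of late-data handling CURRENT_WATERMARK() enables
could look roughly like the sketch below. The events table, its rowtime column
ts and the tableEnv variable are assumptions, not from this thread, and the
function requires Flink 1.14+.)

// Illustrative only: select the rows that arrived behind the current watermark.
// CURRENT_WATERMARK(ts) is NULL until the first watermark has been generated.
org.apache.flink.table.api.Table lateRows = tableEnv.sqlQuery(
    "SELECT * FROM events " +
    "WHERE CURRENT_WATERMARK(ts) IS NOT NULL AND ts <= CURRENT_WATERMARK(ts)");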

Regarding allow-lateness, I can imagine the following situation.
Let's say we have a left interval join between two streams.
I want to wait 15 minutes for events from the right stream to arrive
(this can be done by setting a watermark on the right stream).
I also want to update the join result if events from the right stream
are more than 15 minutes late (this could possibly be handled by
allow-lateness).

Regards,
Maciek



On Tue, Aug 10, 2021 at 3:38 PM Ingo Bürk wrote:
>
> Hi Maciej,
>
> there is no documentation for it (besides in the code itself) because it's an 
> experimental flag. What would you expect allow-lateness to do outside the 
> context of a window? Maybe you'd also be interested in 
> CURRENT_WATERMARK()[1][2] which will be released with 1.14 and allows some 
> level of late data handling.
>
> [1] https://issues.apache.org/jira/browse/FLINK-22737
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/#temporal-functions
>
>
> Best
> Ingo
>
> On Tue, Aug 10, 2021 at 3:21 PM Maciej Bryński  wrote:
>>
>> Hi Guys,
>> I was checking if anything changed recently with allowed lateness
>> support in Flink SQL and I found this PR:
>> https://github.com/apache/flink/pull/16022
>>
>> Is there any documentation for table.exec.emit.allow-lateness?
>> Does this option work only in window aggregation?
>>
>> Regards,
>> --
>> Maciek Bryński



-- 
Maciek Bryński


Allowed lateness in Flink SQL

2021-08-10 Thread Maciej Bryński
Hi Guys,
I was checking if anything changed recently with allowed lateness
support in Flink SQL and I found this PR:
https://github.com/apache/flink/pull/16022

Is there any documentation for table.exec.emit.allow-lateness?
Does this option work only in window aggregation?

Regards,
-- 
Maciek Bryński


Re: Production Grade GitOps Based Kubernetes Setup

2021-08-06 Thread Maciej Bryński
Hi Niklas,
We had the same problem one year ago and we chose Ververica Platform
Community Edition.
Pros:
- support for jobs on Session Clusters
- good support for restoring jobs from checkpoints and savepoints
- support for even hundreds of jobs
Cons:
- state kept in SQLite (we've already corrupted the db file once)
- lags behind new Flink versions

One year later I still think there is no perfect solution for managing
Flink on K8s, but for us Ververica was the closest match.

Regards,
Maciek

On Fri, Aug 6, 2021 at 1:49 PM Niklas Wilcke wrote:
>
> Hi Flink Community,
>
> I'm currently assessing the situation about how to properly deploy Flink on 
> Kubernetes via GitOps. There are some options available to deploy Flink on 
> Kubernetes, which I would like to discuss.  In general we are looking for an 
> open source or at least unpaid solution, but I don't exclude paid solutions 
> from the beginning.
> I see the following options.
>
> 1. Kubernetes Standalone [1]
> * Seems to be deprecated, since the docs state to use Native Kubernetes 
> instead
> 2. Native Kubernetes [2]
> * Doesn't seem to implement the Kubernetes operator pattern
> * Seems to require command line activities to be operated / upgraded (not 
> GitOps compatible out of the box)
> 3. "GoogleCloudPlatform/flink-on-k8s-operator" Operator [3]
> * Seems not to be well maintained / documented
> * We had some trouble with crashes during configuration changes, but we need 
> to investigate further
> * There is a "maintained" fork from spotify, which could be an option
> 4. Flink Native Kubernetes Operator [4]
> * Seems to be a private project from a Flink Committer, which might not be 
> mature enough for a stable operation
> 5. Proprietary Solution Ververica Platform [5]
> * I didn't try it out yet and have no experience with it
> * I'm unsure whether the Community Edition is suited for a production 
> environment. (one namespace, no auto scaling, no RBAC, etc.)
>
> I have the following questions.
>
> 1. Is the "Native Kubernetes" approach suited to be operated via Gitops and 
> does it have some drawbacks compared to an operator based setup? (e.g. is a 
> rollback during a failed upgrade possible?)
> 2. Are there any experiences with the 
> "GoogleCloudPlatform/flink-on-k8s-operator" or a fork of it in a production 
> environment?
> 3. Is the "Flink Native Kubernetes Operator" an option or is it just a 
> playground project? How is it related to the "Native Kubernetes" setup? Is it 
> going to be "integrated" into Flink?
> 4. Is a proprietary unpaid solution like "Ververica Platform Community 
> Edition" a solution for a production environment or will it definitely lack 
> features I need?
>
> Any information or feedback is highly appreciated. Thank you very much in 
> advance.
>
> Kind Regards,
> Niklas Wilcke
>
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/kubernetes/
> [2] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/
> [3] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
> [4] https://github.com/wangyang0918/flink-native-k8s-operator
> [5] https://www.ververica.com/getting-started-flink-ververica
>
>
>
>
> UNIBERG GmbH
> Simon-von-Utrecht-Straße 85a
> 20359 Hamburg
>
> niklas.wil...@uniberg.com
> Mobile: +49 160 9793 2593
> Office: +49 40 2380 6523
>
>
> UNIBERG GmbH, Dorfstraße 3, 23816 Bebensee
>
> Registergericht / Register: Amtsgericht Kiel HRB SE-1507
> Geschäftsführer / CEO‘s: Andreas Möller, Martin Ulbricht
>
> Informationen zum Datenschutz / Privacy Information: 
> https://www.uniberg.com/impressum.html
>


-- 
Maciek Bryński


Re: Dead Letter Queue for JdbcSink

2021-08-03 Thread Maciej Bryński
> > > I've tried several, a naive wrapper approach that I attempted looked 
> > > something like this:
> > >
> > > ```
> > > class DlqWrapper<T>(private val sink: SinkFunction<T>, val parameters: ParameterTool): SinkFunction<T> {
> > > private val logger = LoggerFactory.getLogger(DlqSink::class.java)
> > > private val dlqSink: SinkFunction<String> = ...
> > >
> > > override fun invoke(value: T, context: SinkFunction.Context) {
> > > try {
> > > sink.invoke(value, context)
> > > }
> > > catch (ex: Exception) {
> > > logger.error("Encountered sink exception. Sending message to 
> > > dead letter queue. Value: $value. Exception: ${ex.message}")
> > > val payload = Gson().toJsonTree(value).asJsonObject
> > > payload.addProperty("exception", ex.message)
> > >
> > > dlqSink.invoke("$payload", context)
> > > }
> > > }
> > > }
> > > ```
> > >
> > > After doing this, it doesn't look like when the invoke calls are made 
> > > that it's actually attempting to perform the JDBC calls to insert the 
> > > records into those sources. I'm not entirely sure if this is related 
> > > specifically for how the JdbcSink is wrapped (via the GenericJdbcSink, 
> > > etc.).
> > >
> > > I had seen several posts around involving the use of an 
> > > InvocationHandler/Proxy, etc. but I'm not sure if that should be 
> > > necessary for handling this type of functionality. Any 
> > > ideas/thoughts/examples would be greatly appreciated.
> > >
> > > Thanks,
> > >
> > > Rion
> > >
> > > On 2021/07/14 15:47:18, Maciej Bryński  wrote:
> > > > This is the idea.
> > > > Of course you need to wrap more functions like: open, close,
> > > > notifyCheckpointComplete, snapshotState, initializeState and
> > > > setRuntimeContext.
> > > >
> > > > The problem is that if you want to catch problematic record you need
> > > > to set batch size to 1, which gives very bad performance.
> > > >
> > > > Regards,
> > > > Maciek
> > > >
> > > > > On Wed, Jul 14, 2021 at 5:31 PM Rion Williams wrote:
> > > > >
> > > > > Hi Maciej,
> > > > >
> > > > > Thanks for the quick response. I wasn't aware of the idea of using a 
> > > > > SinkWrapper, but I'm not quite certain that it would suit this 
> > > > > specific use case (as a SinkFunction / RichSinkFunction doesn't 
> > > > > appear to support side-outputs). Essentially, what I'd hope to 
> > > > > accomplish would be to pick up when a bad record could not be written 
> > > > > to the sink and then offload that via a side-output somewhere else.
> > > > >
> > > > > Something like this, which is a very, very naive idea:
> > > > >
> > > > > class PostgresSinkWrapper<T>(private val sink: SinkFunction<T>): RichSinkFunction<T>() {
> > > > > private val logger = 
> > > > > LoggerFactory.getLogger(PostgresSinkWrapper::class.java)
> > > > >
> > > > > override fun invoke(value: T, context: SinkFunction.Context) {
> > > > > try {
> > > > > sink.invoke(value, context)
> > > > > }
> > > > > catch (exception: Exception){
> > > > > logger.error("Encountered a bad record, offloading to 
> > > > > dead-letter-queue")
> > > > > // Offload bad record to DLQ
> > > > > }
> > > > > }
> > > > > }
> > > > >
> > > > > But I think that's basically the gist of it. I'm just not sure how I 
> > > > > could go about doing this aside from perhaps writing a custom process 
> > > > > function that wraps another sink function (or just completely 
> > > > > rewriting my own JdbcSink?)
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Rion
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Jul 14, 2021 at 9:56 AM Maciej Bryński  
> > > > > wrote:
> > > > >>
> > > > >> Hi Rion,
> > > > >> We have implemented such a solution with Sink Wrapper.
> > > > >>
> > > > >>
> > > > >> Regards,
> > > > >> Maciek
> > > > >>
> > > > >> On Wed, Jul 14, 2021 at 4:21 PM Rion Williams wrote:
> > > > >> >
> > > > >> > Hi all,
> > > > >> >
> > > > >> > Recently I've been encountering an issue where some external 
> > > > >> > dependencies or process causes writes within my JDBCSink to fail 
> > > > >> > (e.g. something is being inserted with an explicit constraint that 
> > > > >> > never made it's way there). I'm trying to see if there's a pattern 
> > > > >> > or recommendation for handling this similar to a dead-letter queue.
> > > > >> >
> > > > >> > Basically - if I experience a given number of failures (> max 
> > > > >> > retry attempts) when writing to my JDBC destination, I'd like to 
> > > > >> > take the record that was attempted and throw it into a Kafka topic 
> > > > >> > or some other destination so that it can be evaluated at a later 
> > > > >> > time.
> > > > >> >
> > > > >> > Are there any well defined patterns or recommended approaches 
> > > > >> > around this?
> > > > >> >
> > > > >> > Thanks,
> > > > >> >
> > > > >> > Rion
> > > > >>
> > > > >>
> > > > >>
> > > > >> --
> > > > >> Maciek Bryński
> > > >
> > > >
> > > >
> > > > --
> > > > Maciek Bryński
> > > >
> >



-- 
Maciek Bryński


Re: How can I declare BigDecimal precision and scale in user-defined aggregate function?

2021-07-29 Thread Maciej Bryński
Hi,
You can do sth like this:

// Imports added for completeness; BigDecimalMath is assumed to come from the
// big-math library (ch.obermuhlner), a separate dependency that is not part of Flink.
import java.math.BigDecimal;
import java.math.MathContext;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

import ch.obermuhlner.math.big.BigDecimalMath;

/**
 * UDF implementing Power function with Decimal
 */
public class PowerFunction extends ScalarFunction {

    public static MathContext mc = new MathContext(18);

    public @DataTypeHint("DECIMAL(38,18)") BigDecimal eval(
            @DataTypeHint("DECIMAL(38,18)") BigDecimal x,
            @DataTypeHint("DECIMAL(38,18)") BigDecimal y) {
        return BigDecimalMath.pow(x, y, mc);
    }
}
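
In case it helps, registering and calling the function could look roughly like
this (a sketch; the table name and its columns are hypothetical, and it assumes
Flink 1.12+):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PowerFunctionExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Register the scalar function defined above under a SQL name.
        tableEnv.createTemporarySystemFunction("POW_DEC", PowerFunction.class);

        // my_table, x and y are made up; the casts keep both arguments at
        // DECIMAL(38,18), matching the @DataTypeHint declarations.
        tableEnv.executeSql(
                "SELECT POW_DEC(CAST(x AS DECIMAL(38,18)), CAST(y AS DECIMAL(38,18))) "
                        + "FROM my_table");
    }
}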

On Fri, Jul 30, 2021 at 5:12 AM LIU Xiao wrote:
>
> sorry for a little error, the program code should be:
>
> package poc.flink.table;
>
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.Types;
> import org.apache.flink.api.java.tuple.Tuple1;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.table.annotation.DataTypeHint;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.functions.AggregateFunction;
> import org.apache.flink.types.Row;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.math.BigDecimal;
> import java.sql.Timestamp;
>
> import static org.apache.flink.table.api.Expressions.$;
>
> public class PocLastDecimalJob {
> private static final Logger LOGGER = 
> LoggerFactory.getLogger(PocLastDecimalJob.class);
>
> public static class LastDecimalAccumulator extends Tuple1<BigDecimal> {
> public LastDecimalAccumulator(BigDecimal f0) {
> super(f0);
> }
> }
>
> public static class LastDecimalAggFunction extends AggregateFunction<BigDecimal, LastDecimalAccumulator> {
>
> @Override
> public BigDecimal getValue(LastDecimalAccumulator accumulator) {
> return accumulator.f0;
> }
>
> @Override
> public LastDecimalAccumulator createAccumulator() {
> return new LastDecimalAccumulator(null);
> }
>
> public void accumulate(LastDecimalAccumulator accumulator,
>@DataTypeHint("DECIMAL(38, 18)") BigDecimal 
> value) {
> if (value != null) {
> accumulator.f0 = value;
> }
> }
>
> public void merge(LastDecimalAccumulator accumulator, Iterable<BigDecimal> iterable) {
> if (iterable != null) {
> for (BigDecimal item : iterable) {
> accumulator.f0 = item;
> }
> }
> }
> }
>
> public static void main(String[] args) throws Exception {
>
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
>
> RowTypeInfo rowTypeInfo = new RowTypeInfo(
> new TypeInformation[] {Types.SQL_TIMESTAMP, Types.INT, 
> Types.BIG_DEC},
> new String[] {"rowtime", "id", "val"});
>
> DataStream<Row> dataStream = env.fromElements(
> Row.of(new Timestamp(1_100L), 1, new BigDecimal("1")),
> Row.of(new Timestamp(2_200L), 1, new BigDecimal("2")),
> Row.of(new Timestamp(3_300L), 2, new BigDecimal("3"))
> 
> ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()).returns(rowTypeInfo);
>
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> tableEnv.createTemporarySystemFunction("LAST_DECIMAL", new 
> LastDecimalAggFunction());
>
> tableEnv.createTemporaryView("InputTable", dataStream, 
> $("rowtime").rowtime(), $("id"), $("val"));
>
> Table resultTable = tableEnv.sqlQuery("" +
> "SELECT LAST_TIMESTAMP(rowtime), id, LAST_DECIMAL(bd) " +
> "FROM InputTable " +
> "GROUP BY HOP(rowtime, INTERVAL '1' SECOND, INTERVAL '1' 
> SECOND), id");
>
> DataStream<Row> resultStream = tableEnv
>         .toRetractStream(resultTable, new RowTypeInfo(Types.SQL_TIMESTAMP, Types.INT, Types.BIG_DEC))
>         .map((MapFunction<Tuple2<Boolean, Row>, Row>) value -> value.f1);
>
> resultStream.addSink(new SinkFunction<Row>() {
> @Override
> public void invoke(Row value, Context context) {
> LOGGER.info("SinkFunction.invoke(): value={}", value);
> }
> });
>
> env.execute();
> }
> }
>
>
> On Fri, Jul 30, 2021 at 11:04 AM LIU Xiao wrote:
>>
>> I'm currently converting our old code (based on Flink 1.6) to Flink 1.13 and 
>> encountered a strange problem about the user-defined aggregate function 
>> which takes BigDecimal as the parameter and output:
>>
>>> Exception in thread "main" 

Re: Dead Letter Queue for JdbcSink

2021-07-14 Thread Maciej Bryński
This is the idea.
Of course you need to wrap more functions like: open, close,
notifyCheckpointComplete, snapshotState, initializeState and
setRuntimeContext.

The problem is that if you want to catch the problematic record you need
to set the batch size to 1, which gives very bad performance.
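
For anyone picking this up later, a minimal sketch of such a wrapper is below.
It is not the implementation referenced above: the class and field names are
made up, and the checkpoint-related forwarding is only indicated in comments.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Sketch of a delegating sink: records rejected by the primary sink are
 * forwarded to a secondary "dead letter" sink.
 */
public class DeadLetterSinkWrapper<T> extends RichSinkFunction<T> {

    private final RichSinkFunction<T> primary;     // e.g. the JDBC sink
    private final RichSinkFunction<T> deadLetter;  // e.g. a Kafka sink (after mapping to JSON)

    public DeadLetterSinkWrapper(RichSinkFunction<T> primary, RichSinkFunction<T> deadLetter) {
        this.primary = primary;
        this.deadLetter = deadLetter;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Hand the runtime context to both delegates before opening them.
        primary.setRuntimeContext(getRuntimeContext());
        deadLetter.setRuntimeContext(getRuntimeContext());
        primary.open(parameters);
        deadLetter.open(parameters);
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        try {
            primary.invoke(value, context);
        } catch (Exception e) {
            // With JDBC batching > 1 a whole failed batch ends up here rather
            // than the single offending record, hence the batch-size-1 caveat.
            deadLetter.invoke(value, context);
        }
    }

    @Override
    public void close() throws Exception {
        primary.close();
        deadLetter.close();
    }

    // A production version would also implement CheckpointedFunction and
    // CheckpointListener and forward snapshotState, initializeState and
    // notifyCheckpointComplete to the delegates, as noted above.
}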

Regards,
Maciek

On Wed, Jul 14, 2021 at 5:31 PM Rion Williams wrote:
>
> Hi Maciej,
>
> Thanks for the quick response. I wasn't aware of the idea of using a 
> SinkWrapper, but I'm not quite certain that it would suit this specific use 
> case (as a SinkFunction / RichSinkFunction doesn't appear to support 
> side-outputs). Essentially, what I'd hope to accomplish would be to pick up 
> when a bad record could not be written to the sink and then offload that via 
> a side-output somewhere else.
>
> Something like this, which is a very, very naive idea:
>
> class PostgresSinkWrapper<T>(private val sink: SinkFunction<T>): RichSinkFunction<T>() {
> private val logger = 
> LoggerFactory.getLogger(PostgresSinkWrapper::class.java)
>
> override fun invoke(value: T, context: SinkFunction.Context) {
> try {
> sink.invoke(value, context)
> }
> catch (exception: Exception){
> logger.error("Encountered a bad record, offloading to 
> dead-letter-queue")
> // Offload bad record to DLQ
> }
> }
> }
>
> But I think that's basically the gist of it. I'm just not sure how I could go 
> about doing this aside from perhaps writing a custom process function that 
> wraps another sink function (or just completely rewriting my own JdbcSink?)
>
> Thanks,
>
> Rion
>
>
>
>
>
> On Wed, Jul 14, 2021 at 9:56 AM Maciej Bryński  wrote:
>>
>> Hi Rion,
>> We have implemented such a solution with Sink Wrapper.
>>
>>
>> Regards,
>> Maciek
>>
> >> On Wed, Jul 14, 2021 at 4:21 PM Rion Williams wrote:
>> >
>> > Hi all,
>> >
>> > Recently I've been encountering an issue where some external dependencies 
>> > or process causes writes within my JDBCSink to fail (e.g. something is 
> >> > being inserted with an explicit constraint that never made its way 
>> > there). I'm trying to see if there's a pattern or recommendation for 
>> > handling this similar to a dead-letter queue.
>> >
>> > Basically - if I experience a given number of failures (> max retry 
>> > attempts) when writing to my JDBC destination, I'd like to take the record 
>> > that was attempted and throw it into a Kafka topic or some other 
>> > destination so that it can be evaluated at a later time.
>> >
>> > Are there any well defined patterns or recommended approaches around this?
>> >
>> > Thanks,
>> >
>> > Rion
>>
>>
>>
>> --
>> Maciek Bryński



-- 
Maciek Bryński


Re: Dead Letter Queue for JdbcSink

2021-07-14 Thread Maciej Bryński
Hi Rion,
We have implemented such a solution with Sink Wrapper.


Regards,
Maciek

On Wed, Jul 14, 2021 at 4:21 PM Rion Williams wrote:
>
> Hi all,
>
> Recently I've been encountering an issue where some external dependencies or 
> process causes writes within my JDBCSink to fail (e.g. something is being 
> inserted with an explicit constraint that never made its way there). I'm 
> trying to see if there's a pattern or recommendation for handling this 
> similar to a dead-letter queue.
>
> Basically - if I experience a given number of failures (> max retry attempts) 
> when writing to my JDBC destination, I'd like to take the record that was 
> attempted and throw it into a Kafka topic or some other destination so that 
> it can be evaluated at a later time.
>
> Are there any well defined patterns or recommended approaches around this?
>
> Thanks,
>
> Rion



-- 
Maciek Bryński


Re: Subpar performance of temporal joins with RocksDB backend

2021-07-10 Thread Maciej Bryński
Could you please set these two configuration options (one way to apply them is sketched below):
- state.backend.rocksdb.predefined-options = SPINNING_DISK_OPTIMIZED_HIGH_MEM
- state.backend.rocksdb.memory.partitioned-index-filters = true
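
For reference, both keys normally go into flink-conf.yaml (or are passed with
-D flags at submission time). Purely to illustrate the keys above, applying
them to a local environment could look like the following sketch, which is not
guaranteed to cover every deployment mode:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbTuningSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("state.backend", "rocksdb");
        conf.setString("state.backend.rocksdb.predefined-options",
                "SPINNING_DISK_OPTIMIZED_HIGH_MEM");
        conf.setString("state.backend.rocksdb.memory.partitioned-index-filters", "true");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... define the job on `env` and call env.execute() as usual
    }
}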

Regards,
Maciek

On Sat, Jul 10, 2021 at 8:54 AM Adrian Bednarz wrote:
>
> I didn’t tweak any RocksDB knobs. The only thing we did was to increase 
> managed memory to 12gb which was supposed to help RocksDB according to the 
> documentation. The rest stays at the defaults. Incremental checkpointing was 
> enabled as well but it made no difference in performance if we disabled it.
>
> On Fri, 9 Jul 2021 at 20:43, Maciej Bryński  wrote:
>>
>> Hi Adrian,
>> Could you share your state backend configuration ?
>>
>> Regards,
>> Maciek
>>
> >> On Fri, Jul 9, 2021 at 7:09 PM Adrian Bednarz wrote:
>> >
>> > Hello,
>> >
>> > We are experimenting with lookup joins in Flink 1.13.0. Unfortunately, we 
>> > unexpectedly hit significant performance degradation when changing the 
>> > state backend to RocksDB.
>> >
>> > We performed tests with two tables: fact table TXN and dimension table 
>> > CUSTOMER with the following schemas:
>> >
>> > TXN:
>> >  |-- PROD_ID: BIGINT
>> >  |-- CUST_ID: BIGINT
>> >  |-- TYPE: BIGINT
>> >  |-- AMOUNT: BIGINT
>> >  |-- ITEMS: BIGINT
>> >  |-- TS: TIMESTAMP(3) **rowtime**
>> >  |-- WATERMARK FOR TS: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS
>> >
>> > CUSTOMER:
>> >  |-- ID: BIGINT
>> >  |-- STATE: BIGINT
>> >  |-- AGE: BIGINT
>> >  |-- SCORE: DOUBLE
>> >  |-- PRIMARY KEY: ID
>> >
>> > And the following query:
>> > select state, sum(amount) from txn t JOIN customer FOR SYSTEM_TIME AS OF 
>> > t.ts ON t.cust_id = customer.id group by state, TUMBLE(t.ts, INTERVAL '1' 
>> > SECOND)
>> >
>> > In our catalog, we reconfigured the customer table so that the watermark 
>> > is set to infinity on that side of the join. We generate data in a round 
>> > robin fashion (except for timestamp that grows with a step of 1 ms).
>> >
>> > We performed our experiments on a single c5.4xlarge machine with heap and 
>> > managed memory size set to 12gb with a blackhole sink. With 2 000 000 fact 
>> > records and 100 000 dimension records, a job with heap backend finishes in 
>> > 5 seconds whereas RocksDB executes in 1h 24m. For 400 000 dimension 
>> > records it doesn't grow significantly but goes up to 1h 36m (the job 
>> > processes more records after all).
>> >
>> > We also checked what would happen if we reduced the amount of customer ids 
>> > to 1. Our expectation was that RocksDB will not offload anything to disk 
>> > anymore so the performance should be comparable with heap backend. It was 
>> > executed in 10 minutes.
>> >
>> > Is this something anybody experienced or something to be expected? Of 
>> > course, we assumed RocksDB to perform slower but 300 eps is below our 
>> > expectations.
>> >
>> > Thanks,
>> > Adrian
>>
>>
>>
>> --
>> Maciek Bryński



-- 
Maciek Bryński


Re: Subpar performance of temporal joins with RocksDB backend

2021-07-09 Thread Maciej Bryński
Hi Adrian,
Could you share your state backend configuration ?

Regards,
Maciek

On Fri, Jul 9, 2021 at 7:09 PM Adrian Bednarz wrote:
>
> Hello,
>
> We are experimenting with lookup joins in Flink 1.13.0. Unfortunately, we 
> unexpectedly hit significant performance degradation when changing the state 
> backend to RocksDB.
>
> We performed tests with two tables: fact table TXN and dimension table 
> CUSTOMER with the following schemas:
>
> TXN:
>  |-- PROD_ID: BIGINT
>  |-- CUST_ID: BIGINT
>  |-- TYPE: BIGINT
>  |-- AMOUNT: BIGINT
>  |-- ITEMS: BIGINT
>  |-- TS: TIMESTAMP(3) **rowtime**
>  |-- WATERMARK FOR TS: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS
>
> CUSTOMER:
>  |-- ID: BIGINT
>  |-- STATE: BIGINT
>  |-- AGE: BIGINT
>  |-- SCORE: DOUBLE
>  |-- PRIMARY KEY: ID
>
> And the following query:
> select state, sum(amount) from txn t JOIN customer FOR SYSTEM_TIME AS OF t.ts 
> ON t.cust_id = customer.id group by state, TUMBLE(t.ts, INTERVAL '1' SECOND)
>
> In our catalog, we reconfigured the customer table so that the watermark is 
> set to infinity on that side of the join. We generate data in a round robin 
> fashion (except for timestamp that grows with a step of 1 ms).
>
> We performed our experiments on a single c5.4xlarge machine with heap and 
> managed memory size set to 12gb with a blackhole sink. With 2 000 000 fact 
> records and 100 000 dimension records, a job with heap backend finishes in 5 
> seconds whereas RocksDB executes in 1h 24m. For 400 000 dimension records it 
> doesn't grow significantly but goes up to 1h 36m (the job processes more 
> records after all).
>
> We also checked what would happen if we reduced the amount of customer ids to 
> 1. Our expectation was that RocksDB will not offload anything to disk anymore 
> so the performance should be comparable with heap backend. It was executed in 
> 10 minutes.
>
> Is this something anybody experienced or something to be expected? Of course, 
> we assumed RocksDB to perform slower but 300 eps is below our expectations.
>
> Thanks,
> Adrian



-- 
Maciek Bryński


Re:

2021-07-07 Thread Maciej Bryński
Hi Nicolaus,
I'm sending records as an attachment.

Regards,
Maciek

On Wed, Jul 7, 2021 at 11:47 AM Nicolaus Weidner wrote:
>
> Hi Maciek,
>
> is there a typo in the input data? Timestamp 2021-05-01 04:42:57 appears 
> twice, but timestamp 2021-05-01T15:28:34 (from the log lines) is not there at 
> all. I find it hard to correlate the logs with the input...
>
> Best regards,
> Nico
>
> On Wed, Jul 7, 2021 at 11:16 AM Arvid Heise  wrote:
>>
>> Hi Maciek,
>>
>> could you bypass the MATCH_RECOGNIZE (=comment out) and check if the records 
>> appear in a shortcutted output?
>>
>> I suspect that they may be filtered out before (for example because of 
>> number conversion issues with 0E-18)
>>
>> On Tue, Jul 6, 2021 at 3:26 PM Maciek Bryński  wrote:
>>>
>>> Hi,
>>> I have a very strange bug when using MATCH_RECOGNIZE.
>>>
>>> I'm using some joins and unions to create event stream. Sample event stream 
>>> (for one user) looks like this:
>>>
>>> uuidcif event_type  v   balance ts
>>> 621456e9-389b-409b-aaca-bca99eeb43b30004091386  trx 
>>> 4294.38 74.524950   2021-05-01 04:42:57
>>> 7b2bc022-b069-41ca-8bbf-e93e3f0e85a70004091386  application 
>>> 0E-18   74.524950   2021-05-01 10:29:10
>>> 942cd3ce-fb3d-43d3-a69a-aaeeec5ee90e0004091386  application 
>>> 0E-18   74.524950   2021-05-01 10:39:02
>>> 433ac9bc-d395-457n-986c-19e30e375f2e0004091386  trx 
>>> 4294.38 74.524950   2021-05-01 04:42:57
>>>
>>> Then I'm using following MATCH_RECOGNIZE definition (trace function will be 
>>> explained later)
>>>
>>> CREATE VIEW scenario_1 AS (
>>> SELECT * FROM events
>>> MATCH_RECOGNIZE(
>>> PARTITION BY cif
>>> ORDER BY ts
>>> MEASURES
>>> TRX.v as trx_amount,
>>> TRX.ts as trx_ts,
>>> APP_1.ts as app_1_ts,
>>> APP_2.ts as app_2_ts,
>>> APP_2.balance as app_2_balance
>>> ONE ROW PER MATCH
>>> PATTERN (TRX ANY_EVENT*? APP_1 NOT_LOAN*? APP_2) WITHIN INTERVAL 
>>> '10' DAY
>>> DEFINE
>>> TRX AS trace(TRX.event_type = 'trx' AND TRX.v > 1000,
>>>   'TRX', TRX.uuid, TRX.cif, TRX.event_type, TRX.ts),
>>> ANY_EVENT AS trace(true,
>>>   'ANY_EVENT', TRX.uuid, ANY_EVENT.cif, 
>>> ANY_EVENT.event_type, ANY_EVENT.ts),
>>> APP_1 AS trace(APP_1.event_type = 'application' AND APP_1.ts < 
>>> TRX.ts + INTERVAL '3' DAY,
>>>   'APP_1', TRX.uuid, APP_1.cif, APP_1.event_type, APP_1.ts),
>>> APP_2 AS trace(APP_2.event_type = 'application' AND APP_2.ts > 
>>> APP_1.ts
>>>AND APP_2.ts < APP_1.ts + INTERVAL '7' DAY AND 
>>> APP_2.balance < 100,
>>>   'APP_2', TRX.uuid, APP_2.cif, APP_2.event_type, APP_2.ts),
>>> NOT_LOAN AS trace(NOT_LOAN.event_type <> 'loan',
>>>   'NOT_LOAN', TRX.uuid, NOT_LOAN.cif, NOT_LOAN.event_type, 
>>> NOT_LOAN.ts)
>>> ))
>>>
>>>
>>> This scenario could be matched by sample events because:
>>> - TRX is matched by event with ts 2021-05-01 04:42:57
>>> - APP_1 by ts 2021-05-01 10:29:10
>>> - APP_2 by ts 2021-05-01 10:39:02
>>> Unfortunately I'm not getting any data. And it's not watermarks fault.
>>>
>>> Trace function has following code and gives me some logs:
>>>
>>> public class TraceUDF extends ScalarFunction {
>>>
>>> public Boolean eval(Boolean condition, @DataTypeHint(inputGroup = 
>>> InputGroup.ANY) Object ... message) {
>>> log.info((condition ? "Condition true: " : "Condition false: ") + 
>>> Arrays.stream(message).map(Object::toString).collect(Collectors.joining(" 
>>> ")));
>>> return condition;
>>> }
>>> }
>>>
>>> And log from this trace function is following.
>>>
>>> 2021-07-06 13:09:43,762 INFO TraceUDF [] - 
>>> Condition true: TRX 621456e9-389b-409b-aaca-bca99eeb43b3 0004091386 trx 
>>> 2021-05-01T04:42:57
>>> 2021-07-06 13:12:28,914 INFO  TraceUDF [] - 
>>> Condition true: ANY_EVENT 621456e9-389b-409b-aaca-bca99eeb43b3 0004091386 
>>> trx 2021-05-01T15:28:34
>>> 2021-07-06 13:12:28,915 INFO  TraceUDF [] - 
>>> Condition false: APP_1 621456e9-389b-409b-aaca-bca99eeb43b3 0004091386 trx 
>>> 2021-05-01T15:28:34
>>> 2021-07-06 13:12:28,915 INFO  TraceUDF [] - 
>>> Condition false: TRX 433ac9bc-d395-457n-986c-19e30e375f2e 0004091386 trx 
>>> 2021-05-01T15:28:34
>>>
>>> As you can see 2 events are missing.
>>> What can I do ?
>>> I failed with create minimal example of this bug. Any other ideas ?



-- 
Maciek Bryński


Bug in MATCH_RECOGNIZE ?

2021-07-06 Thread Maciej Bryński
Hi,
I have a very strange bug when using MATCH_RECOGNIZE.

I'm using some joins and unions to create an event stream. Sample
event stream (for one user) looks like this:

uuid                                  cif         event_type   v        balance    ts
621456e9-389b-409b-aaca-bca99eeb43b3  0004091386  trx          4294.38  74.524950  2021-05-01 04:42:57
7b2bc022-b069-41ca-8bbf-e93e3f0e85a7  0004091386  application  0E-18    74.524950  2021-05-01 10:29:10
942cd3ce-fb3d-43d3-a69a-aaeeec5ee90e  0004091386  application  0E-18    74.524950  2021-05-01 10:39:02
433ac9bc-d395-457n-986c-19e30e375f2e  0004091386  trx          4294.38  74.524950  2021-05-01 04:42:57

Then I'm using the following MATCH_RECOGNIZE definition (the trace function
will be explained later):

CREATE VIEW scenario_1 AS (
SELECT * FROM events
MATCH_RECOGNIZE(
PARTITION BY cif
ORDER BY ts
MEASURES
TRX.v as trx_amount,
TRX.ts as trx_ts,
APP_1.ts as app_1_ts,
APP_2.ts as app_2_ts,
APP_2.balance as app_2_balance
ONE ROW PER MATCH
PATTERN (TRX ANY_EVENT*? APP_1 NOT_LOAN*? APP_2) WITHIN
INTERVAL '10' DAY
DEFINE
TRX AS trace(TRX.event_type = 'trx' AND TRX.v > 1000,
  'TRX', TRX.uuid, TRX.cif, TRX.event_type, TRX.ts),
ANY_EVENT AS trace(true,
  'ANY_EVENT', TRX.uuid, ANY_EVENT.cif,
ANY_EVENT.event_type, ANY_EVENT.ts),
APP_1 AS trace(APP_1.event_type = 'application' AND APP_1.ts <
TRX.ts + INTERVAL '3' DAY,
  'APP_1', TRX.uuid, APP_1.cif, APP_1.event_type, APP_1.ts),
APP_2 AS trace(APP_2.event_type = 'application' AND APP_2.ts > APP_1.ts
   AND APP_2.ts < APP_1.ts + INTERVAL '7' DAY AND
APP_2.balance < 100,
  'APP_2', TRX.uuid, APP_2.cif, APP_2.event_type, APP_2.ts),
NOT_LOAN AS trace(NOT_LOAN.event_type <> 'loan',
  'NOT_LOAN', TRX.uuid, NOT_LOAN.cif,
NOT_LOAN.event_type, NOT_LOAN.ts)
))


This scenario should be matched by the sample events because:
- TRX is matched by the event with ts 2021-05-01 04:42:57
- APP_1 by ts 2021-05-01 10:29:10
- APP_2 by ts 2021-05-01 10:39:02
Unfortunately I'm not getting any data, and it's not the watermarks' fault.

The trace function has the following code and produces some logs:

public class TraceUDF extends ScalarFunction {

    // `log` is assumed to be provided e.g. by Lombok's @Slf4j (not shown in this snippet)
    public Boolean eval(Boolean condition,
                        @DataTypeHint(inputGroup = InputGroup.ANY) Object... message) {
        log.info((condition ? "Condition true: " : "Condition false: ")
                + Arrays.stream(message).map(Object::toString).collect(Collectors.joining(" ")));
        return condition;
    }
}

The log from this trace function is as follows:

2021-07-06 13:09:43,762 INFO  TraceUDF [] - Condition true: TRX 621456e9-389b-409b-aaca-bca99eeb43b3 0004091386 trx 2021-05-01T04:42:57
2021-07-06 13:12:28,914 INFO  TraceUDF [] - Condition true: ANY_EVENT 621456e9-389b-409b-aaca-bca99eeb43b3 0004091386 trx 2021-05-01T15:28:34
2021-07-06 13:12:28,915 INFO  TraceUDF [] - Condition false: APP_1 621456e9-389b-409b-aaca-bca99eeb43b3 0004091386 trx 2021-05-01T15:28:34
2021-07-06 13:12:28,915 INFO  TraceUDF [] - Condition false: TRX 433ac9bc-d395-457n-986c-19e30e375f2e 0004091386 trx 2021-05-01T15:28:34

As you can see, 2 events are missing.
What can I do?
I failed to create a minimal example of this bug. Any other ideas?

Regards,
-- 
Maciek Bryński


Re: Jupyter PyFlink Web UI

2021-06-08 Thread Maciej Bryński
Nope.
I found the following solution.

conf = Configuration()
env = StreamExecutionEnvironment(
    get_gateway().jvm.org.apache.flink.streaming.api.environment
        .StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf._j_configuration))
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = StreamTableEnvironment.create(
    stream_execution_environment=env, environment_settings=env_settings)

I also created the bug report https://issues.apache.org/jira/browse/FLINK-22924.
I think this API should be exposed in Python.

On Wed, Jun 9, 2021 at 4:57 AM Dian Fu wrote:
>
> Hi Macike,
>
> You could try if the following works:
>
> ```
> table_env.get_config().get_configuration().set_string("rest.bind-port", "xxx")
> ```
>
> Regards,
> Dian
>
> > On Jun 8, 2021, at 8:26 PM, maverick wrote:
> >
> > Hi,
> > I've got a question. I'm running PyFlink code from Jupyter Notebook starting
> > TableEnvironment with following code:
> >
> > env_settings =
> > EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> > table_env = TableEnvironment.create(env_settings)
> >
> > How can I enable Web UI in this code?
> >
> > Regards,
> > Maciek
> >
> >
> >
> > --
> > Sent from: 
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
Maciek Bryński


Re: Watermarks in Event Time Temporal Join

2021-04-28 Thread Maciej Bryński
Hi Leonard,
Let's assume we have two streams.
S1 - id, value1, ts1 with watermark = ts1 - 1
S2 - id, value2, ts2 with watermark = ts2 - 1

Then we have the following interval join:
SELECT id, value1, value2, ts1 FROM S1 JOIN S2 ON S1.id = S2.id and
ts1 between ts2 - 1 and ts2

Let's have events.
stream, id, value, ts
S1, id1, v1, 1
S2, id1, v2, 1

For these events, the interval join in Flink will emit an event to the output stream:
id1, v1, v2, 1
This happens despite the fact that the watermark has not advanced on either stream.

Now consider a similar situation for an Event Time Temporal Join:
SELECT id, value1, value2, ts1 FROM S1 JOIN S2 FOR SYSTEM_TIME AS OF
S1.ts1 ON S1.id = S2.id

Let's have events.
S1, id1, v1, 1
S2, id1, v2, 1

Nothing happens, as the watermark has not advanced on either stream.
Now let's add
S2, id2, v2, 101
This should trigger the join for id1, because we now have all the knowledge
needed to perform it (we know that the watermark for the id1 record has been
reached).
Unfortunately, to trigger the join on id1 we also need a watermark on the S1
side, and I think this behaviour is wrong.

I hope I explained everything correctly.

Regards,
Maciek

On Tue, Apr 27, 2021 at 8:58 AM Leonard Xu wrote:
>
> Hello, Maciej
>
> I agree the watermark should pass on versioned table side, because
> this is the only way to know which version of record should be used.
> But if we mimics behaviour of interval join then main stream watermark
> could be skipped.
>
>
> IIRC, rowtime interval join requires the watermark on both sides, and the 
> watermark
> will be used to clean up the outdated data and advance the data progress both 
> in rowtime  interval join and rowtime temporal join.
>
> Best,
> Leonard
>


-- 
Maciek Bryński


Re: Watermarks in Event Time Temporal Join

2021-04-26 Thread Maciej Bryński
Hi Shengkai,
Thanks for the answer. The question is whether we need to determine if an
event in the main stream is late.
Let's look at the interval join - an event is emitted as soon as there is a
match between the left and right streams.
I agree the watermark is needed on the versioned table side, because
this is the only way to know which version of a record should be used.
But if we mimic the behaviour of the interval join, then the main stream
watermark could be skipped.

Regards,
Maciek

On Mon, Apr 26, 2021 at 6:14 AM Shengkai Fang wrote:
>
> Hi, maverick.
>
> The watermark is used to determine whether a message is late or early. If we only 
> use the watermark on the versioned table side, we have no means to determine 
> whether the event in the main stream is ready to be emitted.
>
> Best,
> Shengkai
>
> On Mon, Apr 26, 2021 at 2:31 AM maverick wrote:
>>
>> Hi,
>> I'm curious why Event Time Temporal Join needs watermarks from both sides to
>> perform join.
>>
>> Shouldn't watermark on versioned table side be enough to perform join ?
>>
>>
>>
>>
>>
>> --
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



-- 
Maciek Bryński


Re: Join a datastream with tables stored in Hive

2020-12-01 Thread Maciej Bryński
Hi,
There is an implementation only for temporal tables, which needs some
Java/Scala coding (there is no SQL-only implementation).
On the same page there is an annotation:
"Attention: Flink does not support event time temporal table joins currently."

That is the reason I'm asking this question.
My use case:
I want to join a Kafka stream with a table from a JDBC source.
Every record in Kafka has an event time, and the records in JDBC are versioned.
I didn't find a SQL solution to this problem.

Regards,
Maciek

On Tue, Dec 1, 2020 at 8:31 PM Chesnay Schepler wrote:

> According to the documentation
> 
> this is already implemented.
>
> On 12/1/2020 3:53 PM, maverick wrote:
>
> Hi Kurt,
> Is there any Jira task for tracking progress of adding event time support to
> temporal joins ?
>
> Regards,
> Maciek
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
>

-- 
Maciek Bryński