Re: Regarding Flink Upgrades

2022-11-02 Thread Dawid Wysakowicz

Hi,

What you linked to is what the community agreed to support. So far we've 
been able to support three versions at all times (e.g. currently we 
merge bugfixes to 1.17.x, 1.16.x, 1.15.x), which is one version more 
than what is described in the docs. I don't think this will ever decrease.


As for older versions, it is rather unlikely that they will receive updates. 
We might discuss it for critical bugs, e.g. security bugs, but only if 
the fix is easy, and always on a case-by-case basis. Still, 1.12.x is 
really far behind, and personally I don't think it will receive any 
updates going forward.


Best,

Dawid

On 02/11/2022 07:45, Prasanna kumar wrote:


Hi Community,

Currently we are using version 1.12.7 and it is running without any 
issue. And we see that version 1.17 is set to release early next year.


That means we would be 5 versions behind.

1) How far can we lag behind the current Flink version?

2) If we face an issue like the log4j one we faced last year, would it 
be fixed for older versions, and if so, down to which version?


This is what I see on the update policy on old releases.  
(https://flink.apache.org/downloads.html)


 Would this hold good going forward also ?

Thanks,
Prasanna.








Re: Avro deserialization issue

2022-04-14 Thread Dawid Wysakowicz

Hi Anitha,

As far as I can tell, the problem is with Avro itself. We upgraded the 
Avro version that Flink uses underneath in Flink 1.12.0: 1.11.x used 
Avro 1.8.2, while 1.12.x and later use Avro 1.10.0. Maybe that's the 
problem. You could try upgrading the Avro version in your program: just 
add a dependency on Avro 1.10. If I remember correctly, that should simply work.


If that does not solve the problem, I'd look into which field fails to 
be deserialized.


Best,

Dawid

On 13/04/2022 18:11, Anitha Thankappan wrote:

Hi Piotr,

*The code I wrote with 1.13.1:*

public final class BigQuerySourceFunction extends
RichSourceFunction implements ResultTypeQueryable {

@Override
public void open(Configuration parameters) throws Exception {

deserializer.open(RuntimeContextInitializationContextAdapters.deserializationAdapter(getRuntimeContext()));
}
..
}

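*This works fine and I get the required output:*

+----+-------------+--------------------------------+
| op |          id |                           name |
+----+-------------+--------------------------------+
| +I |           1 |                            ABC |
| +I |           2 |                            XYZ |
+----+-------------+--------------------------------+
2 rows in set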


*The same code with 1.11.0 gives:*

   RuntimeContextInitializationContextAdapters cannot be resolved

*I rewrote the code as:*

@Override
public void open(Configuration parameters) throws Exception {
deserializer.open(() ->
getRuntimeContext().getMetricGroup().addGroup("bigquery"));
}

*But the result was:*

Caused by: java.io.IOException: Failed to deserialize Avro record.
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:150)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75)
    at com.flink.connector.BigQuerySourceFunction.run(BigQuerySourceFunction.java:106)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index 3 out of bounds for length 2
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:142)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:147)


Thanks and Regards,
Anitha Thankappan


On Wed, Apr 13, 2022 at 9:16 PM Piotr Nowojski  
wrote:


Hey,

Could you be more specific about how it is not working? A compiler
error that there is no such class as
RuntimeContextInitializationContextAdapters? This class has been
introduced in Flink 1.12 in FLINK-18363 [1]. I don't know this
code and I also don't know where it's documented, but:
a) maybe you should just mimic in reverse the changes done in the
pull request from this issue [2]? `deserializer.open(() ->
getRuntimeContext().getMetricGroup().addGroup("something"))`?
b) RuntimeContextInitializationContextAdapters is an `@Internal`
class that is not part of the public API, so even in 1.13.x you
should not be using it. You should probably just implement your
own DeserializationSchema.InitializationContext.

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-18363
[2] https://github.com/apache/flink/pull/13844/files

On Mon, 11 Apr 2022 at 15:42, Anitha Thankappan
 wrote:


Hi,

I developed a Flink connector to read data from BigQuery. The
rows read from BigQuery are in Avro format.
I tried it with 1.13.1 and it works fine. But my requirement is
1.11.0, and in that case the code:

Re: Interval join operator is not forwarding watermarks correctly

2022-03-17 Thread Dawid Wysakowicz

Hi Alexis,

I tried looking into your example. First of all, so far I've spent only 
a limited time looking at the WatermarkGenerator, and I have not 
thoroughly understood how it works. I'd discourage assigning watermarks 
anywhere in the middle of your pipeline. This is considered an 
anti-pattern, and at some point there were thoughts of removing this 
possibility.



Having said that, there is indeed a bug in the 
TimestampsAndWatermarksOperator[1]. The issue is that 
WatermarkGenerators in the middle of a pipeline cut off upstream 
watermarks, but they do not cut off WatermarkStatuses. Consider your 
chain of generators G1, G2, G3, each with parallelism 2. Only a single 
subtask receives records (because of the constant key), and therefore 
only a single subtask is intended to generate watermarks. What happens 
is that the generator G3 starts with one subtask IDLE and one ACTIVE, 
but it receives ACTIVE status from the upstream generator G2, because 
of all the cross connections (keyBy) between operators. As a result, 
both channels are marked ACTIVE, but only a single one generates 
watermarks.
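
To illustrate why a channel that is ACTIVE but silent stalls everything downstream, here is a toy model in plain Java. It is only a sketch of the usual "minimum over active input channels" rule, not the actual Flink implementation; the class and method names are made up for illustration.

```java
import java.util.Arrays;

/** Toy model (not Flink code) of combining per-channel watermarks in a downstream operator. */
final class CombinedWatermarkSketch {
    private final long[] channelWatermarks;
    private final boolean[] channelIdle;

    CombinedWatermarkSketch(int channels) {
        channelWatermarks = new long[channels];
        // No channel has reported a watermark yet.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        // All channels start out ACTIVE (idle == false).
        channelIdle = new boolean[channels];
    }

    /** Records a watermark received on one input channel. */
    void onWatermark(int channel, long timestamp) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], timestamp);
    }

    /** Records a status change (idle = true means IDLE, false means ACTIVE). */
    void onStatus(int channel, boolean idle) {
        channelIdle[channel] = idle;
    }

    /** Minimum watermark over all ACTIVE channels; Long.MIN_VALUE if no channel is active. */
    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!channelIdle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        CombinedWatermarkSketch downstream = new CombinedWatermarkSketch(2);
        // Channel 0 carries real watermarks; channel 1 is ACTIVE but never emits one.
        downstream.onWatermark(0, 1_000L);
        System.out.println(downstream.combinedWatermark()); // still Long.MIN_VALUE
        // Once channel 1 is marked IDLE it no longer holds the watermark back.
        downstream.onStatus(1, true);
        System.out.println(downstream.combinedWatermark()); // 1000
    }
}
```

This also matches the observation that the pipeline starts working once the strategy marks itself idle: the silent channel is then excluded from the minimum.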



As a recommendation, I'd suggest keeping the watermark generation right 
after the source. If this is not possible, then as a workaround until 
it is fixed in Flink, you need to cut off WatermarkStatuses somehow. You 
can do that either in a custom operator or by modifying the 
TimestampsAndWatermarksOperator.



Best,

Dawid


[1] https://issues.apache.org/jira/browse/FLINK-26708



On 15/03/2022 23:47, Alexis Sarda-Espinosa wrote:

For completeness, this still happens with Flink 1.14.4

Regards,
Alexis.


*From:* Alexis Sarda-Espinosa 
*Sent:* Friday, March 11, 2022 12:21 AM
*To:* user@flink.apache.org 
*Cc:* pnowoj...@apache.org 
*Subject:* Re: Interval join operator is not forwarding watermarks 
correctly
I think I managed to create a reproducible example [1], I think it's 
due to the use of window + join + window. When I run the test, I never 
see the print output, but if I uncomment part of the code in the 
watermark generator to mark it as idle more quickly, it starts working 
after a while.


[1] https://github.com/asardaes/flink-interval-join-test


Regards,
Alexis.


*From:* Alexis Sarda-Espinosa 
*Sent:* Thursday, March 10, 2022 7:47 PM
*To:* user@flink.apache.org 
*Cc:* pnowoj...@apache.org 
*Subject:* RE: Interval join operator is not forwarding watermarks 
correctly


I found [1] and [2], which are closed, but could be related?

[1] https://issues.apache.org/jira/browse/FLINK-23698

[2] https://issues.apache.org/jira/browse/FLINK-18934

Regards,

Alexis.

*From:*Alexis Sarda-Espinosa 
*Sent:* Thursday, March 10, 2022 7:27 PM
*To:* user@flink.apache.org
*Subject:* Interval join operator is not forwarding watermarks correctly

Hello,

I’m in the process of updating from Flink 1.11.3 to 1.14.3, and it 
seems the interval join in my pipeline is no longer working. More 
specifically, I have a sliding window after the interval join, and the 
window isn’t firing. After many tests, I ended up creating a custom 
operator that extends IntervalJoinOperator and I overrode 
processWatermark1() and processWatermark2() to add logs and check when 
they are called. I can see that processWatermark1() isn’t called.


For completeness, this is how I use my custom operator:

joinOperator = new CustomIntervalJoinOperator(…);

stream1.connect(stream2)
    .keyBy(selector1, selector2)
    .transform("Interval Join", TypeInformation.of(Pojo.class), joinOperator);

---

Some more information in case it’s relevant:

- stream2 is obtained from a side output.

- both stream1 and stream2 have watermarks assigned by custom 
strategies. I also log watermark creation, and I can see that 
watermarks are indeed emitted as expected in both streams.


Strangely, my watermark strategies mark themselves idle if they don’t 
receive new events after 10 minutes, and if I send some events and 
wait 10 minutes, processWatermark1() is called! On the other hand, if 
I continuously send events, it is never called.


Is this a known issue?

Regards,

Alexis.





Re: Move savepoint to another s3 bucket

2022-03-08 Thread Dawid Wysakowicz

Hi Lukas,

I am afraid you're hitting this bug: 
https://issues.apache.org/jira/browse/FLINK-25952


Best,

Dawid

On 08/03/2022 16:37, Lukáš Drbal wrote:

Hello everyone,

I'm trying to move a savepoint to another S3 account, but the restore 
always fails with a weird 404 error.


We are using lyft k8s operator [1] and flink 1.13.6 (in stacktrace you 
can see version 1.13.6-396a8d44-szn which is just internal build from 
flink commit b2ca390d478aa855eb0f2028d0ed965803a98af1)


What I'm trying to do:

 1. create savepoint for pipeline via ./bin/flink savepoint 
 2. copy data under path configured in state.savepoints.dir from
source s3 to new s3
 3. change all configuration and restore pipeline

Are these steps correct, or am I doing something wrong or unsupported?

All options related to S3 have valid values for the new S3 account, but 
the restore fails with the exception below. The error message includes 
the original path (s3://flink/savepoints/activity-searched-query), which 
doesn't exist in the new account, so the 404 is expected. But I still 
don't understand why Flink tries that path, because the related config 
options contain the new bucket info.
    high-availability.storageDir: 's3:///ha/pipelines-runner-activity-searched-query'
    jobmanager.archive.fs.dir: 's3:///history'
    state.checkpoints.dir: 's3:///checkpoints/activity-searched-query'
    state.savepoints.dir: 's3:///savepoints/activity-searched-query'

+ valid values for s3.access-key and s3.secret-key

I found the original path in the _metadata file in the savepoint data, 
but changing that (search) leads to some weird OOM. I hope this should 
not be needed and that those values should be ignored.


state.backend is hashmap, if that matters.

Restoring back from the source bucket works as expected.

Thanks a lot!

Regards,
L.

Stacktrace:

2022-03-08 15:39:25,838 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - CombineToSearchedQuery -> (LateElementsCounter, TransformToStreamElement -> Sink: SearchedQueryKafkaSink) (1/2) (0c0f108c393b9a5b58f861c1032671d0) switched from INITIALIZING to FAILED on 10.67.158.155:45521-d8d19d @ 10.67.158.155 (dataPort=36341).
org.apache.flink.util.SerializedThrowable: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at java.lang.Thread.run(Thread.java:832) ~[?:?]
Caused by: org.apache.flink.util.SerializedThrowable: Could not restore keyed state backend for WindowOperator_bd2a73c53230733509ca171c6476fcc5_(1/2) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.13.6-396a8d44-szn.jar:1.13.6-396a8d44-szn]
    ... 10 more
Caused by: org.apache.flink.util.SerializedThr

Re: Could not stop job with a savepoint

2022-03-07 Thread Dawid Wysakowicz

Hi,

From the exception it seems the job had already finished by the time 
you triggered the savepoint.


Best,

Dawid

On 07/03/2022 14:56, Vinicius Peracini wrote:

Hello everyone,

I have a Flink job (version 1.14.0 running on EMR) and I'm having this 
issue while trying to stop a job with a savepoint on S3:


org.apache.flink.util.FlinkException: Could not stop with a savepoint job "df3a3c590fabac737a17f1160c21094c".
    at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:581)
    at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002)
    at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:569)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1069)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
    at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:579)
    ... 9 more

I'm using incremental and unaligned checkpoints (aligned checkpoint 
timeout is 30 seconds). I also tried to create the savepoint without 
stopping the job (using flink savepoint command) and got the same 
error. Any idea what is happening here?


Thanks in advance,





Re: Question about Flink counters

2022-03-07 Thread Dawid Wysakowicz

Hi Shane,

I don't think counters, or should I say metrics, are the right 
abstraction for the use case you described. Metrics are a way to get 
insight into the running job and its current state. They are not a good 
means of calculating results. Metrics are not stateful; they are not 
preserved across restarts. Counters are also generally scoped: counters 
in UDFs are scoped[1] to the parallel instance that uses them. 
You should combine them on the monitoring system side if you need a more 
general overview.
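
To make the scoping point concrete, here is a toy model in plain Java. It is not Flink's metrics API; the class, the method names and the operator name "s3Downloader" are made up for illustration. Each parallel instance only ever increments its own counter, and a total exists only where somebody sums the per-instance values, which is the job of the monitoring system.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model (not Flink code): one counter per (operator, subtask) scope. */
final class ScopedCounterSketch {
    private final Map<String, Long> countersByScope = new LinkedHashMap<>();

    /** Increments the counter that belongs to one parallel instance of an operator. */
    void inc(String operator, int subtaskIndex) {
        countersByScope.merge(operator + "." + subtaskIndex, 1L, Long::sum);
    }

    /** Value seen by a single parallel instance; 0 if it never counted anything. */
    long value(String operator, int subtaskIndex) {
        return countersByScope.getOrDefault(operator + "." + subtaskIndex, 0L);
    }

    /** Aggregation across instances, as a monitoring system would do it. */
    long total(String operator) {
        return countersByScope.entrySet().stream()
                .filter(e -> e.getKey().startsWith(operator + "."))
                .mapToLong(Map.Entry::getValue)
                .sum();
    }

    public static void main(String[] args) {
        ScopedCounterSketch metrics = new ScopedCounterSketch();
        metrics.inc("s3Downloader", 0);
        metrics.inc("s3Downloader", 0);
        metrics.inc("s3Downloader", 1);
        System.out.println(metrics.value("s3Downloader", 0)); // 2
        System.out.println(metrics.value("s3Downloader", 1)); // 1
        System.out.println(metrics.total("s3Downloader"));    // 3
    }
}
```

Note that the map lives only in memory, which mirrors the point that metric values do not survive a restart.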



Hope that helps,

Best,

Dawid


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#scope


On 06/03/2022 15:13, Shane Bishop wrote:

Hi Zhanghao Chen,

Sure, I can give some context.

My team's Flink application runs as a Kinesis Data Analytics streaming 
application [1] in AWS.


Our application receives events from Amazon Simple Queue Service (SQS) 
[2] in our source, and then uses a property of the SQS event to 
download from Amazon S3 [3]. The external metrics system for our 
counters is Amazon CloudWatch metrics [4].


For both the SQS consumer source and our S3 downloader operator, we 
have a counter for number of received items, number of successfully 
processed items, and number of items that failed to process.


However, during testing we have found that the count for SQS events 
received and S3 downloads is much too high. The counts for our 
counters in CloudWatch is much higher than the number of records 
reported in the Flink dashboard.


The goal is that our metrics in CloudWatch should accurately reflect 
the number of SQS events received and successfully or unsuccessfully 
processed, and the number of S3 downloads that were attempted and 
succeeded or failed.


I am looking for help understanding why our counter values are inaccurate.

[1] https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html
[2] 
https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html

[3] https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html
[4] 
https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html



*From:* Zhanghao Chen 
*Sent:* March 5, 2022 11:11 PM
*To:* Shane Bishop ; user@flink.apache.org 


*Subject:* Re: Question about Flink counters
Hi Shane,

Could you share more information on what you would like to use the 
counter for?


The counter discussed here is primarily designed for exposing counts 
to external metric systems. Usually, each task would count on its 
own, and it is left for the external metric system (usu. a time series 
database) to do aggregations. Also, you cannot reference a counter 
from a different machine. I'm not sure if this is what you expected.


Best,
Zhanghao Chen

*From:* Shane Bishop 
*Sent:* Saturday, March 5, 2022 23:22
*To:* Zhanghao Chen ; user@flink.apache.org 


*Subject:* Re: Question about Flink counters
If I used a thread-safe counter implementation, would that be enough 
to make the count correct for a Flink cluster with multiple machines?


Best,
Shane

*From:* Zhanghao Chen 
*Sent:* March 4, 2022 11:08 PM
*To:* Shane Bishop ; user@flink.apache.org 


*Subject:* Re: Question about Flink counters
Hi Shane,

Flink provides a generic counter interface with a few implementations. 
The default implementation, SimpleCounter, which is not thread-safe, 
is used when you call counter(String name) on a MetricGroup. 
Therefore, you'll need to use your own thread-safe implementation; 
check out the second example of Metrics | Apache Flink 
<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/#counter> for 
reference.


Best,
Zhanghao Chen

*From:* Shane Bishop 
*Sent:* Saturday, March 5, 2022 5:24
*To:* user@flink.apache.org 
*Subject:* Question about Flink counters
Hi all,

For Flink counters [1], are increment operations guaranteed to be 
atomic across all parallel tasks? I.e., is there a guarantee that the 
counter values will not be higher than expected?


Thanks,
Shane

---
[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/#counter




Re: streaming mode with both finite and infinite input sources

2022-02-25 Thread Dawid Wysakowicz
This should be supported in 1.14 if you enable checkpointing with 
finished tasks[1], which has been added in 1.14. In 1.15 it will be 
enabled by default.


Best,

Dawid

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/#enabling-and-configuring-checkpointing


On 25/02/2022 08:18, Jin Yi wrote:
so we have a streaming job where the main work to be done is 
processing infinite kafka sources.  recently, i added a fromCollection 
(finite) source to simply write some state once upon startup.  this 
all seems to work fine.  the finite source operators all finish, while 
all the infinite source operators continue running with watermarks.


however, the weird thing is that savepointing throws exceptions, and 
there have been no automatic checkpoints at all while the job has been 
running for 90+minutes (checkpoint config is unaligned, exactly once 
every 5m w/ a 1h timeout).


is this mixed finity not a supported setup?




Re: CDC using Query

2022-02-04 Thread Dawid Wysakowicz

Hi Mohan,

I don't know much about Kafka Connect, so I will not talk about its 
features and how it differs from Flink. Flink on its own does not have 
the capability to read a CDC stream directly from a DB. However, there 
is the flink-cdc-connectors[1] project, which embeds the standalone 
Debezium engine inside Flink's source and can process the DB changelog 
with all the processing guarantees that Flink provides.


As for the idea of processing further with Kafka Streams. Why not 
process data with Flink? What do you miss in Flink?


Best,

Dawid

[1] https://github.com/ververica/flink-cdc-connectors

On 04/02/2022 13:55, mohan radhakrishnan wrote:

Hi,
     When I was looking for CDC I realized Flink uses Kafka Connector 
to stream to Flink. The idea is to send it forward to Kafka and 
consume it using Kafka Streams.


Are there source DLQs or additional mechanisms to detect failures to 
read from the DB ?


We don't want to use Debezium and our CDC is based on queries.

What mechanisms does Flink have that a Kafka Connect worker does not ? 
Kafka Connect workers can go down and source data can be lost.


Does the idea  to send it forward to Kafka and consume it using Kafka 
Streams make sense ? The checkpointing feature of Flink can help ? I 
plan to use Kafka Streams for 'Exactly-once Delivery' and changelog 
topics.


Could you point out relevant material to read ?

Thanks,
Mohan




Re: Reading from Kafka kafkarecorddeserializationschema

2022-02-04 Thread Dawid Wysakowicz

Hi,

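You can use DeserializationSchema with KafkaSource as well. You can pass 
it to the KafkaSource.builder() via setValueOnlyDeserializer(...); 
setDeserializer(...) expects a KafkaRecordDeserializationSchema, which 
you can obtain with KafkaRecordDeserializationSchema.valueOnly(...):


    KafkaSource.<...>builder()
        .setValueOnlyDeserializer(...)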

You can also take a look at the StateMachineExample[1], where 
KafkaSource is used.


BTW, have you tried looking at Table API? It would abstract quite a few 
things for you, e.g. translation of what I presume is a CSV format[2] in 
your case.
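
If you stay with the DataStream API and write your own deserializer, the parsing step for a Long,Timestamp,String,String message could look roughly like the sketch below. It is plain Java without any Flink types, so the same logic would go into the deserialize(byte[]) method of your schema. The class name, the field names, the UTF-8 encoding and the ISO-8601 timestamp format are all assumptions, since the actual message layout was not shown.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;

/** Hypothetical event type for a "Long,Timestamp,String,String" message. */
final class ParsedEvent {
    final long id;
    final Instant timestamp;
    final String first;
    final String second;

    ParsedEvent(long id, Instant timestamp, String first, String second) {
        this.id = id;
        this.timestamp = timestamp;
        this.first = first;
        this.second = second;
    }

    /** Parses a raw message value; assumes UTF-8 text and an ISO-8601 timestamp. */
    static ParsedEvent parse(byte[] message) {
        // Limit 4 keeps any further commas inside the last field.
        String[] parts = new String(message, StandardCharsets.UTF_8).split(",", 4);
        if (parts.length != 4) {
            throw new IllegalArgumentException(
                    "Expected 4 comma-separated fields, got " + parts.length);
        }
        return new ParsedEvent(
                Long.parseLong(parts[0].trim()),
                Instant.parse(parts[1].trim()),
                parts[2],
                parts[3]);
    }

    public static void main(String[] args) {
        byte[] raw = "42,2022-02-03T16:56:00Z,foo,bar".getBytes(StandardCharsets.UTF_8);
        ParsedEvent event = parse(raw);
        System.out.println(event.id + " " + event.timestamp + " " + event.first + " " + event.second);
    }
}
```

The type parameter of KafkaSource would then be this event type rather than String, because it describes what the deserializer produces.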


Best,

Dawid

[1] 
https://github.com/apache/flink/blob/5846d8d61b4b2aa10f925e9f63885cb7f6686303/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java#L104


[2] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/csv/


On 03/02/2022 16:56, HG wrote:

Hello

Most examples available still use the FlinkKafkaConsumer unfortunately.
I need to consume events from Kafka.
The format is Long,Timestamp,String,String.

Do I need to create a custom deserializer?

What also confuses me is

KafkaSource** source = KafkaSource

How does it relate to the deserializer?
Is there a kind of  type or is  fine even if the message 
is a composite of Long,String...?


Regards Hans





Re: Queryable State Deprecation

2022-02-04 Thread Dawid Wysakowicz

Hi Karthik,

The reason we deprecated it is because we lacked committers who could 
spend time on getting the Queryable state to a production ready state. I 
might be speaking for myself here, but I think the main use case for the 
queryable state is to have an insight into the current state of the 
application for debugging purposes. If it is used for data serving 
purposes, we believe it's better to sink the data into an external 
store, which can provide better discoverability and more user friendly 
APIs for querying the results.


As for debugging/tracking insights you may try to achieve similar 
results with metrics.


Best,

Dawid

On 01/02/2022 16:36, Jatti, Karthik wrote:


Hi,

I see on the Flink Roadmap that the Queryable State API is scheduled to 
be deprecated, but I couldn’t find much information on Confluence or in 
this mailing list’s archives to understand why it’s being deprecated and 
what an alternative would be. Any pointers to help me get some more 
information here would be great.


Thanks,

Karthik








Re: flink savepoints not (completely) relocatable ?

2022-02-03 Thread Dawid Wysakowicz
I looked into the code again and unfortunately I have bad news :( Indeed, 
we treat S3 as if it always injects entropy, even if the entropy key is 
not specified, which effectively means entropy injection is disabled. I 
created a JIRA ticket[1] to fix it.


Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-25952

On 03/02/2022 17:02, Frank Dekervel wrote:

Hello,

I didn't know about entropy injection. I have checked, and there is no 
entropy injection configured in my flink-conf.yaml. This is the 
relevant section:


s3.access-key: ???
s3.endpoint: http://minio/
s3.path.style.access: true
s3.secret-key: ???

I see that there are still S3 paths defined in the _metadata

kervel@atlas:~/Downloads/savepoint-299415-266c01ff6b2a$ cat _metadata 
 | strings | grep s3
Xs3://flink/savepoints/savepoint-299415-266c01ff6b2a/a6d59334-2769-4a6e-b582-d38d58352021 

Xs3://flink/savepoints/savepoint-299415-266c01ff6b2a/f627c959-d69d-41a1-9732-748795efb9ad 

Xs3://flink/savepoints/savepoint-299415-266c01ff6b2a/f9a03af4-2868-4797-a950-10257282ed1e 


...

not all paths are existing

kervel@atlas:~/Downloads/savepoint-299415-266c01ff6b2a$ l
b81e4e28-eabd-499e-9561-b98137084a9c  _metadata

Thanks!

Greetings,
Frank

On 03.02.22 16:38, Dawid Wysakowicz wrote:


Hi Frank.

Do you use entropy injection by chance? I am afraid savepoints are 
not relocatable in combination with entropy injection as described 
here[1].


Best,

Dawid

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints


On 03/02/2022 14:44, Frank Dekervel wrote:

Hello,

I'm trying to inspect a savepoint that was stored on s3://flink/ (on 
a minio server), i downloaded it to my laptop for inspection. I have 
two KeyedProcessFunctions (state in the same savepoint) and 
strangely enough, one works perfectly and the other one doesn't.


The code is fairly simple:

val savepoint = Savepoint.load(bEnv.getJavaEnv, path, new HashMapStateBackend())

import org.apache.flink.api.scala._

// first one
val ti = createTypeInformation[AlarmMessageKey]
val tia = createTypeInformation[AlmState]
val ds = savepoint.readKeyedState("alm-1", new AlmStateReader(), ti, tia)
val valss = ds.collect().asScala

// now the second one:
val savepoint2 = Savepoint.load(bEnv.getJavaEnv, path, new HashMapStateBackend())
// here we ser/deser in kryo not scala case class serializer. No
// idea why, but that's how its in the savepoint
val ds_sup = savepoint.readKeyedState("ags-1", new SupStateReader())
val vals_sup = ds_sup.collect().asScala

The second one seems to fail because it wants to access the 
savepoint at the original path on S3 (which my laptop doesn't have 
access to). I thought savepoints were supposed to be relocatable. 
Weirdly enough, the first one works just fine.


This is the exception i get:

[error] Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: 79QK1G93VPVPED3H; S3 Extended Request ID: U00lp/pBQHDZzgySwX8w9CtHel9uQTNqxEIDjSdQVrDdNk/TExmQo1SmZ9rNw2D5XiyZ6wDqn5g=; Proxy: null), S3 Extended Request ID: U00lp/pBQHDZzgySwX8w9CtHel9uQTNqxEIDjSdQVrDdNk/TExmQo1SmZ9rNw2D5XiyZ6wDqn5g=
[error] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
[error] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
[error] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
[error] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
[error] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
[error] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
[error] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
[error] at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
[error] at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
[error] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
[error] at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
[error] at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
[error] at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
[error] at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1512)
[error] at com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$2(PrestoS3FileSystem.java:1096)
[error] at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:139)
[error] at 
com.fac

Re: flink savepoints not (completely) relocatable ?

2022-02-03 Thread Dawid Wysakowicz

Hi Frank.

Do you use entropy injection by chance? I am afraid savepoints are not 
relocatable in combination with entropy injection as described here[1].
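
To illustrate why entropy injection gets in the way, here is a rough sketch in plain Java. It is not Flink's implementation: the class and method names are made up, and `_entropy_` only stands in for whatever marker is configured via s3.entropy.key. The assumption, taken from the savepoint documentation, is that the marker is replaced with random characters for state data files and simply dropped for the metadata file, so the data files end up outside the savepoint directory and the metadata has to point at them with absolute paths.

```java
import java.util.Random;

// Illustrative only: mimics the idea of entropy injection, not Flink's code.
final class EntropyPathSketch {
    // Assumed marker; in a real setup this is whatever s3.entropy.key is set to.
    static final String MARKER = "_entropy_/";
    private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz0123456789";

    // Data files: the marker is replaced by `length` random characters.
    // Metadata file: the marker is removed, so the path stays predictable.
    static String resolve(String path, boolean injectEntropy, int length, Random rnd) {
        int idx = path.indexOf(MARKER);
        if (idx < 0) {
            return path; // no marker in the path, nothing to do
        }
        StringBuilder sb = new StringBuilder(path.substring(0, idx));
        if (injectEntropy) {
            for (int i = 0; i < length; i++) {
                sb.append(ALPHABET.charAt(rnd.nextInt(ALPHABET.length())));
            }
            sb.append('/');
        }
        return sb.append(path.substring(idx + MARKER.length())).toString();
    }

    public static void main(String[] args) {
        Random rnd = new Random();
        String base = "s3://flink/_entropy_/savepoints/savepoint-1/";
        // The metadata file lands in the savepoint directory ...
        System.out.println(resolve(base + "_metadata", false, 4, rnd));
        // ... while each state file lands under a different random prefix.
        System.out.println(resolve(base + "state-file-1", true, 4, rnd));
    }
}
```

With files spread over random prefixes there is no common savepoint root left, which is why copying just the savepoint directory to a laptop would not be enough in that setup.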


Best,

Dawid

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints


On 03/02/2022 14:44, Frank Dekervel wrote:

Hello,

I'm trying to inspect a savepoint that was stored on s3://flink/ (on a 
minio server); I downloaded it to my laptop for inspection. I have two 
KeyedProcessFunctions (state in the same savepoint) and, strangely 
enough, one works perfectly and the other one doesn't.


The code is fairly simple:

val savepoint = Savepoint.load(bEnv.getJavaEnv, path, new HashMapStateBackend());

import org.apache.flink.api.scala._

// first one

val ti = createTypeInformation[AlarmMessageKey]
val tia = createTypeInformation[AlmState]
val ds = savepoint.readKeyedState("alm-1", new AlmStateReader(), ti, tia)
val valss = ds.collect().asScala;

// now the second one:
val savepoint2 = Savepoint.load(bEnv.getJavaEnv, path, new HashMapStateBackend());
// here we ser/deser in kryo, not the scala case class serializer. No
// idea why, but that's how it is in the savepoint
val ds_sup = savepoint.readKeyedState("ags-1", new SupStateReader());

val vals_sup = ds_sup.collect().asScala;

The second one seems to fail because it wants to access the savepoint 
on the original path on S3 (which my laptop doesn't have access to). I 
thought savepoints were supposed to be relocatable. Weirdly enough, the 
first one works just fine.


This is the exception I get:

[error] Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: 
Access Denied (Service: Amazon S3; Status Code: 403; Error Code: 
AccessDenied; Request ID: 79QK1G93VPVPED3H; S3 Extended Request ID: 
U00lp/pBQHDZzgySwX8w9CtHel9uQTNqxEIDjSdQVrDdNk/TExmQo1SmZ9rNw2D5XiyZ6wDqn5g=; 
Proxy: null), S3 Extended Request ID: 
U00lp/pBQHDZzgySwX8w9CtHel9uQTNqxEIDjSdQVrDdNk/TExmQo1SmZ9rNw2D5XiyZ6wDqn5g=
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
[error] at 
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
[error] at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
[error] at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
[error] at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
[error] at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
[error] at 
com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:1512)
[error] at 
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$openStream$2(PrestoS3FileSystem.java:1096)
[error] at 
com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:139)
[error] at 
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:1093)
[error] at 
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.openStream(PrestoS3FileSystem.java:1078)
[error] at 
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.seekStream(PrestoS3FileSystem.java:1071)
[error] at 
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.lambda$read$1(PrestoS3FileSystem.java:1015)
[error] at 
com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:139)
[error] at 
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3InputStream.read(PrestoS3FileSystem.java:1014)
[error] at 
java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
[error] at 
java.io.BufferedInputStream.read(BufferedInputStream.java:265)
[error] at 
java.io.FilterInputStream.read(FilterInputStream.java:83)
[error] at 
org.apache.flink.fs.s3presto.common.HadoopDataInputStream.read(HadoopDataInputStream.java:86)
[error] at 
org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:50)
[error] at 
java.io.DataInputStream.readInt(DataInputStr

Re: create savepoint on bounded source in streaming mode

2022-01-25 Thread Dawid Wysakowicz

Hi Shawn,

You could also take a look at the hybrid source[1]

Best,

Dawid

[1]https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/

On 26/01/2022 08:39, Guowei Ma wrote:

Hi Shawn
Currently Flink cannot trigger a savepoint at the end of the input. An 
alternative might be to develop a customized source that triggers a 
savepoint when it notices that all the input splits have been handled.

Or you could look at the State Processor API[1], which might be helpful.

Thanks for sharing, but I have another small question:
I think you need to process all the historical events to rebuild the 
correct state, so there might be no gain even if you periodically 
create a savepoint. Why do you need to "rebuild" the state 
periodically? Am I missing something?


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/


Best,
Guowei


On Wed, Jan 26, 2022 at 2:17 PM Shawn Du  wrote:

     Our application is stateful: processing live events depends
on the state. But for various reasons we need to rebuild the state,
and it would be very costly to replay all the data.
     Our historical event data is stored in S3, so we want to
create states/savepoints periodically so that we can rebuild the
state from a given point. We call this a bootstrap process.
     Any ideas?

     Thanks.

--
Sender:Guowei Ma 
Sent At:2022 Jan. 26 (Wed.) 14:04
Recipient:Shawn Du 
Cc:user 
Subject:Re: create savepoint on bounded source in streaming mode

Hi, Shawn
I think Flink does not support this mechanism yet.
Would you like to share the scenario in which you need this
savepoint at the end of the bounded input?
Best,
Guowei


On Wed, Jan 26, 2022 at 1:50 PM Shawn Du
 wrote:
Hi experts,

Assume I have several files and I want to replay these files in
order in streaming mode and create a savepoint when the files have
been played to the end. Is it possible?
I wrote a simple test app, and the job finishes when the source
reaches the end, so I have no chance to create a savepoint. Please help.

Thanks
Shawn






Re: Examples / Documentation for Flink ML 2

2022-01-17 Thread Dawid Wysakowicz
I am adding a couple of people who worked on it. Hopefully, they will be
able to answer you.

On 17/01/2022 13:39, Bonino Dario wrote:
>
> Dear List,
>
> We are in the process of evaluating Flink ML version 2.0 in the
> context of some ML tasks mainly concerned with classification and
> clustering.
>
> While algorithms for these two domains are already present, although
> perhaps in a limited form, in the latest release of Flink ML, we did not
> find any example / documentation that could guide our experiments.
>
> Is some adoption example available, like code, tutorial or any
> information that might help us in bootstrapping a Flink ML 2 project?
>
> Thank you very much
>
> Best regards
>
> -- 
> Ing. Dario Bonino, Ph.D
>
> e-m@il: dario.bon...@gmail.com 
> www: https://www.linkedin.com/in/dariobonino




Re: [statefun] upgrade path - shared cluster use

2022-01-17 Thread Dawid Wysakowicz
I am pretty confident the goal is to be able to run on the newest Flink
version. However, as the release cycle is decoupled for both modules it
might take a bit.

I added Igal to the conversation, who I hope will be able to give you an
idea when you can expect that to happen.

Best,

Dawid

On 17/01/2022 11:48, Filip Karnicki wrote:
> Hi, we're currently using statefun 3.1.1 on a shared cloudera cluster,
> which is going to be updated to 1.14.x 
>
> We think this update might break our jobs, since 3.1.1 is not
> explicitly compatible with 1.14.x
> (https://flink.apache.org/downloads.html#apache-flink-stateful-functions-311)
>
> Is there any appetite for statefun to always be made compatible with
> the latest base flink version, or do we need to stop using the shared
> cluster and procure our own? Or is the update of statefun to something
> like 3.2.0 (based on 1.14.x) just a matter of having the resources to
> do it?
>
> Thanks
> Fil





Re: Flink per-job cluster HbaseSinkFunction fails before starting - Configuration issue

2022-01-17 Thread Dawid Wysakowicz
Hey Kamil,

Have you followed this guide to set up Kerberos authentication[1]?

Best,

Dawid

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/security/security-kerberos/

On 14/01/2022 17:09, Kamil ty wrote:
> Hello all,
> I have a flink job that is using the HbaseSinkFunction as specified
> here: flink/flink-connectors/flink-connector-hbase-2.2 at master ·
> a0x8o/flink (github.com)
> <https://github.com/a0x8o/flink/tree/master/flink-connectors/flink-connector-hbase-2.2#writing-into-hbase-tables-from-datastreams>
>
> I'm deploying the job to a cluster in yarn per-job mode. Using flink
> run -d job.jar.
>
> The job gets accepted and I get the address of the UI but when looking
> at the UI the job stays at CREATED and never actually runs. After some
> time it stops. 
>
> This error stands out when looking at the logs:
> WARN [main] org.apache.hadoop.security.LdapGroupsMapping: Exception
> while trying to get password for alias
> hadoop.security.group.mapping.ldap.bind.password:
> java.io.IOException: Configuration problem with provider path.
>         at
> org.apache.hadoop.conf.Configuration.getPasswordFromCredentialProviders(Configuration.java:2428)
>         at
> org.apache.hadoop.conf.Configuration.getPassword(Configuration.java:2347)
>         at
> org.apache.hadoop.security.LdapGroupsMapping.getPassword(LdapGroupsMapping.java:797)
>         at
> org.apache.hadoop.security.LdapGroupsMapping.setConf(LdapGroupsMapping.java:680)
>         at
> org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:77)
>         at
> org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:137)
>         at org.apache.hadoop.security.Groups.<init>(Groups.java:105)
>         at org.apache.hadoop.security.Groups.<init>(Groups.java:101)
>         at
> org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:476)
>         at
> org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:352)
>         at
> org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:314)
>         at
> org.apache.hadoop.security.UserGroupInformation.doSubjectLogin(UserGroupInformation.java:1996)
>         at
> org.apache.hadoop.security.UserGroupInformation.createLoginUser(UserGroupInformation.java:743)
>         at
> org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:693)
>         at
> org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:604)
>         at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer.main(ContainerLocalizer.java:468)
> Caused by: java.nio.file.AccessDeniedException:
> /var/run/.../process/1546359139-yarn-NODEMANAGER/creds.localjceks
>         at
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:84)
>         at
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>         at
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>         at
> sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
>         at java.nio.file.Files.newByteChannel(Files.java:361)
>         at java.nio.file.Files.newByteChannel(Files.java:407)
>         at
> java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
>         at java.nio.file.Files.newInputStream(Files.java:152)
>         at
> org.apache.hadoop.security.alias.LocalKeyStoreProvider.getInputStreamForFile(LocalKeyStoreProvider.java:76)
>         at
> org.apache.hadoop.security.alias.AbstractJavaKeyStoreProvider.locateKeystore(AbstractJavaKeyStoreProvider.java:325)
>         at
> org.apache.hadoop.security.alias.AbstractJavaKeyStoreProvider.<init>(AbstractJavaKeyStoreProvider.java:86)
>         at
> org.apache.hadoop.security.alias.LocalKeyStoreProvider.<init>(LocalKeyStoreProvider.java:56)
>         at
> org.apache.hadoop.security.alias.LocalJavaKeyStoreProvider.<init>(LocalJavaKeyStoreProvider.java:42)
>         at
> org.apache.hadoop.security.alias.LocalJavaKeyStoreProvider.<init>(LocalJavaKeyStoreProvider.java:34)
>         at
> org.apache.hadoop.security.alias.LocalJavaKeyStoreProvider$Factory.createProvider(LocalJavaKeyStoreProvider.java:68)
>         at
> org.apache.hadoop.security.alias.CredentialProviderFactory.getProviders(CredentialProviderFactory.java:73)
>         at
> org.apache.hadoop.conf.Configuration.getPasswordFromCredentialProviders(Configuration.java:2409)
>         ... 15 more
>
> This seems as if it tries to access by password based authentication
> but on the cluster only Kerberos based authentication should be used.
>
> The log output wh

Re: Sorting/grouping keys and State management in BATCH mode

2022-01-12 Thread Dawid Wysakowicz
Hey Krzysztof,

Re 1. I believe you are asking where the state is kept. It is stored in
memory, but bear in mind there is only ever state kept for the current
key. Once all records for a key are processed the corresponding state is
discarded as it won't be needed anymore.

Re 2. The sorting algorithm keeps records in serialized form in the
managed memory of an operator[2]. It potentially spills the intermediate
results to local disks once it reaches the sort spilling threshold[1].

Re 2,5 We know there are no more records for a given key once we receive
a record with a key different from the previous one after sorting.
(Sorting is applied on keys; it's more of a grouping than really
sorting.) This is leveraged e.g. in
BatchExecutionKeyedStateBackend#setCurrentKey.
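
The principle behind Re 1 and Re 2,5 can be sketched in plain Java. This is only an illustration, not the code of BatchExecutionKeyedStateBackend or ExternalSorter; the class SortedKeyStateSketch and the per-key sum it computes are made up for the example. Records are grouped by sorting on the key, a single state value is held for the current key, and a change of key is the signal that the previous key is complete and its state can be dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative only: one piece of state, reused for each key in turn.
final class SortedKeyStateSketch {
    // Sums the values per key while holding state for a single key at a time.
    static List<Map.Entry<String, Integer>> sumPerKey(List<Map.Entry<String, Integer>> input) {
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(input);
        sorted.sort(Map.Entry.comparingByKey()); // grouping by key via sorting

        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        String currentKey = null;
        int state = 0; // state of the current key only
        for (Map.Entry<String, Integer> rec : sorted) {
            if (currentKey != null && !currentKey.equals(rec.getKey())) {
                // A different key arrived: the previous key is finished,
                // so emit its result and discard its state.
                out.add(Map.entry(currentKey, state));
                state = 0;
            }
            currentKey = rec.getKey();
            state += rec.getValue();
        }
        if (currentKey != null) {
            out.add(Map.entry(currentKey, state)); // end of input closes the last key
        }
        return out;
    }
}
```

The real sorter works on serialized records in managed memory and may spill to disk, which this sketch leaves out entirely.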

ExternalSorter is not a public class and thus there are no usage
examples or user-facing documentation. Unfortunately, the best you can get
is the javadocs/comments in the class itself.

Best,

Dawid

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#taskmanager-runtime-sort-spilling-threshold

[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/#configure-heap-and-managed-memory


On 11/01/2022 17:07, Chesnay Schepler wrote:
> Looping in Dawid who can hopefully answer your questions.
>
> On 11/01/2022 13:00, Krzysztof Chmielewski wrote:
>> Hi,
>> I'm reading the docs and FLIP-140 on BATCH mode [1][2], where it
>> reads that
>> "In BATCH mode, the configured state backend is ignored. Instead,
>> the input of a keyed operation is grouped by key (using sorting) and
>> then we process all records of a key in turn." [1]
>>
>> I would like to ask:
>> 1. Where (heap, off-heap) does Flink keep records for BATCH streams if the
>> configured state backend is ignored? In FLIP-140 I see there was a
>> new state implementation created that is prepared to keep only one
>> key value, but there is no information on "where" in memory it is
>> kept.
>>
>> 2. Where does the sorting algorithm keep its intermediate results?
>> How/who knows that there will be no more records for a given key?
>>
>> If I get it right, sorting is done through the ExternalSorter class. Is
>> there any documentation or usage example for ExternalSorter and a
>> description of SortStage values like READ, SORT, SPILL?
>>
>> Regards,
>> Krzysztof Chmielewski
>>
>>
>> [1] 
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
>> [2] 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-140%3A+Introduce+batch-style+execution+for+bounded+keyed+streams#FLIP140:Introducebatchstyleexecutionforboundedkeyedstreams-Howtosort/groupkeys
>
>




Re: GenericWriteAheadSink, declined checkpoint for a finished source

2021-12-06 Thread Dawid Wysakowicz
Hi,

Sorry to hear it's hard to find the option. It is part of the 1.14
release[1]. It is also documented how to enable it[2]. Happy to hear how
we can improve the situation here.


As for the exception: are you seeing it occur repeatedly for
the same task? I can imagine a situation where, with frequent checkpoints,
a Task might finish while there is an RPC triggering request pending
somewhere on the wire. In that case such a checkpoint could fail, but
the next triggered checkpoint should not try to trigger the FINISHED task anymore.


Does it cause problems in your pipeline or are you just concerned with
the entry in logs?

Best,

Dawid


[1]
https://github.com/apache/flink/blob/ef0e17ad6319175ce0054fc3c4db14b78e690dd6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java#L236

[2]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/checkpointing/#checkpointing-with-parts-of-the-graph-finished-beta


On 06/12/2021 18:24, James Sandys-Lumsdaine wrote:
> Hello again,
>
> We recently upgraded from Flink 1.12.3 to 1.14.0 and we were hoping it
> would solve our issue with checkpointing with finished data sources.
> We need the checkpointing to work to trigger Flink's
> GenericWriteAheadSink class.
>
> Firstly, the constant mentioned on FLIP-147 that enables the feature
> isn't available as far as we can see
> (ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH). It's not in ConfigConstants
> or CheckpointConfig for example. So instead we enabled with the following:
>
> conf.setBoolean("execution.checkpointing.checkpoints-after-tasks-finish.enabled",
> true);
> StreamExecutionEnvironment env
> = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
> env.enableCheckpointing(30 * 1000);
> ...
>
> We can see the constant available in 1.15 on Google but not the
> version we were expecting (1.14.0).
>
> Previously we had to have long Thread.sleep(x) in to keep the sources
> alive when checkpoints were taken. When we enable this feature using
> the explicit string and removed these hacks we start seeing these errors:
>
> INFO  [flink-akka.actor.default-dispatcher-7] o.a.f.r.e.ExecutionGraph
> Source: Order JDBC Source (1/1) (e015c4f0910fb27e15fec063616ab785)
> switched from RUNNING to FINISHED.
>
>  
>
> [some lines removed for brevity]
>
> INFO  [flink-akka.actor.default-dispatcher-7] o.a.f.r.c.CheckpointCoordinator
> Triggering Checkpoint 5 for job 53d42ae669fad6cc8df2fe8f5706b48d
> failed due to {}
>
> org.apache.flink.runtime.checkpoint.CheckpointException: TaskManager
> received a checkpoint request for unknown
> task e015c4f0910fb27e15fec063616ab785. Failure reason: Task local
> checkpoint failure.
>
>  at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:966)
> ~[flink-runtime-1.14.0.jar:1.14.0]
>
>  at sun.reflect.GeneratedMethodAccessor58.invoke(Unknown Source)
> ~[na:na]
>
>  at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[na:1.8.0_91]
>
>  at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_91]
>
>  at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
> ~[na:na]
>
>  at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
> ~[na:na]
>
>  at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
> ~[na:na]
>
>  at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
> ~[na:na]
>
>  at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
> ~[na:na]
>
>  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> ~[na:na]
>
>  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> ~[na:na]
>
>  at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> ~[scala-library-2.11.12.jar:na]
>
>  at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> ~[scala-library-2.11.12.jar:na]
>
>  at
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
> ~[na:na]
>
>  at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> ~[scala-library-2.11.12.jar:na]
>
>  at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> ~[scala-library-2.11.12.jar:na]
>
>  at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> ~[scala-library-2.11.12.jar:na]
>
>  at akka.actor.Actor.aroundRec

Re: Upgrade from 1.13.1 to 1.13.2/1.13.3 failing

2021-11-09 Thread Dawid Wysakowicz
Hey Sweta,

Sorry I did not get back to you earlier.

Could you explain how you do the upgrade? Do you try to upgrade your
cluster through HA services (e.g. ZooKeeper)? Meaning you bring the
1.13.1 cluster down and start a 1.13.2/3 cluster which you intend to
pick up the job automatically along with the latest checkpoint? Am I
guessing correctly? As far as I can tell we do not support such a way of
upgrading.

The way we support upgrades is via a savepoint/checkpoint. I'd suggest
either taking a savepoint on 1.13.1 and restoring[1] the job on a 1.13.2
cluster, or using an externalized checkpoint created from 1.13.1.

Best,

Dawid

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/cli/#starting-a-job-from-a-savepoint

On 22/10/2021 16:39, Chesnay Schepler wrote:
> The only suggestion I can offer is to take a savepoint with 1.13.1 and
> try to restore from that.
>
> We will investigate the problem in
> https://issues.apache.org/jira/browse/FLINK-24621; currently we don't
> know why you are experiencing this issue.
>
> On 22/10/2021 16:02, Sweta Kalakuntla wrote:
>> Hi,
>>
>> We are seeing an error while upgrading minor versions from 1.13.1 to
>> 1.13.2. JobManager is unable to recover the checkpoint state. What
>> would be the solution to this issue?
>>
>> Caused by: org.apache.flink.util.FlinkException: Could not retrieve
>> checkpoint 2844 from state handle under
>> checkpointID-0002844. This indicates that the retrieved
>> state handle is broken. Try cleaning the state handle store.
>> at
>> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.retrieveCompletedCheckpoint(DefaultCompletedCheckpointStore.java:309)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.recover(DefaultCompletedCheckpointStore.java:151)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1513)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at
>> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>> at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown
>> Source) ~[?:?]
>> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown
>> Source) ~[?:?]
>> at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
>> Source) ~[?:?]
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>> ~[?:?]
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>> ~[?:?]
>> at java.lang.Thread.run(Unknown Source) ~[?:?]

Re: Duplicate Calls to Cep Filter

2021-11-01 Thread Dawid Wysakowicz
Hey Puneet,

The reason for multiple calls to the condition is, as mentioned before,
that it is evaluated once for the TAKE edge and a second time for the
IGNORE edge. Every edge is evaluated independently;
there is no joint context or caching of conditions. I agree that, from the
perspective of such a simple pattern, it would make sense to evaluate it
just once for the TAKE. However, construction of the NFA is in general a
complicated (maybe even overcomplicated) process, and this becomes not that
obvious in more complex cases.

Given that every edge is evaluated independently, we need to evaluate
the condition for the IGNORE edge, because we should IGNORE the element
(or in other words wait for a next element) only if we have not taken
it. Therefore the condition for the IGNORE edge is NOT(TAKE_CONDITION).
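
A minimal sketch of that point, in plain Java rather than the actual flink-cep NFA code (the class EdgeEvalSketch and its call counter are hypothetical): the TAKE edge evaluates the user condition, the IGNORE edge independently evaluates its negation, and since nothing is cached the user's filter runs twice for a single element.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Illustrative only: two edges that share a condition but not its result.
final class EdgeEvalSketch {
    // Returns how many times the user condition ran for one element.
    static int evaluationsFor(String element) {
        AtomicInteger calls = new AtomicInteger();
        Predicate<String> userCondition = value -> {
            calls.incrementAndGet(); // stands in for the println in the user's filter
            return value.equals("a");
        };
        Predicate<String> takeEdge = userCondition;            // TAKE_CONDITION
        Predicate<String> ignoreEdge = userCondition.negate(); // NOT(TAKE_CONDITION)

        // Each edge is evaluated on its own; there is no shared context or cache.
        boolean take = takeEdge.test(element);
        boolean ignore = ignoreEdge.test(element);
        if (take == ignore) {
            throw new IllegalStateException("exactly one edge should fire");
        }
        return calls.get();
    }
}
```

Whether the element matches or not, the counter ends at two, which is the duplicate call seen in the diagnostic output.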

Best,

Dawid

On 29/10/2021 01:05, Puneet Duggal wrote:
> Hi Yun Gao,
>
> Thank you for the immediate response. You are correct that any state
> in the NFA will have 2 options: either to TAKE the element and undergo a
> transition to another state, or to IGNORE it and remain in its
> current state. But if a state decides to ignore the element, then why
> will it evaluate itself with the given element? What I mean is that
> regarding followedBy, only 2 transitions are possible, TAKE and
> IGNORE. Why would a pattern (state of the NFA) evaluate an element if it is
> undergoing an IGNORE transition? Evaluation should only happen for the TAKE
> transition, right?
> Thanks,
> Puneet
>
> On Wed, Oct 27, 2021 at 7:26 PM Yun Gao <yungao...@aliyun.com> wrote:
>
> Hi Puneet,
>
> Sorry, I'm not an expert on CEP, but the underlying
> implementation of CEP should
> be based on an NFA, and from the API documentation, `followedBy`
> does not require that the
> two patterns be adjacent (namely the given pattern also accepts
> ['a', 'c', 'b']). Thus when
> 'a' is received, I think the NFA might have two possible transitions:
> one is to accept 'b'
> and get a match, another one is to ignore 'b' and wait for the
> following events. Thus
> the condition might be evaluated twice.
>
> Best,
> Yun
>
>
> --Original Mail --
> Sender: Schwalbe Matthias <matthias.schwa...@viseca.ch>
> Send Date: Wed Oct 27 17:55:18 2021
> Recipients: Puneet Duggal <puneetduggal1...@gmail.com>, user <user@flink.apache.org>
> Subject: RE: Duplicate Calls to Cep Filter
>
> Hi Puneet,
>
>  
>
> …  not able to answer your question, but I would be
> curious to also print out the value with your diagnostic
> message.
>
> … assuming we’ll see an ‘a’ and a ‘b’ for both filters resp.
>
>  
>
> … simple explanation would be that the filters are applied
> to all input, regardless of the pattern matching that
> follow the input filtering (just guessing)
>
>  
>
> Thias
>
>  
>
>  
>
> From: Puneet Duggal <puneetduggal1...@gmail.com>
> Sent: Wednesday, 27 October 2021 11:12
> To: user <user@flink.apache.org>
> Subject: Duplicate Calls to Cep Filter
>
>  
>
> Hi,
>
>  
>
> I am facing an issue where a single event is causing
> execution of a CEP filter multiple times. I went through
> this video
> <https://www.youtube.com/watch?v=XRyl0RGWs1M> explaining
> automata formation from a pattern sequence, but it still does
> not explain the behaviour that I am facing. Following is
> the sample pattern for which I am testing this behaviour.
>
> Pattern<String, ?> innerPattern =
>     Pattern.<String>begin("start")
>         .where(new SimpleCondition<String>() {
>             @Override
>             public boolean filter(String value) throws Exception {
>                 System.out.println("In the beginning");
>                 return value.equals("a");
>             }
>
>    

Re: [DISCUSS] Creating an external connector repository

2021-10-19 Thread Dawid Wysakowicz
Hey all,

I don't have much to add to the general discussion. Just a single
comment on:

> that we could adjust the bylaws for the connectors such that we need
> fewer PMCs to approve a release. Would it be enough to have one PMC
> vote per connector release?

I think it's not an option. This particular rule is one of the few rules
in the bylaws that actually originates from the ASF rather than having been
established within the Flink community. I believe we do need 3 PMC votes
for any formal ASF release [1].

> Votes on whether a package is ready to release use majority
> approval -- i.e. at least three PMC members must vote affirmatively
> for release, and there must be more positive than negative votes.
> Releases may not be vetoed. Generally the community will cancel the
> release vote if anyone identifies serious problems, but in most
> cases the ultimate decision lies with the individual serving as
> release manager. The specifics of the process may vary from project
> to project, *but the 'minimum quorum of three +1 votes' rule is
> universal.*
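
To make the quoted rule concrete, here is a small plain-Java sketch of the check. It is only an illustration: the class and method names (ReleaseVote, passes) are made up and not part of any ASF or Flink tooling, and it assumes the counts passed in are binding PMC votes only.

```java
public class ReleaseVote {
    // Minimum number of binding +1 votes required by the quoted ASF rule.
    public static final int MIN_BINDING_PLUS_ONES = 3;

    // Returns true if the tally satisfies majority approval:
    // at least three +1 votes and more +1 votes than -1 votes.
    public static boolean passes(int bindingPlusOnes, int bindingMinusOnes) {
        return bindingPlusOnes >= MIN_BINDING_PLUS_ONES
                && bindingPlusOnes > bindingMinusOnes;
    }

    public static void main(String[] args) {
        // A single PMC vote per connector release would not be enough.
        System.out.println(passes(1, 0));
        // Three +1 votes and no -1 votes meet the quorum.
        System.out.println(passes(3, 0));
        // Three +1 votes against three -1 votes is not a majority.
        System.out.println(passes(3, 3));
    }
}
```

So whatever we change in the Flink bylaws for connectors, a tally like passes(1, 0) would still not be a valid release vote.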

Best,

Dawid

[1] https://www.apache.org/foundation/voting.html#ReleaseVotes

On 19/10/2021 14:21, Arvid Heise wrote:
> Okay I think it is clear that the majority would like to keep connectors
> under the Apache Flink umbrella. That means we will not be able to have
> per-connector repositories and project management, automatic dependency
> bumping with Dependabot, or semi-automatic releases.
>
> So then I'm assuming the directory structure that @Chesnay Schepler
> proposed would be the most beneficial:
> - A root project with some convenience setup.
> - Unrelated subprojects with individual versioning and releases.
> - Branches for minor Flink releases. That is needed anyhow to use new
> features independent of API stability.
> - Each connector maintains its own documentation that is accessible through
> the main documentation.
>
> Any thoughts on alternatives? Do you see risks?
>
> @Stephan Ewen mentioned offline that we could adjust the
> bylaws for the connectors such that we need fewer PMCs to approve a
> release. Would it be enough to have one PMC vote per connector release? Do
> you know of other ways to tweak the release process to have less manual
> work?
>
> On Mon, Oct 18, 2021 at 10:22 PM Thomas Weise wrote:
>
>> Thanks for initiating this discussion.
>>
>> There are definitely a few things that are not optimal with our
>> current management of connectors. I would not necessarily characterize
>> it as a "mess" though. As the points raised so far show, it isn't easy
>> to find a solution that balances competing requirements and leads to a
>> net improvement.
>>
>> It would be great if we can find a setup that allows for connectors to
>> be released independently of core Flink and that each connector can be
>> released separately. Flink already has separate releases
>> (flink-shaded), so that by itself isn't a new thing. Per-connector
>> releases would need to allow for more frequent releases (without the
>> baggage that a full Flink release comes with).
>>
>> Separate releases would only make sense if the core Flink surface is
>> fairly stable though. As evident from Iceberg (and also Beam), that's
>> not the case currently. We should probably focus on addressing the
>> stability first, before splitting code. A success criterion could be
>> that we are able to build Iceberg and Beam against multiple Flink
>> versions w/o the need to change code. The goal would be that no
>> connector breaks when we make changes to Flink core. Until that's the
>> case, code separation creates a setup where 1+1 or N+1 repositories
>> need to move in lock step.
>>
>> Regarding some connectors being more important for Flink than others:
>> That's a fact. Flink w/o Kafka connector (and few others) isn't
>> viable. Testability of Flink was already brought up, can we really
>> certify a Flink core release without Kafka connector? Maybe those
>> connectors that are used in Flink e2e tests to validate functionality
>> of core Flink should not be broken out?
>>
>> Finally, I think that the connectors that move into separate repos
>> should remain part of the Apache Flink project. Larger organizations
>> tend to approve the use of and contribution to open source at the
>> project level. Sometimes it is everything ASF. More often it is
>> "Apache Foo". It would be fatal to end up with a patchwork of projects
>> with potentially different licenses and governance to arrive at a
>> working Flink setup. This may mean we prioritize usability over
>> developer convenience, if that's in the best interest of Flink as a
>> whole.
>>
>>

Re: Issue with Flink UI for Flink 1.14.0

2021-10-14 Thread Dawid Wysakowicz
I am afraid it is a bug in Flink 1.14. I created a ticket for it,
FLINK-24550 [1]. I believe we should pick it up soon. Thanks for
reporting the issue!

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-24550

On 13/10/2021 20:32, Peter Westermann wrote:
>
> Hello,
>
>  
>
> I just started testing Flink 1.14.0 and noticed some weird behavior.
> This is for a Flink cluster with zookeeper for HA and two job managers
> (one leader, one backup). The UI on the leader works fine. The UI on
> the other job manager does not load any job-specific data. Same
> applies to the REST interface. If I request job data from
> /v1/jobs/{jobId}, I get the expected response on the leader but on the
> other job manager, I only get an exception stack trace:
>
>  
>
> {"errors":["Internal server error.","<Exception on server side:\njava.util.concurrent.CancellationException\n\tat
> java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2276)\n\tat
> org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.getExecutionGraphInternal(DefaultExecutionGraphCache.java:98)\n\tat
> org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.getExecutionGraphInfo(DefaultExecutionGraphCache.java:67)\n\tat
> org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.handleRequest(AbstractExecutionGraphHandler.java:81)\n\tat
> org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)\n\tat
> org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)\n\tat
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)\n\tat
> java.util.Optional.ifPresent(Optional.java:159)\n\tat
> org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)\n\tat
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)\n\tat
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat
> org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)\n\tat
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)\n\tat
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat
> org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat
> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:238)\n\tat
> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:71)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat
>

Re: a question about flink table catalog.

2021-10-14 Thread Dawid Wysakowicz
I don't think so. As the name says, it is stored "in-memory", which
inherently means transient. If you need a persistent catalog you can
use e.g. Hive or Postgres.

You can also try to implement a catalog backed by e.g. a property file.
This could potentially end up in Flink / flink-packages, but I am not
aware of any plans. (cc Timo)
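
As a rough illustration of the property-file idea, here is a plain-Java sketch of the persistence part only. It does not implement Flink's Catalog interface; the class PropertyFileTableStore and its methods are hypothetical names, and it assumes table definitions are kept as simple "table name -> DDL string" pairs.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

public class PropertyFileTableStore {
    private final Path file;
    private final Properties tables = new Properties();

    // Loads previously stored definitions if the file already exists.
    public PropertyFileTableStore(Path file) throws IOException {
        this.file = file;
        if (Files.exists(file)) {
            try (Reader in = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
                tables.load(in);
            }
        }
    }

    // Stores a definition and immediately writes the whole file back to disk.
    public void putTable(String name, String ddl) throws IOException {
        tables.setProperty(name, ddl);
        try (Writer out = Files.newBufferedWriter(file, StandardCharsets.UTF_8)) {
            tables.store(out, "table definitions");
        }
    }

    // Returns the stored DDL, or null if the table is unknown.
    public String getTable(String name) {
        return tables.getProperty(name);
    }
}
```

A real catalog would additionally have to implement the Catalog interface and deal with databases, functions and concurrent access; on startup one could replay the stored DDL strings against a fresh TableEnvironment.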

Best,

Dawid

On 14/10/2021 15:05, Yuepeng Pan wrote:
> Dawid Wysakowicz
>
> Thanks for your reply. Does the community plan to implement this
> feature?
>
>
>
> Best, 
> Roc
>
>
>
> At 2021-10-14 21:00:37, "Dawid Wysakowicz"  wrote:
>
> If I understand your question correctly, you're asking if you can
> somehow persist the GenericInMemoryCatalog. I am afraid it is not
> possible. The idea of the GenericInMemoryCatalog is that it is
> transient and is stored purely in memory.
>
> Best,
>
> Dawid
>
> On 14/10/2021 13:44, Yuepeng Pan wrote:
>> Hi, 
>>
>>    Community.
>> I would like to save the catalog state after operating on a
>> GenericInMemoryCatalog, so that I can conveniently recover the
>> last catalog instance when opening the session or
>> TableEnvironment next time. Does Flink support this feature?
>> Thank you.
>>
>> Best,
>> Roc
>>
>>
>>  
>>
>
>
>  
>




Re: a question about flink table catalog.

2021-10-14 Thread Dawid Wysakowicz
If I understand your question correctly, you're asking if you can
somehow persist the GenericInMemoryCatalog. I am afraid it is not
possible. The idea of the GenericInMemoryCatalog is that it is transient
and is stored purely in memory.

Best,

Dawid

On 14/10/2021 13:44, Yuepeng Pan wrote:
> Hi, 
>
>    Community.
> I would like to save the catalog state after operating on a
> GenericInMemoryCatalog, so that I can conveniently recover the last
> catalog instance when opening the session or TableEnvironment next
> time. Does Flink support this feature? Thank you.
>
> Best,
> Roc
>
>
>  
>




Re: Exception: SequenceNumber is treated as a generic type

2021-10-14 Thread Dawid Wysakowicz
Hey Ori,

As for the SequenceNumber issue, I'd say yes, it can be seen as a bug.
In the current state one cannot use the Kinesis consumer with
pipeline.generic-types=false. The problem is that we use the
TypeInformation.of(SequenceNumber.class) method, which in this case
will always fall back to GenericTypeInfo (GenericTypeInfo is the one
that uses KryoSerializer; that is why it does not help to register a
Kryo serializer, it is still a generic type).

A dirty hack for you to try could be to copy the SequenceNumber class
over to your job and annotate it with @TypeInfo, providing a factory
that creates something other than GenericTypeInfo (you could even use
a copy of GenericTypeInfo with the check for the
pipeline.generic-types flag removed). I know it is a really dirty hack.

Ad 2: Unfortunately I can't think of a better way.

I have created FLINK-24549 to track the kinesis issue.[1]

On the backpressure note, are you sure the issue is in the
serialization? Have you tried identifying the slow task first?[2]

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-24549

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/monitoring/back_pressure.html

On 14/10/2021 12:41, Ori Popowski wrote:
> I'd appreciate it if someone could advise on this issue.
>
> Thanks
>
> On Tue, Oct 12, 2021 at 4:25 PM Ori Popowski <ori@gmail.com> wrote:
>
> Hi,
>
> I have a large backpressure in a somewhat simple Flink application
> in Scala. Using Flink version 1.12.1.
>
> To find the source of the problem, I want to eliminate all classes
> with generic serialization, so I set
> pipeline.generic-types=false
>
> in order to spot those classes and write a serializer for them.
>
> However, for some reason, I get the stack trace attached below.
>
>  1. It looks suspicious that one of Flink's own classes doesn't
> have a serializer and should fall back to generic
> serialization. Is this a bug?
>  2. I want to get a list of all classes which fall back to generic
> serialization. How can I do it other than setting
> pipeline.generic-types=false and eliminating those classes one
> by one?
>  3. I defined a custom Kryo serializer for this class using both
> addDefaultKryoSerializer(…) and
> registerTypeWithKryoSerializer(…) and I still get the same
> error message. How can I provide Flink with custom
> serialization so it stops complaining about this?
>
>
>
> java.lang.UnsupportedOperationException: Generic types have been
> disabled in the ExecutionConfig and type
> org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber
> is treated as a generic type.
> at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
> at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:104)
> at org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:49)
> at org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:99)
> at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:302)
> at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:264)
> at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:216)
> at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.initializeState(FlinkKinesisConsumer.java:443)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecu

Re:

2021-10-14 Thread Dawid Wysakowicz
I hope Rui (in cc) will be able to help you.

Best,

Dawid

On 12/10/2021 15:32, Andrew Otto wrote:
> Hello,
>
> I'm trying to use HiveCatalog with Kerberos.  Our Hadoop cluster, our
> Hive Metastore, and our Hive Server are kerberized.  I can
> successfully submit Flink jobs to Yarn authenticated as my users using
> a cached ticket, as well as using a keytab.
>
> However, I can't seem to register a HiveCatalog with my
> TableEnvironment.  Here's my code:
> import org.apache.flink.table.api._
> import org.apache.flink.table.catalog.hive.HiveCatalog
>
> val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
> val tableEnv = TableEnvironment.create(settings)
> val catalog = new HiveCatalog("analytics_hive", "flink_test", 
> "/etc/hive/conf")
> tableEnv.registerCatalog("analytics_hive", catalog)
>
> Which causes an exception:
> Caused by: java.lang.reflect.InvocationTargetException:
> org.apache.hadoop.hive.metastore.api.MetaException: Could not connect
> to meta store using any of the URIs provided. Most recent failure:
> org.apache.thrift.transport.TTransportException: GSS initiate failed
>
> (Full stacktrace here
> <https://gist.github.com/ottomata/79fbad1b97efebd9c71d1bf11d171ade>.)
>
> The same error happens if I try to submit this job using my cached
> kerberos ticket, or with a keytab.
> I have also tried wrapping the HiveCatalog in a Hadoop
> UserGroupInformation PrivilegedExceptionAction as described here
> <https://blog.csdn.net/weibokong789/article/details/106427481> and got
> the same result (no real idea what I'm doing here, just trying some
> things.)
>
> Is there something more I have to do to use HiveCatalog with a
> kerberized Hive Metastore?  Should Flink support this out of the box?  
>
> Thanks!
> - Andrew Otto
>   SRE, Wikimedia Foundation
>
>




Re: Issues while upgrading from 1.12.1 to 1.14.0

2021-10-06 Thread Dawid Wysakowicz
Hi Parag,

When you restore from a savepoint, do you see a line like "Restoring job
{} from {}" in the JobManager logs? Is the entire state lost or just part
of it? Could you explain a bit what your job looks like and how you
check that the state is lost?

Sorry if this is too obvious, but what are the "accumulators" you refer
to? Are they *State primitives [1] or really constructs that are called
Accumulator [2]? The latter are not checkpointed.

Best,

Dawid

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#using-keyed-state

[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/user_defined_functions/#accumulators--counters

On 06/10/2021 08:42, Parag Somani wrote:
> Yes Nico. I have evaluated this.
>
> I have tried below:
>
>  1. Take the savepoint
>  2. Stop the job
>  3. Shutdown the instances
>  4. Started new pod using below command:
>
> /docker-entrypoint.sh "standalone-job" "-Ds3.access-key=${AWS_ACCESS_KEY_ID}" 
> "-Ds3.secret-key=${AWS_SECRET_ACCESS_KEY}"  
> "-Ds3.endpoint=${AWS_S3_ENDPOINT}" 
> "-Dhigh-availability.zookeeper.quorum=${ZOOKEEPER_CLUSTER}" "--job-classname" 
> "com.test.MySpringBootApp" "--fromSavepoint" 
> "s3://s3-health-service-discovery/savepoints" ${args}
>
> I haven't observed any errors during start-up in logs. But the state
> got reset i.e. values stored inside the accumulator got flushed.
>
> On Tue, Oct 5, 2021 at 9:40 PM Nicolaus Weidner
> <nicolaus.weid...@ververica.com> wrote:
>
> Hi Parag,
>
> I am not so familiar with the setup you are using, but did you
> check out [1]? Maybe the parameter 
> [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] 
> is what you are looking for?
>
> Best regards,
> Nico
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/#application-mode-on-docker
>
> On Tue, Oct 5, 2021 at 12:37 PM Parag Somani
> <somanipa...@gmail.com> wrote:
>
> Hello,
>
> We are currently using Apache flink 1.12.0 deployed on k8s
> cluster of 1.18 with zk for HA. Due to certain vulnerabilities
> in container related with few jar(like netty-*, meso), we are
> forced to upgrade.
>
> While upgrading flink to 1.14.0, faced NPE,
> 
> https://issues.apache.org/jira/browse/FLINK-23901?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=17402570#comment-17402570
>
> To address it, I have followed steps
>
>  1. savepoint creation
>  2. Stop the job
>  3. Restore from save point where i am facing challenge.
>
> For step #3 from above, i was able to restore from savepoint
> mainly because:
> "|bin/flink run -s :savepointPath [:runArgs]| "
> It majorly about restarting a jar file uploaded. As our
> application is based on k8s and running using docker, i was
> not able to restore it. And because of it, state of variables
> in accumulator got corrupted and i lost the data in one of env.
>
> My query is, what is the preferred way to restore from a savepoint
> if the application is running on k8s using docker?
>
> We are using following command to run job manager:
>  /docker-entrypoint.sh "standalone-job" 
> "-Ds3.access-key=${AWS_ACCESS_KEY_ID}" 
> "-Ds3.secret-key=${AWS_SECRET_ACCESS_KEY}"  
> "-Ds3.endpoint=${AWS_S3_ENDPOINT}" 
> "-Dhigh-availability.zookeeper.quorum=${ZOOKEEPER_CLUSTER}" "--job-classname" 
> ""  ${args}
>
> Thank you in advance...!
>
> -- 
> Regards,
> Parag Surajmal Somani.
>
>
>
> -- 
> Regards,
> Parag Surajmal Somani.




Re: Error: Timeout of 60000ms expired before the position for partition

2021-10-04 Thread Dawid Wysakowicz
Hi,

Do you mean that you fail to start Kafka, or do you get the exception
from Flink? Could you please share the full stack trace of the error?

Best,

Dawid

On 02/10/2021 16:58, Dipanjan Mazumder wrote:
> Hi,
>
>   I am getting the below error while starting Flink as a standalone
> single JVM process through a jar. Kafka is deployed as a separate
> cluster and the process is not able to start and fails with the below
> error after 60sec:
>
> org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms
> expired before the position for partition topic_input_si_risk_engine-0
> could be determined
>
> Not sure of the reason , but don't get this issue when kafka and
> zookeeper is on the same machine as the flink process that is
> consuming data from kafka.
>
> Any pointers would be helpful.
>
> Regards
> Dipanjan




Re: Exception thrown during batch job execution on YARN even though job succeeded

2021-10-04 Thread Dawid Wysakowicz
Hey Ken,

Regarding Rufus, I know he might be a bit eager in changing lines ;) If
you want to ignore his changes in git blame, please take a look here[1].

For the main issue, do you mind creating a ticket? I hope someone will
be able to pick it up.

Best,

Dawid


[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/ide_setup/#ignoring-refactoring-commits

On 01/10/2021 02:10, Ken Krugler wrote:
> Hi all,
>
> We’ve upgraded from Flink 1.11 to 1.13, and our workflows are now
> sometimes failing with an exception, even though the job has succeeded.
>
> The stack trace for this bit of the exception is:
>
> java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could
> not complete the operation. Number of retries has been exhausted.
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> at
> org.apache.flink.client.program.ContextEnvironment.getJobExecutionResult(ContextEnvironment.java:117)
> at
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:74)
>         at my.program.execute.workflow...
>
> The root cause is "java.net.ConnectException: Connection refused”,
> returned from the YARN node where the Job Manager is (was) running.
>
> ContextEnvironment.java line 117 is:
>
> jobExecutionResult = jobExecutionResultFuture.get();
>
> This looks like a race condition, where YARN is terminating the Job
> Manager, and this sometimes completes before the main program has
> retrieved all of the job status information.
>
> I’m wondering if this is a side effect of recent changes to make
> execution async/non-blocking.
>
> Is this a known issue? Anything we can do to work around it?
>
> Thanks,
>
> — Ken
>
> PS - The last two people working on this area code were Aljoscha and
> Robert (really wish git blame didn’t show most lines as being modified
> by “Rufus Refactor”…sigh)
>
> --
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>




Re: Flink application mode with no ui , how to start job using k8s ?

2021-10-04 Thread Dawid Wysakowicz
Hi Dhiru,

For the question about autoscaling, I'd recommend this blog post [1]
from my colleague. I believe he explains quite well how to do it.

Besides that, I am not sure what your other question is. Are you asking
how to start the JobManager without the UI? Can't you simply not
expose the port?

Best,

Dawid

[1] https://flink.apache.org/2021/05/06/reactive-mode.html

On 30/09/2021 02:41, Dhiru wrote:
> Hi ,
>
>    My requirement is to create Flink cluster application Mode on k8s
> and do not want to expose UI, my requirement is to start the
> long-running  job which can be instantiated at boot time of flink and
> keep running
>
> use these resource files from jobmanager-application-ha.yaml
> and taskmanager-job-deployment.yaml for creating cluster
> (https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#application-cluster-resource-definitions)
>
>
> a)  I need to start job during run time , I can bundle my jar with
> Flink image, so that can  instantiate jar 
>
> b) Can I apply HPA (horizontal pod autoscaler) for task manager, will
> this work so that according to workload instance of  taskmanager goes
> up and down.
>
> --kumar
>
>
>




[ANNOUNCE] Apache Flink 1.14.0 released

2021-09-29 Thread Dawid Wysakowicz
The Apache Flink community is very happy to announce the release of
Apache Flink 1.14.0.
 
Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data
streaming applications.
 
The release is available for download at:
https://flink.apache.org/downloads.html
 
Please check out the release blog post for an overview of the
improvements for this release:
https://flink.apache.org/news/2021/09/29/release-1.14.0.html
 
The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349614
 
We would like to thank all contributors of the Apache Flink community
who made this release possible!
 
Regards,
Xintong, Joe, Dawid






Re: User defined function (UDF) not sent when submitting job to session cluster

2021-09-07 Thread Dawid Wysakowicz
Huh, of course. Actually I was too quick with my answer. Even if the UDF
is serialized with the JobGraph, its class is still needed on the TMs in
order to deserialize it. That's how Java serialization works after all.

So the actual answer: it is serialized with the JobGraph, but the class
must be available wherever it is deserialized.
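
To illustrate with plain Java (no Flink involved): the serialized bytes carry the class name and field values, not the bytecode, so the reading side must be able to load the class. In this sketch UpperCaseUdf is a hypothetical stand-in for a UDF, and the "missing jar" is only simulated by overriding ObjectInputStream#resolveClass; on a real TaskManager the user-code classloader would simply not find the class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.function.Function;

public class UdfSerializationDemo {
    // Hypothetical stand-in for a user-defined function.
    public static class UpperCaseUdf implements Function<String, String>, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public String apply(String s) {
            return s.toUpperCase();
        }
    }

    // Writes the object with standard Java serialization.
    public static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    // Reads the object back while pretending that `missingClass` cannot be
    // loaded, similar to a process that does not have the user jar.
    public static Object deserializeWithout(byte[] data, String missingClass)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                if (desc.getName().equals(missingClass)) {
                    throw new ClassNotFoundException(desc.getName());
                }
                return super.resolveClass(desc);
            }
        }) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(new UpperCaseUdf());

        // The class is loadable here, so deserialization succeeds.
        UpperCaseUdf restored = (UpperCaseUdf) deserializeWithout(data, "no.such.Class");
        System.out.println(restored.apply("ok"));

        // Same bytes, but the class cannot be resolved on the reading side.
        try {
            deserializeWithout(data, UpperCaseUdf.class.getName());
        } catch (ClassNotFoundException e) {
            System.out.println("Cannot load class: " + e.getMessage());
        }
    }
}
```

That is the same situation as the "Cannot load user class" error: the bytes arrived, the class did not.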

Best,

Dawid

On 07/09/2021 16:31, Joel Edwards wrote:
> Good day,
>
> I have been attempting to submit a job to a session cluster. This job
> involves a pair of dynamic tables and a SQL query. The SQL query is
> calling a UDF which I register via the table API's
> createTemporarySystemFunction() method. The job runs locally, but when
> I attempt to submit it to a remote session cluster, the job fails with
> the error:
>
> `Cannot load user class: `
>
> If I place a fat jar containing all of my local dependencies on the
> JobManagers and TaskManagers, the UDF will be loaded. However, I would
> expect the UDF to be serialized and sent with the rest of the job. I
> have looked over the UDF documentation, and I don't see a reason why
> it would not be serialized with the rest of the job. However, seeing
> as there is no error related to serializing the UDF, my assumptions
> related to UDF serialization must be incorrect. Is there a hint I can
> use to cause the closure cleaner to identify the UDF for
> serialization? I suspect the reason it is not being included is that
> it is referenced only in the SQL query, and not streams feeding the
> input table or the stream consuming the output table.
>
> Summary of questions:
> - Will UDF be serialized with the job? Or are they never included?
> - Is it possible to hint at what should be serialized and sent along
> with the job?
>
> Thank you,
> Joel
>
>
> -- 
> Joel Edwards
> Software Architect
> Ed-Craft Software Solutions




Re: User defined function (UDF) not sent when submitting job to session cluster

2021-09-07 Thread Dawid Wysakowicz
Hi Joel,

There are a few uncertainties in my reply, so I am including Timo, who
should be able to confirm or deny what I wrote.

Generally speaking, once a JobGraph is created it should contain the UDF.
I might be wrong here, but the UDF must be available while the JobGraph
is created. I believe you're submitting the job via the `flink run`
command. Is that right? If so, the UDF must be available on the
classpath of this client, as that is the moment the JobGraph is created.
If you use the web UI to submit the job, then the JobGraph is created on
the JobManager and the UDF must be available there.

Best,

Dawid

On 07/09/2021 16:31, Joel Edwards wrote:
> Good day,
>
> I have been attempting to submit a job to a session cluster. This job
> involves a pair of dynamic tables and a SQL query. The SQL query is
> calling a UDF which I register via the table API's
> createTemporarySystemFunction() method. The job runs locally, but when
> I attempt to submit it to a remote session cluster, the job fails with
> the error:
>
> `Cannot load user class: `
>
> If I place a fat jar containing all of my local dependencies on the
> JobManagers and TaskManagers, the UDF will be loaded. However, I would
> expect the UDF to be serialized and sent with the rest of the job. I
> have looked over the UDF documentation, and I don't see a reason why
> it would not be serialized with the rest of the job. However, seeing
> as there is no error related to serializing the UDF, my assumptions
> related to UDF serialization must be incorrect. Is there a hint I can
> use to cause the closure cleaner to identify the UDF for
> serialization? I suspect the reason it is not being included is that
> it is referenced only in the SQL query, and not streams feeding the
> input table or the stream consuming the output table.
>
> Summary of questions:
> - Will UDF be serialized with the job? Or are they never included?
> - Is it possible to hint at what should be serialized and sent along
> with the job?
>
> Thank you,
> Joel
>
>
> -- 
> Joel Edwards
> Software Architect
> Ed-Craft Software Solutions




Re: Broadcast data to all keyed streams

2021-09-07 Thread Dawid Wysakowicz
Hi James,

Can you elaborate why the "Broadcast State Pattern"[1] does not work for
you? I'd definitely recommend that approach.


I strongly discourage it, but if you insist you could copy over
the ConnectedStreams#transform method and remove the check that ensures
both inputs of the operator are either keyed or non-keyed.


Best,

Dawid


[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/

On 06/09/2021 18:02, James Sandys-Lumsdaine wrote:
> Hello,
>
> I have a Flink workflow which is partitioned on a key common to all
> the stream objects and a key that is best suited to the high volume of
> data I am processing. I now want to add in a new stream of prices that
> I want to make available to all partitioned streams - however, this
> new stream of prices does not have this common keyBy value.
>
> I have tried writing a piece of code using the broadcast() method (no
> args) to get this new price stream to be sent to all the parallel
> instances on an operator. The code looks like this:
>
> KeyedStream keyedRefDataStream = ;
>
> DataStream prices = ;
> DataStream broadcastPrices = prices.broadcast();
>
> keyedRefDataStream
>     .connect(broadcastPrices)
>     .process(new RefDataPriceJoiner()); // implements KeyedCoProcessFunction
>
> I then get an error saying the broadcastPrices stream must be keyed -
> but I can't key it on the same key as the refData stream because it
> lacks this field. 
>
> I could reshuffle all my data by re-keying the ref data on a different
> field but this will cause a huge amount of data to be sent over the
> network compared with me being able to broadcast this much smaller
> amount of data to my keyed streams. Note I am assuming this isn't a
> "broadcast state" example - I assume the broadcast() method allows me
> to send data to all partitions.
>
> Is any of this possible? Any pointers for me would be very helpful as
> I can't find an answer on the web or in the documentation.
>
> Many thanks,
>
> James.




Re: De/Serialization API to tear-down user code

2021-09-02 Thread Dawid Wysakowicz
Hi Sergio,

You can find the explanation why we haven't added the close method in
the corresponding JIRA ticket [1]:

When adding close() method to both DeserializationSchema and
SerializationSchema with a default implementation, it breaks source
compatibility if a user's class implements both interfaces at the
same time. The problem is that Java does not know which default
implementation to use from those two interfaces as the close()
signature is the same in both. In the Flink code base we have three
such implementations: SimpleStringSchema,
TypeInformationSerializationSchema, and one in tests. It is not a
problem for open() as we have a parameter there that differentiates
the two methods.

We decided to skip close() for now until we have a first use case
for it. We do not need to close the schema registry client because
it communicates over REST. Moreover no other schema needs a close
for now. For the Table API we also need only the open for generating
the code of the serializer.
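
To make the source-compatibility problem described in the ticket concrete,
here is a minimal plain-Java sketch. The interfaces ToyDeserializationSchema
and ToySerializationSchema and their nested InitContext types are
hypothetical stand-ins, not Flink's actual interfaces. The sketch only shows
how two default methods with the same signature collide, while open(...)
with distinct parameter types does not.

```java
/** Hypothetical stand-in for a deserialization interface with default open/close. */
interface ToyDeserializationSchema {
    interface InitContext {}

    default void open(InitContext context) {}

    default void close() {}
}

/** Hypothetical stand-in for a serialization interface with default open/close. */
interface ToySerializationSchema {
    interface InitContext {}

    default void open(InitContext context) {}

    default void close() {}
}

/**
 * Implements both interfaces. The two open(...) methods are plain overloads
 * because their parameter types differ. The two close() methods have the same
 * signature, so without the override below javac rejects the class because it
 * inherits unrelated defaults for close().
 */
final class ToyStringSchema implements ToyDeserializationSchema, ToySerializationSchema {
    boolean closed = false;

    @Override
    public void close() {
        // The implementer has to resolve the ambiguity explicitly.
        ToyDeserializationSchema.super.close();
        ToySerializationSchema.super.close();
        closed = true;
    }
}
```

An existing user class that implements both interfaces and has no close()
of its own would stop compiling once both defaults are added, which is the
breakage the ticket refers to.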

Now that you're reaching out with such a requirement we might revisit
it. WDYT Arvid?

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-17306

On 02/09/2021 10:37, Sergio Morales wrote:
>
> Thank you for the answer. I’m using the (De)SerializationSchema in
> such way that it has a reference to a custom class that manages some
> resources. In the open() method I’m able to init the resources
> accordingly, but it is really strange that despite providing an
> “open()” there is no counter-part “close()” to release properly the
> same resources. We have in the project a memory leak that could be
> resolved using the close() API approach and now we have to do some
> extra hop by subclassing the SinkFunction to `@Override` the close
> method in order to close itself and the resources managed by the
> De/SerializationSchema too.
>
>  
>
> I was considering to add the close() API method by following a
> previous PR: https://github.com/apache/flink/pull/12006
> <https://github.com/apache/flink/pull/12006> , would it be something
> that the team is willing to accept or should I avoid any effort on
> that part because the previous design document is not valid anymore?
>
>  
>
> Regards,
>
> Sergio.
>
>  
>
> *From: *Caizhi Weng 
> *Date: *Thursday, 2 September 2021 at 04:18
> *To: *Sergio Morales 
> *Cc: *user 
> *Subject: *Re: De/Serialization API to tear-down user code
>
>  
>
> Hi!
>
>  
>
> The (De)serializationSchema is only a helper for changing the data
> object to another format. What's your use case? If you're creating a
> (De)serializationSchema for a source / sink you might want to open and
> close the resources in the open / close methods of the source / sink,
> not in the (De)serializationSchema.
>
>  
>
> Sergio Morales <sdmorale...@gmail.com> wrote on Wednesday,
> 1 September 2021 at 6:44 PM:
>
> Hi,
>
> I’m currently working to close some resources while using the
> SerializationSchema and DeserializationSchema (Flink-core
> v1.12.1). However, after reviewing the document outlining the API
> methods
> (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148645988),
> the close() operations are missing, and in the master branch I
> could not find any new version that includes them:
>
> * https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializationSchema.java
>
> * https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
>
> Thank you for any help.
>
>  
>
> Regards,
>
> Sergio.
>
>  
>




Re: Watermark UI after checkpoint failure

2021-07-19 Thread Dawid Wysakowicz
Do you mean a failed checkpoint, or do you mean that it happens after a
restore from a checkpoint? If it is the latter then this is kind of
expected, as watermarks are not checkpointed and they need to be
repopulated again.

Best,

Dawid

On 19/07/2021 07:41, Dan Hill wrote:
> After my dev flink job hits a checkpoint failure (e.g. timeout) and
> then has successful checkpoints, the flink job appears to be in a bad
> state.  E.g. some of the operators that previously had a watermark
> start showing "no watermark".  The jobs proceed very slowly.
>
> Is there documentation for this state?  It seems weird to me that
> operators would not show watermarks anymore.





Re: Apache Flink - Reading Avro messages from Kafka with schema in schema registry

2021-07-15 Thread Dawid Wysakowicz
Ad 1/2/4 Please refer to Avro's documentation on how the reader and
writer schemas work, e.g. here[1] and here[2]. That's purely Avro's thing.

Ad 3 Theoretically yes. The problem is the communication between
TaskManagers. We need to serialize the Avro records somehow. The chosen
approach is to use a single version of the schema in a pipeline and thus
avoid querying the schema registry from each node/operator. The channels
between TaskManagers are transient and thus we do not need to work with
different versions of the schema.

Another approach could be to store the schema along with records. We do
not want to do that for performance reasons. That's why we use the
provided schema, distributed along with operators during scheduling, for
both writing and reading.

Ad 5 Yes, between TMs we use it as both reader and writer schema.
However, the source (usually Kafka) is usually permanent storage, so it
will most probably hold records written with different schemas side by
side (because the schema may evolve over time), and thus you need the
schema registry.

Best,

Dawid

[1]
https://avro.apache.org/docs/1.10.2/spec.html#Data+Serialization+and+Deserialization

[2] https://avro.apache.org/docs/1.10.2/spec.html#Schema+Resolution

On 15/07/2021 15:48, M Singh wrote:
> Hello Dawid:
>
> Thanks for your answers and references.
>
> I do have a few questions:
>
> 1. Is there any scenario where the reader and writer schema should
> differ ? 
>
> 2. How is the mismatch b/w the two schemas (one passed as argument and
> other retrieved from schema registry) resolved at run time ?
>
> 3. As mentioned - "If you provide just the writer schema, the reader
> schema is assumed to be the same." - Is it possible in Flink to just
> use schema registry to retrieve the schema which can be used for both
> reading/writing  ?
>
> 4. Regarding "You can say that we read multiple different versions
> from e.g. Kafka and normalize it to that provided schema that's
> required across the pipeline." - Does this mean that if the reader
> (passed as argument) and writer (retrieved from the registry) schemas
> differ then Flink will normalize the differences ? If so, are there
> any guidelines as to how the fields are normalized ?
>
> 5. Regarding: "the schema in the forGeneric is the readerSchema and at
> the same time the schema that Flink will be working with in the
> pipeline. It will be the schema used to serialize and deserialize
> records between different TaskManagers" - If the reader schema (passed
> as argument) is used for reading and writing b/w TaskManagers, then
> what role does the schema from the registry play ?  Does it have to do
> with the "normalization" you've mentioned ?
>
> Thanks again for your time.
>
> Mans
>
> On Tuesday, July 13, 2021, 10:22:32 AM EDT, Dawid Wysakowicz
>  wrote:
>
>
> Hi,
>
> Yes, you are right the schema in the forGeneric is the readerSchema
> and at the same time the schema that Flink will be working with in the
> pipeline. It will be the schema used to serialize and deserialize
> records between different TaskManagers. Between the Flink TaskManagers
> that schema plays the role of both the reader and the writer schema.
>
> The way that Avro works is that you must provide both the writer and
> the reader schema. Otherwise it simply does not work. If you provide
> just the writer schema, the reader schema is assumed to be the same.
> Without the writer schema it is not possible to deserialize Avro. See
> extract from Avro spec: [1]
>
> Binary encoded Avro data does not include type information or
> field names. The benefit is that the serialized data is small, but
> as a result a schema must always be used in order to read Avro
> data correctly. The best way to ensure that the schema is
> structurally identical to the one used to write the data is to use
> the exact same schema.
>
> Therefore, files or systems that store Avro data should always
> include the writer's schema for that data. Avro-based remote
> procedure call (RPC) systems must also guarantee that remote
> recipients of data have a copy of the schema used to write that
> data. In general, it is advisable that any reader of Avro data
> should use a schema that is the same (as defined more fully in
> Parsing Canonical Form for Schemas
> 
> <https://avro.apache.org/docs/1.10.2/spec.html#Parsing+Canonical+Form+for+Schemas>)
> as the schema that was used to write the data in order to
> deserialize it correctly. Deserializing data into a newer schema
> is accomplished by specifying an additional schema, the results of
> which are described in Schema Resolution
> <https://avro.apache.org/docs/1.10.2/spec.html#Schema+Resolution&g

Re: My batch source doesn't emit MAX_WATERMARK when it finishes - why?

2021-07-08 Thread Dawid Wysakowicz
Hi,

Your example does not show what watermarks are flowing through the
program. It prints the watermark at the point a record is being emitted.
As the cited text states, the final watermark is emitted after all
records are emitted. You can test it e.g. with the newly added
writeWatermark method in 1.14 or by implementing a ProcessFunction with
a timer for Long.MAX_VALUE, or lastly with a custom operator.
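
The ordering described above can be illustrated with a toy model in plain
Java. This is not Flink code: ToySink and ToyBoundedSource are hypothetical
classes, and the model assumes the source emits no intermediate watermarks.
It only shows why a sink that prints its current watermark for each record
never observes the final one.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of an operator that remembers the last watermark it received. */
final class ToySink {
    long currentWatermark = Long.MIN_VALUE; // nothing received yet
    final List<String> log = new ArrayList<>();

    /** Mirrors printing the watermark from inside the sink's per-record callback. */
    void onRecord(int value) {
        log.add("record=" + value + " watermark=" + currentWatermark);
    }

    /** Called when a watermark arrives; records are not involved here. */
    void onWatermark(long timestamp) {
        currentWatermark = timestamp;
        log.add("watermark advanced to " + timestamp);
    }
}

/** Toy bounded source: emits every record first, then the final watermark. */
final class ToyBoundedSource {
    static void run(int[] records, ToySink sink) {
        for (int record : records) {
            sink.onRecord(record);
        }
        // "End of time" is signalled only after the last record.
        sink.onWatermark(Long.MAX_VALUE);
    }
}
```

In this model every record is logged with Long.MIN_VALUE, and the
Long.MAX_VALUE watermark shows up only as a separate, later event, so it
has to be observed through a watermark or timer callback rather than a
per-record one.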

Best,

Dawid

On 08/07/2021 14:51, Yik San Chan wrote:
> Hi,
>
> According to the docs [1]
>
> When a source reaches the end of the input, it emits a final watermark
> with timestamp Long.MAX_VALUE, indicating the "end of time".
>
>
> However, in my small experiment [2], the Flink job reads from a local
> csv file, and prints a watermark for each record in the SinkFunction
> `invoke` method. Even though I expect the last record comes with a
> MAX_VALUE watermark, all records actually come with a MIN_VALUE watermark.
>
>
> ```
>
> watermark=-9223372036854775808
> watermark=-9223372036854775808
> 1
> 10
>
> ```
>
>
> I wonder what I miss? Is there a way to make sure the source generates
> a MAX_VALUE watermark after it finishes all records?
>
> Thank you!
>
> [1] 
> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/api/common/eventtime/Watermark.java#L39
> <https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/api/common/eventtime/Watermark.java#L39>
>
> [2] https://github.com/YikSanChan/flink-batch-source-watermark
> <https://github.com/YikSanChan/flink-batch-source-watermark>




Re: Flink cep checkpoint size

2021-07-08 Thread Dawid Wysakowicz
Hi,

Sorry for the late reply.

Indeed I found a couple of problems with clearing the state for
short-lived keys. I created a JIRA issue[1] to track it and opened a PR
(which needs test coverage before it can be merged) with fixes for those.

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-23314

On 06/07/2021 09:11, Li Jim wrote:
> Hi, Mohit,
>
> Have you figured out any solutions to this problem?
>
> I am now facing exactly the same problem.
>
> I was using Flink version 1.12.0 and I also upgraded it to 1.13.1, but the
> checkpoint size is still growing.
>
> On 2021/06/02 15:45:59, "Singh, Mohit"  wrote: 
>> Hi,
>>
>> I am facing an issue with cep operator where checkpoint size keeps 
>> increasing even though the pattern is fully matched. I have a stream with 
>> unique user id and I want to detect a pattern of product purchased by user.
>>
>> here is the sample stream data
>>
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
>> ","product":"product1","bids":3,"ts":"1622644781243"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
>> ","product":"product2","bids":6,"ts":"1622644781245"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
>> ","product":"product2","bids":4,"ts":"1622644781247"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
>> ","product":"product2","bids":2,"ts":"1622644781247"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
>> ","product":"product2","bids":1,"ts":"1622644781248"}
>> {"user_id":"45eff814-9016-4849-b607-391601e00e97 
>> ","product":"product3","bids":1,"ts":"1622644781248"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
>> ","product":"product1","bids":3,"ts":"1622644782235"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
>> ","product":"product2","bids":6,"ts":"1622644782236"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
>> ","product":"product2","bids":4,"ts":"1622644782236"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
>> ","product":"product2","bids":2,"ts":"1622644782237"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
>> ","product":"product2","bids":1,"ts":"1622644782238"}
>> {"user_id":"badc0716-9de8-4bef-80b9-8bfbd25b8e0b 
>> ","product":"product3","bids":1,"ts":"1622644782239"}
>> …..
>> …..
>>
>> StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> env.setParallelism(1);
>> Properties properties = new Properties();
>> properties.setProperty("bootstrap.servers", "localhost:9092");
>> properties.setProperty("group.id", "cep");
>> DataStream stream = env.addSource(
>>         new FlinkKafkaConsumer<>("orders", new SimpleStringSchema(), properties))
>>     .map(json -> gson.fromJson(json, orders.class))
>>     .assignTimestampsAndWatermarks(
>>         WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2))
>>             .withTimestampAssigner((orders, timestamp) -> orders.ts));
>>
>> Pattern pattern = Pattern.begin(
>>         "start",
>>         AfterMatchSkipStrategy.skipPastLastEvent())
>>     .where(new SimpleCondition() {
>>         @Override
>>         public boolean filter(orders value) throws Exception {
>>             return value.product.equals("product1");
>>         }
>>     })
>>     .times(1)
>>     .followedBy("middle")
>>     .where(new SimpleCondition() {
>>         @Override
>>         public boolean filter(orders value) throws Exception {
>>             return value.product.equals("product2");
>>         }
>>     })
>>     .oneOrMore()
>>     .until(new SimpleCondition() {
>>         @Override
>>         public boolean filter(orders value) throws Exception {
>>             return value.product.equals("product3");
>>         }
>>     })
>>     .within(Time.seconds(10));
>>
>> PatternStream patternStream =
>>     CEP.pattern(stream.keyBy((KeySelector) orders -> orders.user_id), pattern);
>>
>> DataStream alerts = patternStream.select((PatternSelectFunction) matches ->
>>     matches.get("start").get(0).user_id + "->" +
>>     matches.get("middle").get(0).ts);
>> alerts.print();
>>
>>
>> [cid:image001.png@01D7579C.775FCA00]
>>
>> I have also attached the checkpoint file.
>>
>> It looks like the NFA state keeps track of all keys seen and the start state 
>> and that leads to increase in checkpoint size if the keys are not reused in 
>> patterns. So, if I have fixed number of keys the size do not increase. is 
>> this the expected behavior and correct understanding?
>> Is there a way to drop these keys once the pattern is matched.? or am I 
>> missing something here?
>>
>> Thanks,
>> Mohit
>>





Re: Flink Metric Reporting from Job Manager

2021-07-08 Thread Dawid Wysakowicz
Hi,

I think that is not directly supported. After all, the main method can
also be executed outside of a JobManager and there you don't have any
Flink context/connections/components set up.

Best,

Dawid

On 08/07/2021 00:12, Mason Chen wrote:
> Hi all,
>
> Does Flink support reporting metrics from the main method that is ran on the 
> Job Manager? In this case, we want to report a failure to add an operator to 
> the Job Graph.
>
> Best,
> Mason





Re: Question about POJO rules - why fields should be public or have public setter/getter?

2021-07-08 Thread Dawid Wysakowicz
Hi Naehee,

Short answer would be for historic reasons and compatibility reasons. It
was implemented that way back in the days and we don't want to change
the default type extraction logic. Otherwise user jobs that rely on the
default type extraction logic for state storing would end up with a
state stored in an incompatible way with the updated serializer.

This is not a problem for Table/SQL programs as we control the state
internally, and that's why we were able to change the requirements for
POJOs in Table/SQL programs. [1]

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/types/#user-defined-data-types

On 08/07/2021 00:09, Naehee Kim wrote:
> According to the Flink doc, 
>
> Flink recognizes a data type as a POJO type (and allows “by-name”
> field referencing) if the following conditions are fulfilled:
>
>   * The class is public and standalone (no non-static inner class)
>   * The class has a public no-argument constructor
>   * All non-static, non-transient fields in the class (and all
> superclasses) are either public (and non-final) or have a public
> getter- and a setter- method that follows the Java beans naming
> conventions for getters and setters.
>
>
> PojoSerializer uses Java reflection to access an object's fields. One
> of PojoSerializer's constructor calls setAccessible(true) for all fields.
> for (int i = 0; i < numFields; i++) {
>    this.fields[i].setAccessible(true);
> }
> Then, to my knowledge, it can set a field regardless of the field's
> access control(private, public,..).
>
> However, its another constructor, called by PojoSerializerSnapshot,
> doesn't call setAccessible(true). Does anyone know the reason why
> setAccessible(true) is not called here? And why fields should be
> public or have a public gettter- and setter- method?
>
> Regards,
> Naehee
>
>




Re: Write Kafka message header using FlinkKafkaProducer

2021-06-21 Thread Dawid Wysakowicz
Hi,

You can use KafkaSerializationSchema[1] which can create a
ProducerRecord with Headers.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.html

On 21/06/2021 12:58, Tan, Min wrote:
>
> Hi,
>
>  
>
> I would like to add some meta data in the headers of kafka
> messages using FlinkKafkaProducer.
>
> I googled a bit and have not found an example.
>
> Which Flink Serialization Schema should I use? Any suggestions?
>
>  
>
> Thank you very much for your help in advance.
>
>  
>
> Regards,
>
> Min
>
>  
>




Re: unsubscribe

2021-06-21 Thread Dawid Wysakowicz
You should send a message to user-unsubscr...@flink.apache.org if you
want to unsubscribe.

On 20/06/2021 00:08, SANDEEP PUNIYA wrote:





Re: unsubscribe

2021-06-21 Thread Dawid Wysakowicz
You should send a message to user-unsubscr...@flink.apache.org if you
want to unsubscribe.

On 19/06/2021 18:04, 林俊良 wrote:
>





Re:

2021-06-21 Thread Dawid Wysakowicz
You should send a message to user-unsubscr...@flink.apache.org if you
want to unsubscribe.

On 21/06/2021 04:25, 张万新 wrote:
> unsubscribe





Re: DSL for Flink CEP

2021-06-03 Thread Dawid Wysakowicz
Hi,

Just to add on top of what Fabian said.

The only community-supported CEP library is the one that comes with
Flink[1]. It is also used internally to support the MATCH_RECOGNIZE
clause in Flink SQL[2]. Therefore there is both a programmatic,
library-native DSL for defining patterns and the option to use SQL. As
Fabian mentioned, you can say the library is in maintenance mode,
primarily because there is no one interested in actively working on it.

If you use other CEP libraries that are integrated with Flink, it's
probably a better idea to reach out to the maintainers of said libraries.

Lastly, I am not aware of any comparisons of CEP libraries/extensions
that work with Flink. I am afraid you have to do the feature comparison
yourself. I think the documentation for the community-supported library
is a good start for it.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/cep/

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/match_recognize/

On 02/06/2021 13:23, Fabian Paul wrote:
> Hi Dipanjan,
>
> I am afraid there are no foreseeable efforts planned but if you find a
> nice addition, you can 
> start a discussion in the community about this feature.
>
> Best,
> Fabian
>> On 2. Jun 2021, at 12:10, Dipanjan Mazumder <java...@yahoo.com> wrote:
>>
>> Hi Fabian,
>>
>>      Understood but is there any plan to grow the flink  CEP and
>> build a friendly DSL around flink CEP by any chance.
>>
>> Regards
>> Dipanjan
>>
>> On Wednesday, June 2, 2021, 03:22:46 PM GMT+5:30, Fabian Paul
>> <fabianp...@data-artisans.com> wrote:
>>
>>
>> Hi Dipanjan,
>>
>> Unfortunately, I have no experience with Siddhi but I am not aware of
>> any official joined efforts between Flink and Siddhi.
>> I can imagine that not all Siddhi CEP expressions are compatible with
>> Flink’s CEP. At the moment there is also no active
>> development for Flink’s CEP. 
>>
>> I think to get a better understanding what the caveats are of the
>> third party solution you have to directly reach out to the 
>> maintainers.
>>
>> Best,
>> Fabian
>>
>>
>>> On 2. Jun 2021, at 08:37, Dipanjan Mazumder <java...@yahoo.com> wrote:
>>>
>>> Hi,
>>>
>>>    I am currently using Siddhi CEP with Flink, but the flink-siddhi
>>> library has limited support for Flink versions and I will either need
>>> to fix the library or get tied to a fixed version of Flink to use the
>>> library.
>>>
>>>  I am looking at Flink CEP as an option , and also came across a
>>> Flink CEP DSL library (https://github.com/phil3k3/flink-cep-dsl
>>> <https://github.com/phil3k3/flink-cep-dsl>) , but i am not sure
>>> about the acceptance for the same by the Flink community and DEV.
>>> Also is Flink CEP supporting the Siddhi CEP constructs and is rich
>>> on the same aspect.
>>>
>>> Please let me know the same , so that i can take a cautious decision
>>> on the same.
>>>
>>> Regards
>>> Dipanjan
>>
>




Re: [ANNOUNCE] Apache Flink 1.13.1 released

2021-05-31 Thread Dawid Wysakowicz
Hi Youngwoo,

Usually we publish the docker images a day after the general release, so
that the artifacts are properly distributed across Apache mirrors. You
should be able to download the docker images from apache/flink now. It
may take a few extra days to have the images published as the official
image, as it depends on the maintainers of docker hub.

Best,

Dawid

On 31/05/2021 08:01, Youngwoo Kim (김영우) wrote:
> Great work! Thank you Dawid and all of the contributors.
> I'm eager to adopt the new release, however can't find docker images for
> that from https://hub.docker.com/_/flink
>
> Hope it'll be available soon.
>
> Thanks,
> Youngwoo
>
>
> On Sat, May 29, 2021 at 1:49 AM Dawid Wysakowicz 
> wrote:
>
>> The Apache Flink community is very happy to announce the release of Apache
>> Flink 1.13.1, which is the first bugfix release for the Apache Flink 1.13
>> series.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the improvements
>> for this bugfix release:
>> https://flink.apache.org/news/2021/05/28/release-1.13.1.html
>>
>> The full release notes are available in Jira:
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12350058
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Regards,
>> Dawid Wysakowicz
>>





Re: Running multiple CEP pattern rules

2021-05-31 Thread Dawid Wysakowicz
I am afraid there is not much active development going on in the
CEP library. I would not expect new features there in the near future.

On 28/05/2021 22:00, Tejas wrote:
> Hi Dawid,
> Do you have any plans to bring this functionality in flink CEP in future ?
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Running multiple CEP pattern rules

2021-05-28 Thread Dawid Wysakowicz
Hi Tejas,

It will not work that way. Bear in mind that every application of
CEP.pattern creates a new operator in the graph. The exceptions you are
seeing most probably result from calculating that huge graph and sending
that over. You are reaching the timeout on submitting that huge graph.
You can have many different patterns in a single job, but the number of
vertices in your graph is not unlimited.

In your scenario I'd try to combine the rules in a single operator. You
could try to use the ProcessFunction for that.
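
As a rough illustration of what "combine the rules in a single operator"
could look like, here is a plain-Java sketch of the matching logic only. It
is not Flink code: SeqRule and MultiRuleMatcher are hypothetical names, the
rules are simplified to "start followed by end", and the in-memory map
stands in for what would be keyed state inside a process function.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical rule: an event with process == start, later followed by process == end. */
final class SeqRule {
    final String name;
    final String start;
    final String end;

    SeqRule(String name, String start, String end) {
        this.name = name;
        this.start = start;
        this.end = end;
    }
}

/** Evaluates all rules for every key in one place instead of one operator per rule. */
final class MultiRuleMatcher {
    private final List<SeqRule> rules;
    // key -> one flag per rule: has that rule's start event been seen for this key?
    private final Map<String, boolean[]> startSeen = new HashMap<>();

    MultiRuleMatcher(List<SeqRule> rules) {
        this.rules = new ArrayList<>(rules);
    }

    /** Feeds one event and returns the names of the rules this event completes. */
    List<String> onEvent(String key, String process) {
        boolean[] seen = startSeen.computeIfAbsent(key, k -> new boolean[rules.size()]);
        List<String> matched = new ArrayList<>();
        for (int i = 0; i < rules.size(); i++) {
            SeqRule rule = rules.get(i);
            if (seen[i] && rule.end.equals(process)) {
                matched.add(rule.name);
                seen[i] = false; // reset so the rule can match again later
            } else if (rule.start.equals(process)) {
                seen[i] = true;
            }
        }
        return matched;
    }
}
```

With this shape the job graph contains one operator regardless of the
number of rules. The cost is that pattern semantics such as optional steps,
time windows and skip strategies have to be implemented by hand.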

Best,

Dawid

On 28/05/2021 01:53, Tejas wrote:
> Hi,
> We are running into errors when running multiple CEP patterns. Here’s our
> use-case :
> We are planning to build a rule based engine on top of flink with huge
> number of rules and doing a POC for that. For POC we have around 1000
> pattern based rules which we are translating into CEP patterns and running
> these rules on a keyed stream of events data to detect patterns. We are
> partitioning the stream by orgId and each rule needs to be run into each
> org. Here’s the code we’ve written to implement that :
> DataStream eventStream = null;
> DataStream partitionedInput =
>     eventStream.keyBy((KeySelector) Event::getOrgid);
> List ruleList = new ArrayList<>();
> for (int i = 0; i < 100; i++) {
>   ruleList.add(new Rule("rule" + i, "process1", "process2", "process3"));
>   ruleList.add(
>       new Rule("rule" + (i + 500), "process4", "process5", "process6"));
> }
> for (Rule rule : ruleList) {
>   String st = rule.getStart();
>   String mi = rule.getMid();
>   String en = rule.getEnd();
>   String nm = rule.getName();
>   Pattern pattern =
>       Pattern.begin(
>           Pattern.begin("start")
>               .where(
>                   new SimpleCondition() {
>                     @Override
>                     public boolean filter(Event value) throws Exception {
>                       return value.getProcess().equals(st);
>                     }
>                   })
>               .followedBy("middle")
>               .where(
>                   new SimpleCondition() {
>                     @Override
>                     public boolean filter(Event event) {
>                       return !event.getProcess().equals(mi);
>                     }
>                   })
>               .optional()
>               .followedBy("end")
>               .where(
>                   new SimpleCondition() {
>                     @Override
>                     public boolean filter(Event event) {
>                       return event.getProcess().equals(en);
>                     }
>                   }));
>   PatternStream patternStream = CEP.pattern(partitionedInput, pattern);
>   DataStream alerts =
>       patternStream.process(
>           new PatternProcessFunction() {
>             @Override
>             public void processMatch(
>                 Map<String, List<Event>> map, Context context, Collector collector)
>                 throws Exception {
>               Event start = map.containsKey("start") ? map.get("start").get(0) : null;
>               Event middle = map.containsKey("middle") ? map.get("middle").get(0) : null;
>               Event end = map.containsKey("end") ? map.get("end").get(0) : null;
>               StringJoiner joiner = new StringJoiner(",");
>               joiner
>                   .add("Rule : " + nm + " ")
>                   .add((start == null ? "" : start.getId()))
>                   .add((middle == null ? "" : middle.getId()))
>                   .add((end == null ? "" : end.getId()));
>               collector.collect(joiner.toString());
>             }
>           });
>   alerts.print();
> }
> We tried to run this code on the flink cluster with 1 task manager with 4
> task slots and the task manager crashed with the error :
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>   at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>   at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>   at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
>   at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
>   at org.apache.flink.runti

[ANNOUNCE] Apache Flink 1.13.1 released

2021-05-28 Thread Dawid Wysakowicz
The Apache Flink community is very happy to announce the release of
Apache Flink 1.13.1, which is the first bugfix release for the Apache
Flink 1.13 series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data
streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the
improvements for this bugfix release:
https://flink.apache.org/news/2021/05/28/release-1.13.1.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12350058

We would like to thank all contributors of the Apache Flink community
who made this release possible!

Regards,
Dawid Wysakowicz




Re: [VOTE] Release 1.13.1, release candidate #1

2021-05-28 Thread Dawid Wysakowicz
Thank you all for the votes. I am happy to say we approved the release.
I will write a separate summary mail.

Best,

Dawid

On 28/05/2021 14:40, Robert Metzger wrote:
> +1 (binding)
>
> - Tried out reactive mode from the Scala 2.11 binary locally (with
> scale up & stop with savepoint)
> - reviewed website update
> - randomly checked a jar file in the staging repo (flink-python jar
> looks okay (I just checked superficially))
>
>
>
>
> On Fri, May 28, 2021 at 5:16 AM Leonard Xu <xbjt...@gmail.com> wrote:
>
>
> +1 (non-binding)
>
> - verified signatures and hashes
> - built from source code with scala 2.11 succeeded
> - started a cluster, WebUI was accessible, ran a window word count
> job, no suspicious log output
> - ran some SQL jobs in SQL Client, the queries result is expected
> - the web PR looks good
>
> Best,
> Leonard
>
>
> > On May 28, 2021, at 10:25, Xingbo Huang <hxbks...@gmail.com> wrote:
> >
> > +1 (non-binding)
> >
> > - verified checksums and signatures
> > - built from source code
> > - checked apache-flink source/wheel package content
> > - run python udf job
> >
> > Best,
> > Xingbo
> >
> > On Thu, May 27, 2021 at 9:45 PM, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> > +1 (binding)
> >
> > verified signatures and checksums
> > built from sources and run an example, quickly checked Web UI
> > checked diff of pom.xml and NOTICE files from 1.13.0,
> > there were no version changes,
> > checked the updated licenses of javascript dependencies
> > Best,
> >
> > Dawid
> >
> > On 26/05/2021 11:15, Matthias Pohl wrote:
> >> Hi Dawid,
> >> +1 (non-binding)
> >>
> >> Thanks for driving this release. I checked the following things:
> >> - downloaded and build source code
> >> - verified checksums
> >> - double-checked diff of pom files between 1.13.0 and 1.13.1-rc1
> >> - did a visual check of the release blog post
> >> - started cluster and ran jobs (WindowJoin and WordCount); nothing
> >> suspicious found in the logs
> >> - verified change FLINK-22866 manually whether the issue is fixed
> >>
> >> Best,
> >> Matthias
> >>
> >> On Tue, May 25, 2021 at 3:33 PM Dawid Wysakowicz <dwysakow...@apache.org>
> >> wrote:
> >>
> >>> Hi everyone,
> >>> Please review and vote on the release candidate #1 for the version 1.13.1,
> >>> as follows:
> >>> [ ] +1, Approve the release
> >>> [ ] -1, Do not approve the release (please provide specific comments)
> >>>
> >>>
> >>> The complete staging area is available for your review, which includes:
> >>> * JIRA release notes [1],
> >>> * the official Apache source release and binary convenience releases to be
> >>> deployed to dist.apache.org [2], which are signed with the key with
> >>> fingerprint 31D2DD10BFC15A2D [3],
> >>> * all artifacts to be deployed to the Maven Central Repository [4],
> >>> * source code tag "release-1.13.1-rc1" [5],
> >>> * website pull request listing the new release and adding announcement
> >>> blog post [6].
> >>>
> >>> The vote will be open for at least 72 hours. It is adopted by majority
> >>> approval, with at least 3 PMC affirmative votes.
> >>>
> >>> Best,
> >>> Dawid
> >>>
> >>> [1]
> >>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350058
> >>> [2]

Re: [VOTE] Release 1.13.1, release candidate #1

2021-05-27 Thread Dawid Wysakowicz
+1 (binding)

  * verified signatures and checksums
  * built from sources and run an example, quickly checked Web UI
  * checked diff of pom.xml and NOTICE files from 1.13.0,
  o there were no version changes,
  o checked the updated licenses of javascript dependencies

Best,

Dawid

On 26/05/2021 11:15, Matthias Pohl wrote:
> Hi Dawid,
> +1 (non-binding)
>
> Thanks for driving this release. I checked the following things:
> - downloaded and build source code
> - verified checksums
> - double-checked diff of pom files between 1.13.0 and 1.13.1-rc1
> - did a visual check of the release blog post
> - started cluster and ran jobs (WindowJoin and WordCount); nothing
> suspicious found in the logs
> - verified change FLINK-22866 manually whether the issue is fixed
>
> Best,
> Matthias
>
> On Tue, May 25, 2021 at 3:33 PM Dawid Wysakowicz 
> wrote:
>
>> Hi everyone,
>> Please review and vote on the release candidate #1 for the version 1.13.1,
>> as follows:
>> [ ] +1, Approve the release
>> [ ] -1, Do not approve the release (please provide specific comments)
>>
>>
>> The complete staging area is available for your review, which includes:
>> * JIRA release notes [1],
>> * the official Apache source release and binary convenience releases to be
>> deployed to dist.apache.org [2], which are signed with the key with
>> fingerprint 31D2DD10BFC15A2D [3],
>> * all artifacts to be deployed to the Maven Central Repository [4],
>> * source code tag "release-1.13.1-rc1" [5],
>> * website pull request listing the new release and adding announcement
>> blog post [6].
>>
>> The vote will be open for at least 72 hours. It is adopted by majority
>> approval, with at least 3 PMC affirmative votes.
>>
>> Best,
>> Dawid
>>
>> [1]
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350058
>> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.13.1-rc1/
>> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
>> [4]
>> https://repository.apache.org/content/repositories/orgapacheflink-1422/
>> [5] https://github.com/apache/flink/tree/release-1.13.1-rc1
>> [6] https://github.com/apache/flink-web/pull/448
>>




Re: Customer operator in BATCH execution mode

2021-05-26 Thread Dawid Wysakowicz
Hi,

No, there is no API in the operator to know which mode it works in. We
aim to have separate operators for both modes if required. You can check
e.g. how we do it in KeyedBroadcastStateTransformationTranslator[1].

Yes, it should be possible to register an event-time timer for
Long.MAX_VALUE (the timestamp of Watermark.MAX_WATERMARK, which is
emitted at the end of input) if you want to apply a transformation at
the end of each key. You could also use the reduce operation
(DataStream#keyBy#reduce) in BATCH mode.

A side note: I don't fully get what you mean by "build state for our
streaming application". Bear in mind, though, that you cannot take a
savepoint from a job running in the BATCH execution mode. Moreover, it
uses a different kind of StateBackend: a dummy one, which just imitates
a real state backend.

Best,

Dawid


[1]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/KeyedBroadcastStateTransformationTranslator.java

On 25/05/2021 17:04, ChangZhuo Chen (陳昌倬) wrote:
> Hi,
>
> Currently, we want to use batch execution mode [0] and historical data
> to build state for our streaming application. Because of the differences
> between batch & streaming mode, we want to check the current execution
> mode in a custom operator. So our questions are:
>
>
> * Is there any API for custom operator to know current execution mode
>   (batch or streaming)?
>
> * If we want to output after all elements of one specific key are
>   processed, can we just use timer since timer is triggered at the end
>   of input [0]?
>
>
> [0] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
>





[VOTE] Release 1.13.1, release candidate #1

2021-05-25 Thread Dawid Wysakowicz
Hi everyone,
Please review and vote on the release candidate #1 for the version
1.13.1, as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)


The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release and binary convenience releases to
be deployed to dist.apache.org [2], which are signed with the key with
fingerprint 31D2DD10BFC15A2D [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag "release-1.13.1-rc1" [5],
* website pull request listing the new release and adding announcement
blog post [6].

The vote will be open for at least 72 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.

Best,
Dawid

[1]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12350058
[2] https://dist.apache.org/repos/dist/dev/flink/flink-1.13.1-rc1/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4]
https://repository.apache.org/content/repositories/orgapacheflink-1422/
[5] https://github.com/apache/flink/tree/release-1.13.1-rc1
[6] https://github.com/apache/flink-web/pull/448




Re: Is it possible to leverage the sort order in DataStream Batch Execution Mode?

2021-05-21 Thread Dawid Wysakowicz
I am afraid it is not possible to leverage the sorting for business
logic. The sorting is applied on the binary representation of the key,
as it is not necessarily sorting per se, but rather grouping by the same
keys. You can find more information in the FLIP for this feature, e.g.
here [1].
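
To illustrate why a byte-wise sort groups equal keys without giving a
logical order, here is a minimal plain-Java sketch. It does not use any
Flink classes; the big-endian long encoding and the class name
BinaryKeyOrder are assumptions made for the illustration, and Flink's
actual key serializers may lay out bytes differently.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BinaryKeyOrder {
    // Serialize a long as 8 big-endian bytes (plain two's complement).
    // This encoding is an assumption for the sketch, not Flink's format.
    static byte[] serialize(long key) {
        return ByteBuffer.allocate(Long.BYTES).putLong(key).array();
    }

    // Sort keys by unsigned byte-wise comparison of their serialized form.
    static List<Long> sortByBytes(List<Long> keys) {
        List<Long> sorted = new ArrayList<>(keys);
        sorted.sort((a, b) -> Arrays.compareUnsigned(serialize(a), serialize(b)));
        return sorted;
    }

    public static void main(String[] args) {
        List<Long> keys = Arrays.asList(3L, -1L, 3L, 2L, -1L);
        // Equal keys end up adjacent, but -1 sorts after 3.
        System.out.println(sortByBytes(keys)); // prints [2, 3, 3, -1, -1]
    }
}
```

Equal keys are always adjacent, which is all that grouping needs, but
the resulting order is not the natural order of the keys, so business
logic should not depend on it.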

Best,

Dawid

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-140%3A+Introduce+batch-style+execution+for+bounded+keyed+streams#FLIP140:Introducebatchstyleexecutionforboundedkeyedstreams-Howtosort/groupkeys

On 21/05/2021 09:58, Marco Villalobos wrote:
> Hello. I am using Flink 1.12.1 in EMR.
>
> I am processing historical time-series data with the DataStream
> API in Batch execution mode.
>
> I must average time series data into a fifteen minute interval
> and forward fill missing values.
>
> For example, this input:
>
> name, timestamp, value
> a,2019-06-23T00:07:30Z,10
> b,2019-06-23T00:05:30Z,7
> a,2019-06-23T00:09:30Z,10
> a,2019-06-23T00:37:30Z,10
>
> would yield this output:
>
> name, timestamp, value, is_forward_fill
> a,2019-06-23T00:15:00Z,20,false
> b,2019-06-23T00:15:00Z,7,false
> a,2019-06-23T00:30:00Z,20,true
> b,2019-06-23T00:30:00Z,7,true
> a,2019-06-23T00:45:00Z,5,false
> b,2019-06-23T00:30:00Z,7,true
>
> My stream code looks something like this
>
> STREAM PSEUDO CODE
>
> stream.keyBy(Tuple with step,name)
>     .window(TumblingEventTimeWindows.of(Time.minutes(15)))
>     .aggregate(new AggregateFunction(), new
> AggregateProcessWindowFunction())
>     .name("aggregate")
>     .keyBy(Tuple with name)
>     .process(new FillKeyedProcessFunction())
>     .name("fill");
>
> The documentation
> (https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html)
> suggests that the stream might be sorted by key.
>
> If that's true, my fill function can be greatly simplified if I were
> able to leverage that somehow.
>
> I tried implementing a custom pojo key for the fill function like this:
>
> public class FillKey implements Comparable<FillKey>, Serializable {
>     String name;
>     Instant timestamp;
>     equals // only checks name
>     hashcode // only hash name
>     compareTo // compares name, timestamp
> }
>
> Notice that my key only checks equality on the name, and hashes only
> the name, but when
> it performs comparisons it orders by name, timestamp.
>
> My stream now looks like this:
>
> stream.keyBy(Tuple with step,name)
>     .window(TumblingEventTimeWindows.of(Time.minutes(15)))
>     .aggregate(new AggregateFunction(), new
> AggregateProcessWindowFunction())
>     .name("aggregate")
>     .keyBy(new KeySelector() {
>         @Override
>         public FillKey getKey(TimeSeries value) throws Exception {
>             return new FillKey(value.name, value.timestamp);
>         }
>     }).process(new FillKeyedProcessFunction())
>     .name("fill");
>
> and it seemed to arrive sorted, but I am getting the wrong output
> because my keyed state no longer seems to work. I expected the stream
> to arrive in the order of name, step ascending. However, keyed state
> behaved as though each element that I thought would share the name was
> different.
>
> Is there an issue with Pojo Keys that break the keyed state in batch
> execution mode?
> Is it possible to take advantage of the sort order within the business
> logic as I am trying to do?




Re: Possible way to avoid unnecessary serialization calls.

2021-05-12 Thread Dawid Wysakowicz
Hi Alex,

I cannot reproduce the issue. Do you mind checking if it is not an issue
on your side?

P.S. It would be nice if you could reply to the ML as well. That way
other people can benefit from the answers. Moreover there will be more
people who could help answering your question.

Best,

Dawid

On 12/05/2021 11:36, Alex Drobinsky wrote:
> Hi Dawid,
>
> I upgraded flink to 1.13.0 and stumbled upon strange phenomenon (
> usually, I would use word bug though ) - 
> the start-cluster.sh overwrites my
> flink-conf.yml taskmanager.numberOfTaskSlots , now it's always set to 1.
> Is it a bug or something else ?
>
> Best regards,
> Alexander
>
>
> On Mon, May 10, 2021 at 15:08, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>
> Hi Alex,
>
> If you are sure that the operations in between do not change the
> partitioning of the data and keep the key constant for the whole
> pipeline you could use the reinterpretAsKeyedStream[1]. I guess this
> answers your questions 1 & 2.
>
> As for the third question, first of all you should look into enabling
> object reuse[2]. Make sure though you work with immutable objects.
> Secondly, all operators that simply forward records should be chained
> by default. If you need more fine-grained control over it you can look
> into these docs[3].
>
> Best,
>
> Dawid
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/experimental/#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/execution_configuration/#execution-configuration
>
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/overview/#task-chaining-and-resource-groups
>
> On 10/05/2021 08:59, Alex Drobinsky wrote:
> > Dear entity that represents Flink user community,
> >
> > In order to formulate the question itself, I would need to describe
> > the problem in many details, hence please bear with me for a while.
> >
> > I have following execution graph:
> >
> > KafkaSource -> Message parser -> keyBy -> TCP assembly -> keyBy ->
> > Storage -> keyBy -> Classifier -> KafkaSink (This is slightly
> > simplified version )
> >
> > When I noticed less than ideal throughput, I executed profiler which
> > identified
> > org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(PushingAsyncDataInput$DataOutput)
> > as a major function (83% of time spent here). 45% of total time is
> > spent in
> > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(IOReadableWritable).
> >
> > The serialization is protobuf with Kryo, according to benchmarks it
> > isn't particularly slow , should be similar or a bit better than POJO.
> >
> > The problem from my point of view is that serialization shouldn't
> > happen at all, unless data is actually sent via network to another
> > node ( in my case I have one job manager and one task manager ).
> >
> > However, I would suspect that keyBy operation implicitly enforces
> > usage of serialization / deserialization.
> >
> > First question : In this particular case, the key is exactly the same
> > for every keyBy, is there any other way than combining operations into
> > a single operator to avoid performance impact from keyBy chain ?
> >
> > Second question : could I use the process function after keyBy in such
> > a way that it will not merge stream back e.g. it will continue to be
> > KeyedStream ?
> >
> > Third question: could I somehow specify that the sequence of operators
> > must be executed in the same thread without
> > serialization/deserialization operations in between ?
> >
> >
> > Best regards,
> > Alexander 
>




Re: Flink: Clarification required

2021-05-10 Thread Dawid Wysakowicz
Hi Jessy,

> I have a stateless Flink application where the source and sink are
> two different Kafka topics. Is there any benefit in adding
> checkpointing for this application? Will it help in some way with
> rewinds and replays when restarting after a failure?

If you want to make sure that your application has either
AT_LEAST_ONCE or EXACTLY_ONCE semantics[1], you need to enable
checkpointing. To achieve those, Flink needs to keep track of the Kafka
offsets, which it stores in its state. Therefore, even though your
transformations do not have state themselves, the source does have state.

> So my question is, processing the incoming streams based on these
> rules stored in Flink state per key is efficient or not (I am using
> RocksDB as state backend)?

There is no one good answer to that question. It varies a lot depending
on the volume of data etc. I'd recommend checking for yourself whether
you are happy with the performance. It is definitely an approach that a
lot of people have implemented and been happy with. There is also a blog
post (rather oldish by now) which describes how you could implement such
a pattern[2].

> Is there any benefit in using Apache Camel along with Flink?

I am not very familiar with Apache Camel so can't say much on this. As
far as I know Apache Camel is more of a routing system, whereas Flink is
a data processing framework.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-producers-and-fault-tolerance

[2] https://flink.apache.org/2019/06/26/broadcast-state.html

On 10/05/2021 11:13, Jessy Ping wrote:
>
> Hi all,
>
>
> Currently, we are exploring the various features of Flink and need
> some clarification on the below-mentioned questions.
>
>   * I have a stateless Flink application where the source and sink are
> two different Kafka topics. Is there any benefit in adding
> checkpointing for this application? Will it help in some way with
> rewinds and replays when restarting after a failure?
>
>   * I have a stateful use case where events are processed based on a
> set of dynamic rules provided by an external system, say a Kafka
> source. Also, the actual events are distinguishable based on a
> key. A broadcast function is used for broadcasting the dynamic
> rules and storing them in Flink state.
>
> So my question is, processing the incoming streams based on these
> rules stored in Flink state per key is efficient or not ( i am
> using rocksdb as state-backend ) ?
>
> What about using an external cache for this?
>
> Is stateful function a good contender here? 
>
>   *  Is there any benefit in using Apache camel along with Flink ?
>
>
> Thanks
> Jessy




Re: Possible way to avoid unnecessary serialization calls.

2021-05-10 Thread Dawid Wysakowicz
Hi Alex,

If you are sure that the operations in between do not change the
partitioning of the data and keep the key constant for the whole
pipeline you could use the reinterpretAsKeyedStream[1]. I guess this
answers your questions 1 & 2.

As for the third question, first of all you should look into enabling
object reuse[2]. Make sure though you work with immutable objects.
Secondly, all operators that simply forward records should be chained
by default. If you need more fine-grained control over it you can look
into these docs[3].

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/experimental/#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/execution_configuration/#execution-configuration

[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/overview/#task-chaining-and-resource-groups

On 10/05/2021 08:59, Alex Drobinsky wrote:
> Dear entity that represents Flink user community,
>
> In order to formulate the question itself, I would need to describe
> the problem in many details, hence please bear with me for a while.
>
> I have following execution graph:
>
> KafkaSource -> Message parser -> keyBy -> TCP assembly -> keyBy ->
> Storage -> keyBy -> Classifier -> KafkaSink (This is slightly
> simplified version )
>
> When I noticed less than ideal throughput, I executed profiler which
> identified
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(PushingAsyncDataInput$DataOutput)
> as a major function (83% of time spent here). 45% of total time is
> spent in
> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(IOReadableWritable).
>
> The serialization is protobuf with Kryo, according to benchmarks it
> isn't particularly slow , should be similar or a bit better than POJO.
>
> The problem from my point of view is that serialization shouldn't
> happen at all, unless data is actually sent via network to another
> node ( in my case I have one job manager and one task manager ).
>
> However, I would suspect that keyBy operation implicitly enforces
> usage of serialization / deserialization.
>
> First question : In this particular case, the key is exactly the same
> for every keyBy, is there any other way than combining operations into
> a single operator to avoid performance impact from keyBy chain ?
>
> Second question : could I use the process function after keyBy in such
> a way that it will not merge stream back e.g. it will continue to be
> KeyedStream ?
>
> Third question: could I somehow specify that the sequence of operators
> must be executed in the same thread without
> serialization/deserialization operations in between ?
>
>
> Best regards,
> Alexander 





Re: About the windowOperator and Watermark

2021-05-10 Thread Dawid Wysakowicz
Hi,

When a Watermark arrives the window operator will emit all windows that
are considered finished at the time of the Watermark. In your example
(assuming both windows are finished) they will both be emitted.
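
As a rough illustration of that behaviour, here is a minimal plain-Java
sketch. It is not the real WindowOperator: the class WatermarkFiring, the
window alignment at 0 and the "end - 1 <= watermark" firing rule are
assumptions chosen to mirror tumbling event-time windows of size 3.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class WatermarkFiring {
    // Buffered timestamps per window, keyed by window start (tumbling, aligned at 0).
    private final TreeMap<Long, List<Long>> windows = new TreeMap<>();
    private final long size;

    public WatermarkFiring(long size) {
        this.size = size;
    }

    // Assign the element to the tumbling window that contains its timestamp.
    public void onElement(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, size);
        windows.computeIfAbsent(start, s -> new ArrayList<>()).add(timestamp);
    }

    // Fire every window whose last timestamp (end - 1) is <= watermark, oldest first.
    public List<List<Long>> onWatermark(long watermark) {
        List<List<Long>> fired = new ArrayList<>();
        while (!windows.isEmpty() && windows.firstKey() + size - 1 <= watermark) {
            fired.add(windows.pollFirstEntry().getValue());
        }
        return fired;
    }

    public static void main(String[] args) {
        WatermarkFiring op = new WatermarkFiring(3);
        for (long ts : new long[] {2, 1, 4, 3, 5, 6}) {
            op.onElement(ts);
        }
        // Windows [0,3) and [3,6) are complete at watermark 7; [6,9) is not.
        System.out.println(op.onWatermark(7)); // prints [[2, 1], [4, 3, 5]]
    }
}
```

With this alignment a single watermark fires both finished windows in
one go, while the window holding element 6 stays open until a watermark
of at least 8 arrives.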

Best,

Dawid

On 08/05/2021 08:03, 曲洋 wrote:
> Hi Experts,
>
> Given that a window in the stream is configured with a short window size like
> timeWindow(3s),
> and I have to use event time and periodic watermarks.
> The stream input is [watermark(7) | 6, 5, 3, 4, 1, 2],
> and then two windows are created (3,1,2) (6,5,4) before watermark(7) arrives.
> But in this situation, when the current watermark is received,
> which window or how many windows will be triggered to fire and emit?
> My question is: what will the window operator do when two parallel
> windows both have end timestamps smaller than the current watermark timestamp?





Re: What does enableObjectReuse exactly do?

2021-05-10 Thread Dawid Wysakowicz
Hi,

In the streaming API, the biggest difference is that if you do not
enable object reuse, records will be duplicated/copied when forwarding
from an operator to the downstream one. If you are sure you work with
immutable objects, I'd highly recommend enabling object reuse.
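
Why immutability matters here can be shown with a small plain-Java
sketch. It contains no Flink code: the Event class and the
lastSeenUpstream helper are hypothetical and only model "hand over the
same instance" versus "hand over a copy" between two operators.

```java
public class ObjectReuseDemo {
    // A deliberately mutable record type.
    static final class Event {
        int value;

        Event(int value) {
            this.value = value;
        }

        Event copy() {
            return new Event(value);
        }
    }

    // Models one hand-over between two operators. With object reuse the
    // downstream side receives the very same instance, otherwise a copy.
    static int lastSeenUpstream(boolean objectReuse) {
        Event record = new Event(1);
        Event handedOver = objectReuse ? record : record.copy();
        handedOver.value = 42; // downstream mutates what it received
        return record.value;   // what the upstream side still observes
    }

    public static void main(String[] args) {
        System.out.println(lastSeenUpstream(false)); // prints 1
        System.out.println(lastSeenUpstream(true));  // prints 42
    }
}
```

With immutable records (for example Scala case classes without vars)
the mutation in the sketch is impossible, so skipping the copy is safe
and only saves work.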

Best,

Dawid

On 08/05/2021 05:24, 杨力 wrote:
> I wrote a streaming job with scala, using only immutable case class.
> Is it safe to enable object reuse? Will it get benefits from enabling
> object reuse?
>
> I looked at the documentation, but it covers neither streaming cases nor
> immutable data structures.





Re: Read kafka offsets from checkpoint - state processor

2021-05-10 Thread Dawid Wysakowicz
Hi,

You would need to look into the internals of FlinkKafkaConsumerBase. In
the current master the state for offsets is initialized in here:
https://github.com/apache/flink/blob/fbf84acf63102db455c89cb8e497cda423a1c4d5/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L984

Hope it helps.

Best,

Dawid

On 07/05/2021 14:43, bat man wrote:
> Anyone who has tried this or can help on this.
>
> Thanks.
>
> On Thu, May 6, 2021 at 10:34 AM bat man <tintin0...@gmail.com> wrote:
>
> Hi Users,
>
> Is there a way in Flink 1.9 to read the checkpointed data
> using the State Processor API?
> The docs [1] say: "When reading operator state, users specify the
> operator uid, the state name, and the type information."
>
> What is the type for the Kafka operator, which needs to be
> specified while reading the state?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/state_processor_api/
>
> Thanks,
> Hemant
>




Re: Unsubscribe

2021-05-10 Thread Dawid Wysakowicz
Hi all,

Before reaching out to the INFRA team, may I ask all of you to make sure
that you follow the two-step process? After sending the initial mail to
user-unsubscr...@flink.apache.org you should receive a request
for a confirmation. If you did confirm it and you are still receiving
messages from the ML, make sure you're using the mail address you're
subscribed with.

Best,

Dawid

On 06/05/2021 18:19, Dan Pettersson wrote:
> I've also tried a few times now the last couple of months. I think it
> would be very nice if the "flink admin" could look into this, instead
> of us reaching out to the Apache Infrastructure team. 
>
> Thanks, 
>
> /Dan
>
> On Thu, May 6, 2021 at 13:31, Chesnay Schepler <ches...@apache.org> wrote:
>
> Could you reach out to the Apache Infrastructure team
> <https://issues.apache.org/jira/projects/INFRA> about not being
> able to unsubscribe? Maybe this functionality is currently broken.
>
> On 5/6/2021 12:48 PM, Andrew Kramer wrote:
>> I have been unable to unsubscribe as well. Have tried emailing
>> just like you 
>>
>> On Thu, May 6, 2021 at 3:33 AM Xander Song <iamuuriw...@gmail.com> wrote:
>>
>> How can I unsubscribe from the Apache Flink user mailing
>> list? I have tried emailing user-unsubscr...@flink.apache.org, but am still
>> receiving messages.
>>
>> Thank you.
>>
>




Re: callback by using process function

2021-05-10 Thread Dawid Wysakowicz
Hi,

I am sorry, but I think I don't fully get your question. Could you try
to rephrase it? Maybe an example could help.

Generally speaking the KeyedProcessFunction is scoped to a single key.
Whenever you access a state (MapState, ValueState, ... ) it keeps the
current value of that state for the current key.

Best,

Dawid

On 06/05/2021 12:30, Abdullah bin Omar wrote:
> Hi,
>
> According to [1] example section,
>
> (i) If we check the stored count of the last modification time against
> the previous timestamp count, then emit the count if they (count from
> last modification time) match with the previous timestamp count.
>
> Is this referring to checking the previous count? Am I understanding
> correctly? Please help me understand this part.
>
> (ii)  can the process function be used to look back the previous
> key/count?
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/process_function/
>
> Thank you
>





Re: some questions about data skew

2021-05-10 Thread Dawid Wysakowicz
Hi,

What you could do to improve processing of skewed data is to introduce
an artificial pre-aggregation. You could add an artificial, uniformly
distributed secondary key, calculate your aggregates on (original
key, secondary uniform key), and then do the final aggregation per
original key in an additional step.
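
The two steps can be sketched in plain Java as follows. This is not
Flink code: the class TwoStageCount, the "key#salt" string encoding and
the round-robin salt (standing in for a uniformly random one) are
assumptions made to keep the example small and deterministic.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TwoStageCount {
    // Stage 1: count per (key, salt). The salt spreads one hot key over
    // up to `buckets` partial aggregates that could run in parallel.
    static Map<String, Long> preAggregate(List<Long> keys, int buckets) {
        Map<String, Long> partial = new HashMap<>();
        int i = 0;
        for (long key : keys) {
            int salt = i++ % buckets; // round-robin stands in for a uniform random salt
            partial.merge(key + "#" + salt, 1L, Long::sum);
        }
        return partial;
    }

    // Stage 2: drop the salt and merge the partial counts per original key.
    static Map<Long, Long> finalAggregate(Map<String, Long> partial) {
        Map<Long, Long> result = new HashMap<>();
        partial.forEach((saltedKey, count) -> {
            long key = Long.parseLong(saltedKey.substring(0, saltedKey.indexOf('#')));
            result.merge(key, count, Long::sum);
        });
        return result;
    }

    public static void main(String[] args) {
        List<Long> delays = Arrays.asList(1L, 1L, 1L, 1L, 1L, 2L);
        Map<String, Long> partial = preAggregate(delays, 4);
        // Key 1 is split into four partial counts before the final merge.
        System.out.println(finalAggregate(partial));
    }
}
```

In a real job the first stage would be keyed by (original key, salt) and
windowed, and the second stage keyed by the original key only, so the
hot key's load is shared by several parallel subtasks in stage one.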

Best,

Dawid

On 06/05/2021 09:24, jester jim wrote:
> Hi,
> I have run a program to monitor the sum of the delay in every minute
> of a stream. This is my code:
> .map(new RichMapFunction[String, (Long, Int)] {
>   override def map(in: String): (Long, Int) = {
>     try {
>       val arr = in.split("\\|")
>       // delay in minutes between now and the timestamp in field 10
>       ((System.currentTimeMillis() / 1000 - arr(10).trim.toLong) / 60, 1)
>     } catch {
>       case e: Exception =>
>         // log the offending record (the original logged an empty string)
>         System.out.println("data has been dropped: " + in)
>         null
>     }
>   }
> }).slotSharingGroup("kafkaSource").setParallelism(200)
> .filter(item => item != null && item._1 >= 0)
>   .slotSharingGroup("kafkaSource").setParallelism(200)
>
> signalSource.keyBy(f => f._1)
>   .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
>   .reduce { (e1, e2) => (e1._1, e1._2 + e2._2) }
>   .setParallelism(20).slotSharingGroup("Delay")
>   .addSink(new OracleSink).setParallelism(1)
>   .slotSharingGroup("OracleSink").name("OracleSinkDelay")
>
> But there is a problem: when the data is not delayed, the keys
> 1, 2, 3, 4, 5 have so much data that the backpressure is always 1. Is
> there any way to avoid this condition?
>
> Please give me some advice! Thank you so much.




Re: Guaranteeing event order in a KeyedProcessFunction

2021-05-10 Thread Dawid Wysakowicz
Hey Miguel,

I think you could take a look at the CepOperator which does pretty much
what you are describing.

As for more direct answers to your questions: if you use
KeyedProcessFunction, it is always scoped to a single key. There is no
way to process events from other keys. If you want to have more control
over state and e.g. use a PriorityQueue which would be snapshotted on
checkpoint, you could look into using the Operator API. Bear in mind it
is a semi-public API. It is very low level and subject to change rather
frequently. Another thing to consider is that if you use a PriorityQueue
instead of e.g. MapState for buffering and ordering events, you are
constrained by the available memory. We used a PriorityQueue in the past
in the CepOperator but migrated it to MapState.
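
The buffering idea can be sketched in plain Java like this. It is not
the CepOperator and uses no Flink state: the class SortingBuffer and its
in-memory maps are assumptions that stand in for per-key MapState
(timestamp to events) plus an event-time timer driven by the watermark.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SortingBuffer {
    // key -> (timestamp -> payloads); the TreeMap keeps each key's events
    // ordered by timestamp, like iterating a map state in sorted order.
    private final Map<String, TreeMap<Long, List<String>>> buffers = new HashMap<>();

    // Buffer an event instead of emitting it right away.
    public void onEvent(String key, long timestamp, String payload) {
        buffers.computeIfAbsent(key, k -> new TreeMap<>())
               .computeIfAbsent(timestamp, t -> new ArrayList<>())
               .add(payload);
    }

    // For one key, emit everything with timestamp <= watermark in
    // timestamp order and drop it from the buffer.
    public List<String> onWatermark(String key, long watermark) {
        List<String> out = new ArrayList<>();
        TreeMap<Long, List<String>> buffer = buffers.get(key);
        if (buffer == null) {
            return out;
        }
        Map<Long, List<String>> ready = buffer.headMap(watermark, true);
        ready.values().forEach(out::addAll);
        ready.clear(); // headMap is a view, so this removes the emitted entries
        return out;
    }

    public static void main(String[] args) {
        SortingBuffer buffer = new SortingBuffer();
        buffer.onEvent("user-1", 5, "c");
        buffer.onEvent("user-1", 2, "a");
        buffer.onEvent("user-1", 3, "b");
        System.out.println(buffer.onWatermark("user-1", 3)); // prints [a, b]
    }
}
```

Keeping the buffer per key is what lets the state live in keyed state
and scale with the state backend, rather than being limited by the heap
of one operator instance.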

It is possible that events in downstream operators can become late. It
all depends on the timestamps of the events you emit from the "sorting"
operator. If you emit records with timestamps larger than the Watermark
that "triggered" their generation, they can become late.

Hope those tips could help you a bit.

Best,

Dawid

On 04/05/2021 14:49, Miguel Araújo wrote:
> Hi Timo,
>
> Thanks for your answer. I think I wasn't clear enough in my initial
> message, so let me give more details.
>
> The stream is not keyed by timestamp, it's keyed by a custom field
> (e.g., user-id) and then fed into a KeyedProcessFunction. I want to
> process all events for a given user in order, before sending them
> downstream for further processing in other operators. I don't want to
> hold events longer than needed, hence using the watermark to signal
> which events can be processed.
> I don't think your suggestion of using a ListState would work, because
> we would effectively have one list per user. That would imply (among
> other things) that an event could only be processed when a new event
> for the same user arrives, which would not only imply a (potentially)
> huge latency, but also data leakage. Not to mention that the events
> being sent could easily be considered late-events to the downstream
> operators.
> The idea of keying by timestamp was an "evolution" of the ListState
> suggestion, where events-to-be-later-processed would be kept sorted in
> the map (which is what would be keyed by timestamp). We could iterate
> the map to process the events, instead of fetching the full list and
> sorting it to process the events in order. I don't think this solves
> any of the problems mentioned above, so I think that mentioning it
> only raised confusion.
>
> Regarding global event-time order, that's not really what I'm after. I
> only need event-time order per key, but I want to process the event as
> soon as possible, constrained by knowing that it is "safe" to do so
> because no event with a smaller timestamp for this key is yet to come.
>
> So, rephrasing my question as I'm not sure that part was clear in the
> initial message, here is the idea:
> - keeping one priority queue (ordered by timestamp) in each
> KeyedProcessFunction instance. Therefore, each priority queue would
> store events for multiple keys.
> - when an event arrives, we push it to the queue and then process
> events (updating state and sending them downstream) while their
> timestamp is lower than the current watermark.
>
> The question is:
> - is this fault tolerant? The priority queue is not state that is
> managed by flink, but it should be recoverable on replay.
> - is it possible that the events I'm sending downstream become
> late-events for a different operator, for some reason? Will they
> always be sent before the watermark of the event that originated the
> processElement() call?
> - I would effectively be processing multiple elements (from multiple
> keys) in the same call to processElement(). Is there a way to access
> the state of different keys?
>
> This doesn't feel like the right approach. Is there an operator more
> suitable than a KeyedProcessFunction which would allow me to handle
> the state for multiple keys in this task manager? Should I register a
> timer to trigger on the event timestamp instead? I believe timers
> trigger on watermarks, so that could theoretically work, although it
> feels a little weird. After all, what I want is just to buffer events
> so that they are only processed when the watermark has caught up to them.
>
> Thanks
>
> On Friday, 30/04/2021 at 12:05, Timo Walther <twal...@apache.org> wrote:
>
> Hi Miguel,
>
> your initial idea sounds not too bad but why do you want to key by
> timestamp? Usually, you can simply key your stream by a custom key
> and
> store the events in a ListState until a watermark comes in.
>
> But if you really want t

[ANNOUNCE] Apache Flink 1.13.0 released

2021-05-03 Thread Dawid Wysakowicz
The Apache Flink community is very happy to announce the release of
Apache Flink 1.13.0.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data
streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the
improvements in this release:
https://flink.apache.org/news/2021/05/03/release-1.13.0.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12349287

We would like to thank all contributors of the Apache Flink community
who made this release possible!

Regards,
Guowei & Dawid




Re: Contiguity in SQL vs CEP

2021-04-26 Thread Dawid Wysakowicz
Hi,

The MATCH_RECOGNIZE clause in the SQL standard does not support different
contiguities. MATCH_RECOGNIZE always uses strict contiguity.

Best,

Dawid

On 21/04/2021 00:02, tbud wrote:
> There's 3 different types of Contiguity defined in the CEP documentation [1] 
> looping + non-looping -- Strict, relaxed and non deterministic relaxed.
> There's no equivalent in the SQL documentation [2]. Can someone shed some
> light on what's achievable in SQL and what isn't ?
> Related question : It seems as if SQL default if I define a pattern such as
> (A B+ C) is the relaxed mode. if this is true then when keeping state using
> sql are the non-matching events dropped from the state ?
>
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html#combining-patterns
>   
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/match_recognize.html#defining-a-pattern
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Contiguity and state storage in CEP library

2021-04-26 Thread Dawid Wysakowicz
Hi,

Yes, you are correct that if an event cannot match any pattern it won't
be stored in state. If you process your records in event time, it might
be stored for a little while before processing in order to sort the
incoming records based on time. Once a Watermark with a higher timestamp
comes, it will be processed, and if it does not match it will be discarded
and won't be stored any longer.

Best,

Dawid

On 21/04/2021 02:44, tbud wrote:
> We are evaluating a use-case where there will be 100s of events stream coming
> in per second and we want to run some fixed set of pattern matching rules on
> them And I use relaxed contiguity rules as described in the documentation.
> for example :
> /a pattern sequence "a b+ c" on the stream of "a", "b1", "d1", "b2", "d2",
> "b3" "c" will have results as -- {a b1 c}, {a b1 b2 c}, {a b1 b2 b3 c}, {a
> b2 c}, {a b2 b3 c}, {a b3 c}
> and I specify time window to be 60 mins using within() clause for this
> pattern.
> /
> Does this mean that the events which don't match i.e. "d2" won't be stored
> in state at all ? does the CEP store only matching events in the state for
> 60 minutes ?
>
> This question is important to estimate the state backend size required for
> the usecase and to make sure that the application doesn't go out of memory
> due to ever increasing state.
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Python Integration with Ververica Platform

2021-04-13 Thread Dawid Wysakowicz
I'd recommend reaching out directly to Ververica. Ververica platform is
not part of the open-source Apache Flink project.

I can connect you with Konstantin who I am sure will be happy to answer
your question ;)

Best,

Dawid

On 12/04/2021 15:40, Robert Cullen wrote:
> I've been using the Community Edition v2.4.  Just wondering if there
> is a python integration coming in future versions.
>
> thanks
>
> -- 
> Robert Cullen
> 240-475-4490




Re: NPE when aggregate window.

2021-04-13 Thread Dawid Wysakowicz
Hi,

Could you check that your grouping key has a stable hashCode and equals?
The NPE is very likely caused by an unstable hashCode, so that a record with
an incorrect key ends up on the wrong task manager.
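
As an illustration of what "stable" means here, below is a minimal sketch of
a key class whose hashCode depends only on the contents of its fields, so it
yields the same value in every JVM. The class DataKey and its fields are
hypothetical; I do not know what your key type looks like.

```java
import java.util.Arrays;

// Hypothetical grouping key; class and field names are illustrative only.
final class DataKey {
    private final String tenant;
    private final int[] dimensions;

    DataKey(String tenant, int[] dimensions) {
        this.tenant = tenant;
        this.dimensions = dimensions.clone();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof DataKey)) {
            return false;
        }
        DataKey other = (DataKey) o;
        return tenant.equals(other.tenant) && Arrays.equals(dimensions, other.dimensions);
    }

    // Derived only from field contents. Object.hashCode(), the hashCode() of an
    // array and Enum.hashCode() are identity based and can differ between JVMs,
    // so they should not leak into the hash of a key.
    @Override
    public int hashCode() {
        return 31 * tenant.hashCode() + Arrays.hashCode(dimensions);
    }

    public static void main(String[] args) {
        DataKey a = new DataKey("t", new int[] {1, 2});
        DataKey b = new DataKey("t", new int[] {1, 2});
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode());
    }
}
```

Typical sources of an unstable hash are keys that contain arrays or enums, or
that do not override hashCode at all.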

Best,

Dawid

On 13/04/2021 08:47, Si-li Liu wrote:
> Hi, 
>
> I encounter a weird NPE when trying to aggregate on a fixed window. If
> I set a small parallelism so that the whole job uses only one
> TaskManager, this NPE does not happen. But when the job scales to two
> TaskManagers, the TaskManager crashes at Create stage. The Flink
> version I use is 1.11.1.
>
> The NPE exception stack is:
>
> 2021-04-13 14:23:19,575 WARN org.apache.flink.runtime.taskmanager.Task
> [] - Window(TumblingProcessingTimeWindows(5000),
> ProcessingTimeTrigger, AggregateDataEntry, PassThroughWindowFunction)
> -> Flat Map -> Sink: Unnamed (7/10) (7244f264349013ca7d5336fcd565bc9f)
> switched from RUNNING to FAILED.
> java.io.IOException: Exception while applying
> AggregateFunction in aggregating state
> at
> org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:394)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> [flink-dist_2.11-1.11.1.jar:1.11.1]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_221]
> Caused by: java.lang.NullPointerException
> at
> org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:203)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> at
> org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105)
> ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> ... 13 more
> My aggregate code is
> public class AggregateDataEntry implements AggregateFunction DataIndex>, Map, Map> {
>
> @Override public Map createAccumulator() {
> return new HashMap<>();
> }
>
> @Override public Map add(Tuple2 
> value, Map accumulator) {
> accumulator.merge(value.f0, value.f1, DataIndex::add);
> return accumulator;
> }
>
> @Override public Map getResult(Map DataIndex> accumulator) {
> return accumulator;
> }
>
> @Override public Map merge(Map a, 
> Map b) {
> a.forEach((dataKey, dataIndex) -> b.merge(dataKey, dataIndex, 
> DataIndex::add));
> return b;
> }
> }
> Could anyone know something about this NPE, thanks!
> -- 
> Best regards
>
> Sili Liu




[ANNOUNCE] Release 1.13.0, release candidate #0

2021-04-01 Thread Dawid Wysakowicz
Hi everyone,

As promised I created a release candidate #0 for the version 1.13.0. I
am not starting a vote for this release as I've created it mainly for
verifying the release process. We are still aware of some improvements
coming in shortly. However, we would greatly appreciate any help testing
this RC already. It can help tremendously in identifying any problems early.

Unfortunately I was not able to create a binary convenience release for
flink-python, because of a bug in the release scripts which can be
tracked in https://issues.apache.org/jira/browse/FLINK-22095

The complete staging area is available for your review, which includes:
* the official Apache source release and binary convenience releases to
be deployed to dist.apache.org [1], which are signed with the key with
fingerprint 31D2DD10BFC15A2D [2],
* all artifacts to be deployed to the Maven Central Repository [3],
* source code tag "release-1.13.0-rc0" [4].

Your help testing the release will be greatly appreciated!

Thanks,
Dawid Wysakowicz

[1] https://dist.apache.org/repos/dist/dev/flink/flink-1.13.0-rc0/
[2] https://dist.apache.org/repos/dist/release/flink/KEYS
[3] https://repository.apache.org/content/repositories/orgapacheflink-1417/
[4] https://github.com/apache/flink/tree/release-1.13.0-rc0




Re: [DISCUSS] Feature freeze date for 1.13

2021-04-01 Thread Dawid Wysakowicz
Hi all,

@Kurt @Arvid I think it's fine to merge those two, as they are pretty
much finished. We can wait for those two before creating the RC0.

@Leonard Personally I'd be ok with 3 more days for that single PR. I
find the request reasonable and I second that it's better to have a
proper review rather than rush an unfinished feature and try to fix it
later. Moreover, it got broader support. Unless somebody else objects, I
think we can merge this PR later and include it in RC1.

Best,

Dawid

On 01/04/2021 08:39, Arvid Heise wrote:
> Hi Dawid and Guowei,
>
> I'd like to merge [FLINK-13550][rest][ui] Vertex Flame Graph [1]. We
> are pretty much just waiting for AZP to turn green, it's separate from
> other components, and it's a super useful feature for Flink users.
>
> Best,
>
> Arvid
>
> [1] https://github.com/apache/flink/pull/15054
>
> On Thu, Apr 1, 2021 at 6:21 AM Kurt Young <ykt...@gmail.com> wrote:
>
> Hi Guowei and Dawid,
>
> I would like to request permission to merge this feature [1]. It's a
> useful improvement to the SQL client and won't affect other components
> too much. We were planning to merge it yesterday but met a tricky
> multi-process issue which had a very high chance of hanging the tests.
> It took us a while to find the root cause and fix it.
>
> Since it's not too far away from the feature freeze and RC0 has not been
> created yet, I would like to include this in 1.13.
>
> [1] https://issues.apache.org/jira/browse/FLINK-20320
>
> Best,
> Kurt
>
>
> On Wed, Mar 31, 2021 at 5:55 PM Guowei Ma <guowei@gmail.com> wrote:
>
> Hi, community:
>
> Friendly reminder that today (3.31) is the last day of feature
> development. Under normal circumstances, you will not be able
> to submit new features from tomorrow (4.1). Tomorrow we will
> create 1.13.0-rc0 for testing, welcome to help test together.
> After the test is relatively stable, we will cut the
> release-1.13 branch.
>
> Best,
> Dawid & Guowei
>
>
> On Mon, Mar 29, 2021 at 5:17 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
> +1 for the 31st of March for the feature freeze.
>
> Cheers,
> Till
>
> On Mon, Mar 29, 2021 at 10:12 AM Robert Metzger <rmetz...@apache.org> wrote:
>
> > +1 for March 31st for the feature freeze.
> >
> >
> >
> > On Fri, Mar 26, 2021 at 3:39 PM Dawid Wysakowicz <dwysakow...@apache.org>
> > wrote:
> >
> > > Thank you Thomas! I'll definitely check the issue you linked.
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 23/03/2021 20:35, Thomas Weise wrote:
> > > > Hi Dawid,
> > > >
> > > > Thanks for the heads up.
> > > >
> > > > > Regarding the "Rebase and merge" button. I find that merge option
> > > > > useful, especially for small simple changes and for backports. The
> > > > > following should help to safeguard from the issue encountered previously:
> > > > > https://github.com/jazzband/pip-tools/issues/1085
> > > >
> > > > Thanks,
> > > > Thomas
> > > >
> > > >
> > > > > On Tue, Mar 23, 2021 at 4:58 AM Dawid Wysakowicz
> > > > > <dwysakow...@apache.org> wrote:
> > > >> Hi devs, users!
> > > >>
> > > >> 1. *Feature freeze date*
> > > >>
> > > >> We are approaching the end of March which we agreed
> would be the time
> > > for
> > > >> a Feature Freeze. From the kn

Re: PyFlink Table API: Interpret datetime field from Kafka as event time

2021-03-30 Thread Dawid Wysakowicz
Hey,

I am not sure which format you use, but if you work with JSON maybe this
option[1] could help you.
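
Should you end up doing the conversion yourself, as in the DataStream variant
you mentioned, the parsing part needs nothing beyond java.time. A minimal
sketch in plain Java follows; the class and method names are made up, and it
assumes the field is always an ISO-8601 UTC string like the one in your mail.

```java
import java.time.Instant;

// Hypothetical helper; not part of Flink.
final class EventTimeParser {
    // Parses an ISO-8601 UTC string and returns milliseconds since the epoch.
    // Digits below millisecond precision are dropped by toEpochMilli().
    static long toEpochMillis(String utcDateTime) {
        return Instant.parse(utcDateTime).toEpochMilli();
    }

    public static void main(String[] args) {
        long millis = toEpochMillis("2021-03-23T07:37:00.613910Z");
        System.out.println(Instant.ofEpochMilli(millis)); // 2021-03-23T07:37:00.613Z
    }
}
```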

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/json.html#json-timestamp-format-standard

On 30/03/2021 06:45, Sumeet Malhotra wrote:
> Thanks. Yes, that's a possibility. I'd still prefer something that can
> be done within the Table API. If it's not possible, then there's no
> other option but to use the DataStream API to read from Kafka, do the
> time conversion and create a table from it.
>
> ..Sumeet
>
> On Mon, Mar 29, 2021 at 10:41 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>
> Hi,
>
> I hope someone else might have a better answer, but one thing that
> would most likely work is to convert this field and define event
> time during DataStream to table conversion [1]. You could always
> pre-process this field in the DataStream API.
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/time_attributes.html#during-datastream-to-table-conversion
>
> On Mon, 29 Mar 2021 at 18:07, Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:
>
> Hi,
>
> Might be a simple, stupid question, but I'm not able to find
> how to convert/interpret a UTC datetime string
> like *2021-03-23T07:37:00.613910Z* as event-time using a
> DDL/Table API. I'm ingesting data from Kafka and can read this
> field as a string, but would like to mark it as event-time by
> defining a watermark.
>
> I'm able to achieve this using the DataStream API, by defining
> my own TimestampAssigner that converts the datetime string to
> milliseconds since epoch. How can I do this using a SQL DDL or
> Table API?
>
> I tried to directly interpret the string as TIMESTAMP(3) but
> it fails with the following exception:
>
> java.time.format.DateTimeParseException: Text
> '2021-03-23T07:37:00.613910Z' could not be parsed...
>
> Any pointers?
>
> Thanks!
> Sumeet
>




Re: pipeline.auto-watermark-interval vs setAutoWatermarkInterval

2021-03-23 Thread Dawid Wysakowicz
Hey,

I would like to double check this with Jark and/or Timo. As far as
DataStream is concerned the javadoc is correct. Moreover the
pipeline.auto-watermark-interval and setAutoWatermarkInterval are
effectively the same setting/option. However I am not sure if the Table API
interprets it in the same way as the DataStream API. The documentation you
linked, Aeden, describes the SQL API.

@Jark @Timo Could you verify if the SQL documentation is correct?

Best,

Dawid

On 23/03/2021 15:20, Matthias Pohl wrote:
> Hi Aeden,
> sorry for the late reply. I looked through the code and verified that
> the JavaDoc is correct. Setting pipeline.auto-watermark-interval to 0
> will disable the automatic watermark generation. I created FLINK-21931
> [1] to cover this.
>
> Thanks,
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-21931
>
> On Thu, Mar 4, 2021 at 9:53 PM Aeden Jameson <aeden.jame...@gmail.com> wrote:
>
> Correction: The first link was supposed to be,
>
> 1. pipeline.auto-watermark-interval
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#pipeline-auto-watermark-interval
>
> On Wed, Mar 3, 2021 at 7:46 PM Aeden Jameson <aeden.jame...@gmail.com> wrote:
> >
> > I'm hoping to have my confusion clarified regarding the settings,
> >
> > 1. pipeline.auto-watermark-interval
> > https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/ExecutionConfig.html#setAutoWatermarkInterval-long-
> >
> > 2. setAutoWatermarkInterval
> > https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/ExecutionConfig.html#setAutoWatermarkInterval-long-
> >
> > I noticed the default value of pipeline.auto-watermark-interval is 0
> > and according to these docs,
> > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/create.html#watermark,
> > it states, "If watermark interval is 0ms, the generated watermarks
> > will be emitted per-record if it is not null and greater than the last
> > emitted one." However in the documentation for
> > setAutoWatermarkInterval the value 0 disables watermark emission.
> >
> > * Are they intended to be the same setting? If not how are they
> > different? Is one for FlinkSql and the other DataStream API?
> >
> > --
> > Thank you,
> > Aeden
>




[DISCUSS] Feature freeze date for 1.13

2021-03-23 Thread Dawid Wysakowicz
Hi devs, users!

1. *Feature freeze date*

We are approaching the end of March which we agreed would be the time
for a Feature Freeze. From the knowledge I've gathered so far it still
seems to be a viable plan. I think it is a good time to agree on a
particular date, when it should happen. We suggest *(end of day CEST)
March 31st* (Wednesday next week) as the feature freeze time.

Similarly as last time, we want to create RC0 on the day after the
feature freeze, to make sure the RC creation process is running
smoothly, and to have a common testing reference point.

Having said that, let us repeat after Robert & Dian from the previous
release what a Feature Freeze means:

*B) What does feature freeze mean?* After the feature freeze, no new
features are allowed to be merged to master. Only bug fixes and
documentation improvements. The release managers will revert new feature
commits after the feature freeze. Rationale: The goal of the feature
freeze phase is to improve the system stability by addressing known
bugs. New features tend to introduce new instabilities, which would
prolong the release process. If you need to merge a new feature after
the freeze, please open a discussion on the dev@ list. If there are no
objections by a PMC member within 48 (workday) hours, the feature can be
merged.

2. *Merge PRs from the command line*

In the past releases it was quite frequent around the Feature Freeze
date that we ended up with a broken main branch that either did not
compile or there were failing tests. It was often due to concurrent
merges to the main branch via the "Rebase and merge" button. To overcome
the problem we would like to suggest only ever merging PRs from a
command line. Thank you Stephan for the idea! The suggested workflow
would look as follows:

 1. Pull the change and rebase on the current main branch
 2. Build the project (e.g. from the IDE, which should be faster than
building the entire project from cmd) -> this should ensure the project
compiles
 3. Run the tests in the module that the change affects -> this should
greatly minimize the chances of failing tests
 4. Push the change to the main branch

Let us know what you think!

Best,

Guowei & Dawid






Re: Eliminating Shuffling Under FlinkSQL

2021-03-19 Thread Dawid Wysakowicz
Your understanding of a group by is correct. It is equivalent to a key
by. I agree it would be a great feature to keep the Source's
partitioning but unfortunately as of now it is not yet supported.

Best,

Dawid

On 18/03/2021 18:28, Aeden Jameson wrote:
> It's my understanding that a group by is also a key by under the hood.
> As a result that will cause a shuffle operation to happen. Our source
> is a Kafka topic that is keyed so that any given partition contains all
> the data that is needed for any given consuming TM. Is there a way
> using FlinkSQL to eliminate the shuffle operation? Or am I missing
> other details that would make such a change undesirable?
>
> Thank you,
> Aeden





Re: Understanding Max Parallelism

2021-03-19 Thread Dawid Wysakowicz
Hi Aeden,

The maxParallelism option defines the number of key groups that will be
created within the keyed state and thus defines the maximum parallelism
that a Flink keyed job can scale up to, as each key group must be
atomically assigned to a single task. You can read more on how the
rescaling works in this blogpost[1].
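
A rough sketch of the idea in plain Java, as I understand the assignment: a
key is first mapped to a key group, which depends only on maxParallelism, and
the key group is then mapped to a subtask for the current parallelism. The
class and method names are made up, and the hashing is simplified (Flink
additionally scrambles key.hashCode() with a murmur hash), so treat it as an
illustration and not as the actual implementation.

```java
// Simplified model of key-group assignment; names are hypothetical.
final class KeyGroups {
    // Key group of a key. It is fixed once maxParallelism is chosen, which is
    // why maxParallelism cannot be changed for existing keyed state.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Subtask that owns a key group at the given parallelism. Key groups are
    // handed out as contiguous ranges, and each belongs to exactly one subtask.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        for (int parallelism : new int[] {4, 8}) {
            for (int keyGroup : new int[] {0, 31, 32, 127}) {
                System.out.printf("parallelism=%d keyGroup=%d -> subtask %d%n",
                        parallelism, keyGroup, subtaskFor(keyGroup, maxParallelism, parallelism));
            }
        }
    }
}
```

With a parallelism above maxParallelism some subtasks would not receive any
key group at all, which is why maxParallelism acts as an upper bound.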

Following up on your other questions it is mainly a reservation as of
now, but it will definitely be a cap in case of a reactive/auto scaling
because of the above.

Best,

Dawid

[1] https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html

On 18/03/2021 17:40, Aeden Jameson wrote:
> I'm trying to get my head around the impact of setting max parallelism.
>
> * Does max parallelism primarily serve as a reservation for future
> increases to parallelism? The reservation being the ability to restore
> from checkpoints and savepoints after increases to parallelism.
>
> * Does it serve as a runtime suggestion for how many instances of an
> operator the job could spin up? Or is it just a reservation like I
> asked above?
>
> * It also appears to impact the distribution of key groups among
> subtasks from what I've read and seen from testing. Is that
> understanding correct?
>
> * What are the other important implications?
>
>
> Thank you,
> Aeden





Re: Parameter to config read frequency in Kafka SQL connector

2021-03-19 Thread Dawid Wysakowicz
Hi,

Unfortunately I have no experience with this. You can pass any Kafka
client parameters through the properties.* option[1] and see if the
setting works for you.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html#properties

On 18/03/2021 12:50, eef hhj wrote:
> Hi team,
>
> We are in a situation where we want to reduce the read frequency of the
> Kafka SQL connector. I did some investigation on the properties of the
> Kafka client, but it seems it does not have such an option. Although I
> found the batch size config ('properties.max.partition.fetch.bytes')
> among the config options.
>
> https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html
>
> Any suggestions on this?
>





Re: Flink minimum resource recommendation on k8s cluster

2021-03-19 Thread Dawid Wysakowicz
I'd say no. It depends on your job. You can refer to a very good
presentation from Robert on how to calculate resource requirements[1].

[1] https://www.youtube.com/watch?v=8l8dCKMMWkw

On 18/03/2021 11:37, Amit Bhatia wrote:
> Hi,
>
> Is there any minimum resource ( CPU & Memory) recommendation to start
> flink jobmanager and taskmanager pods on k8s cluster.
>
> Regards,
> Amit Bhatia





Re: The Role of TimerService in ProcessFunction

2021-03-19 Thread Dawid Wysakowicz
Hi Chirag,

I agree it might be a little bit confusing.

Let me try to explain the reasoning. To do that I'll first try to
rephrase the reasoning from FLINK-8560 for introducing the
KeyedProcessFunction. It was introduced so that users have typed
access to the current key via Context and OnTimerContext. This is
actually the only difference between the two functions.

Somewhat as a consequence of the above, the KeyedProcessFunction can be
used solely on a keyed stream, whereas the ProcessFunction can be used on
both keyed and non-keyed streams. That was actually the only way to use a
process function on a keyed stream prior to introducing the
KeyedProcessFunction. If you don't need access to the current key you
should be fine with using the ProcessFunction on a keyed stream and there
you can use the TimerService. It is advised to use a KeyedProcessFunction
on a keyed stream, however for backwards compatibility the old behaviour
has been kept.

Hope that it clarifies the things a bit.

Best,

Dawid

On 17/03/2021 07:47, Chirag Dewan wrote:
> Hi,
>
> Currently, both ProcessFunction and KeyedProcessFunction (and their
> CoProcess counterparts) expose the Context and TimerService in the
> processElement() method. However, if we use the TimerService in non
> keyed context, it gives a runtime error. 
>
> I am a bit confused about these APIs. Is there any specific reason for
> exposing TimerService in non-keyed context especially if it cant be
> used without keyed stream?
>
> Any leads are much appreciated.
>
> Thanks,
> Chirag




Re: Production Readiness of File Source

2021-03-18 Thread Dawid Wysakowicz
Hi,

As for the issue of production readiness of the File Source (and other
components), I'd recommend having a look at the PR, which is close to
being merged, where we express our opinion on how we see certain components:
https://github.com/apache/flink-web/pull/426

I am also cc'ing Stephan, who wrote the File Source, to get his opinion
as well.

Best,

Dawid

On 17/03/2021 06:53, Chirag Dewan wrote:
> Hi,
>
> I am intending to use the File source for a production use case. I
> have a few use cases that are currently not supported like deleting a
> file once it's processed. 
>
> So I was wondering if we can use this in production or write my own
> implementation? Is there any recommendations around this?
>
> Thanks
> Chirag




Re: ClassCastException after upgrading Flink application to 1.11.2

2021-03-18 Thread Dawid Wysakowicz
Could you share a full stacktrace with us? Could you check the stack
trace also in the task managers logs?

As a side note, make sure you are using the same version of all Flink
dependencies.

Best,

Dawid

On 17/03/2021 06:26, soumoks wrote:
> Hi,
>
> We have upgraded an application originally written for Flink 1.9.1 with
> Scala 2.11 to Flink 1.11.2 with Scala 2.12.7 and we are seeing the following
> error at runtime.
>
>
> 2021-03-16 20:37:08
> java.lang.RuntimeException
>   at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
>   at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>   at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
>   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>   at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>   at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>   at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
>   at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)
> Caused by: java.lang.ClassCastException
>
>
>
> The class in question was using Scala Long and Scala BigDecimal types which
> have been changed to Java Long and Java BigDecimal types as a means to
> resolve this error but to no avail.
>
> This application is running on AWS EMR running emr-6.2.0 if that helps.
>
>
> Thanks,
> Sourabh
>
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: How to advance Kafka offsets manually with checkpointing enabled on Flink (TableAPI)

2021-03-18 Thread Dawid Wysakowicz
Hi Rex,

The approach you described is definitely possible in the DataStream API.
You could replace the uid of your Kafka source and start your job with
your checkpoint with the allowNonRestoredState option enabled[1]. I am
afraid, though, that it is not possible to change the uid in Table API/SQL.

Another approach that you could try is to edit the checkpoint via the
State Processor API[2] and increase the checkpointed offsets.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/cli.html#starting-a-job-from-a-savepoint

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/libs/state_processor_api.html

On 16/03/2021 20:03, Rex Fenley wrote:
> Hello,
>
> I'm wondering how, in the event of a poison pill record on Kafka, to
> advance a partition's checkpointed offsets by 1 when using the
> TableAPI/SQL.
>
> It is my understanding that when checkpointing is enabled Flink uses
> its own checkpoint committed offsets and not the offsets committed to
> Kafka when starting a job from a checkpoint.
>
> In the event that there is a poison pill record in Kafka that is
> crashing the Flink job, we may want to simply advance our checkpointed
> offsets by 1 for the partition, past the poison record, and then
> continue operation as normal. We do not want to lose any other state
> in Flink however.
>
> I'm wondering how to go about this then. It's easy enough to have
> Kafka advance its committed offsets. Is there a way to tell Flink to
> ignore checkpointed offsets and instead respect the offsets committed
> to Kafka for a consumer group when restoring from a checkpoint?
> If so we could:
> 1. Advance Kafka's offsets.
> 2. Run our job from the checkpoint and have it use Kafka's offsets and
> then checkpoint with new Kafka offsets.
> 3. Stop the job, and rerun it using Flink's committed, now advanced,
> offsets.
>
> Is this possible? Are there any better strategies?
>
> Thanks!
>
> -- 
>
> Rex Fenley | Software Engineer - Mobile and Backend
>




Re: custom metrics within a Trigger

2021-03-18 Thread Dawid Wysakowicz
Do you mind sharing the code you use to register your metrics with the
TriggerContext? It could help us identify where the name collisions
come from. As far as I am aware, it should be fine to use the
TriggerContext for registering metrics.

Best,

Dawid

On 16/03/2021 17:35, Aleksander Sumowski wrote:
> Hi all,
> I'd like to measure how many events arrive within allowed lateness
> grouped by particular feature of the event. We assume particular type
> of events have way more late arrivals and would like to verify this.
> The natural place to make the measurement would be our custom trigger
> within the onElement method, as this is the place where we know whether
> an event is late or not. The issue is that the only way to register
> MetricGroup at this moment is via Trigger.TriggerContext - which leads
> to re-registering and lots of logs:
>
>
>   
> `Name collision: Group already contains a Metric with the name XXX.
> Metric will not be reported.`
>
>
> Any hints how to tackle it?
>
> Thanks,
> Aleksander




Re: DataStream in batch mode - handling (un)ordered bounded data

2021-03-12 Thread Dawid Wysakowicz
Hi Alexis,

As of now there is no such feature in the DataStream API. The Batch mode
in DataStream API is a new feature and we would be interested to hear
about the use cases people want to use it for to identify potential
areas to improve. What you are suggesting generally makes sense, so I
think it would be nice if you could create a Jira ticket for it.

Best,

Dawid

On 12/03/2021 15:37, Alexis Sarda-Espinosa wrote:
>
> Hello,
>
>  
>
> Regarding the new BATCH mode of the data stream API, I see that the
> documentation states that some operators will process all data for a
> given key before moving on to the next one. However, I don’t see how
> Flink is supposed to know whether the input will provide all data for
> a given key sequentially. In the DataSet API, an (undocumented?)
> feature is using SplitDataProperties
> (https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/java/io/SplitDataProperties.html)
> to specify different grouping/partitioning/sorting properties, so if
> the data is pre-sorted (e.g. when reading from a database), some
> operations can be optimized. Will the DataStream API get something
> similar?
>
>  
>
> Regards,
>
> Alexis.
>
>  
>




Re: Gradually increasing checkpoint size

2021-03-11 Thread Dawid Wysakowicz
Hey Dan,

I think the logic should be correct. Mind that in processElement we
are using relativeUpperBound/relativeLowerBound, which are the global
bounds inverted for the right side:

relativeUpperBound = upperBound for left and -lowerBound for right

relativeLowerBound = lowerBound for left and -upperBound for right

Therefore the cleaning logic in onEventTime effectively uses the same
logic. If I understand it correctly, this trick was introduced to avoid
duplicating the method.

There might be a bug somewhere, but I don't think it's where you
pointed. I'd suggest first investigating the progress of watermarks.
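
To make the symmetry concrete, here is a small plain-Java model of the bound inversion described above. It is only a sketch of the idea, not the IntervalJoinOperator source: the class and method names are hypothetical, and the cleanup rule is an assumption about how a timer time can be derived from the relative upper bound.

```java
// Simplified model of the bound inversion; NOT the IntervalJoinOperator code.
// All names in this class are hypothetical and exist only for illustration.
final class IntervalJoinBoundsSketch {

    /** Lower bound as seen from one side: lowerBound for left, -upperBound for right. */
    static long relativeLowerBound(long lowerBound, long upperBound, boolean isLeft) {
        return isLeft ? lowerBound : -upperBound;
    }

    /** Upper bound as seen from one side: upperBound for left, -lowerBound for right. */
    static long relativeUpperBound(long lowerBound, long upperBound, boolean isLeft) {
        return isLeft ? upperBound : -lowerBound;
    }

    /** True if an element of the other side at otherTs falls into our join interval. */
    static boolean joins(long ourTs, long otherTs, long lowerBound, long upperBound, boolean isLeft) {
        long lo = ourTs + relativeLowerBound(lowerBound, upperBound, isLeft);
        long hi = ourTs + relativeUpperBound(lowerBound, upperBound, isLeft);
        return otherTs >= lo && otherTs <= hi;
    }

    /** Assumed cleanup rule: keep our element until no later partner can match it. */
    static long cleanupTime(long ourTs, long lowerBound, long upperBound, boolean isLeft) {
        long relativeUpper = relativeUpperBound(lowerBound, upperBound, isLeft);
        return relativeUpper > 0 ? ourTs + relativeUpper : ourTs;
    }
}
```

With lowerBound = -2 and upperBound = 10, a left element at timestamp 100 and a right element at timestamp 105 are considered a match from either side, which is why one shared method can serve both inputs.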

Best,

Dawid

On 09/03/2021 08:36, Dan Hill wrote:
> Hi Yun!
>
> That advice was useful.  The state for that operator is very small
> (31kb).  Most of the checkpoint size is in a couple simple
> DataStream.intervalJoin operators.  The time intervals are fairly short.
>
> I'm going to try running the code with some small configuration
> changes.  One thing I did notice is that I set a positive value for
> the relativeUpperBound.  I'm not sure if I found a bug in
> IntervalJoinOperator
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java>.
>  
> The logic in IntervalJoinOperator.onEventTime needs an exact timestamp
> for clean up.  It has some logic around cleaning up the right side
> that uses timerTimestamp + lowerBound
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L310>.
>  
> However, processElement doesn’t use the same logic when creating a
> timer (I only see + lowerBound
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L253>).
>  
> Maybe I'm misreading the code.  It feels like a bug.
>
>
> On Mon, Mar 8, 2021 at 10:29 PM Yun Gao <yungao...@aliyun.com> wrote:
>
> Hi Dan,
>
> Regarding the original checkpoint size problem, could you also
> check in the checkpoint UI which tasks' state is increasing? For
> example, the attached operator has an `alreadyOutputted` value state,
> which seems to keep increasing if there are always new keys.
>
> Best,
> Yun
>
>
> -- Original Mail --
> *Sender:* Dan Hill <quietgol...@gmail.com>
> *Send Date:* Tue Mar 9 00:59:24 2021
> *Recipients:* Yun Gao <yungao...@aliyun.com>
> *CC:* user <user@flink.apache.org>
> *Subject:* Re: Gradually increasing checkpoint size
>
> Hi Yun!
>
> Thanks for the quick reply.
>
> One of the lowerBounds is large but the table being joined
> with is ~500 rows.  I also have my own operator that only
> outputs the first value.
>
> public class OnlyFirstUser<T>
>         extends RichFlatMapFunction<T, T> {
>
>     private transient ValueState<Boolean> alreadyOutputted;
>
>     @Override
>     public void flatMap(T value, Collector<T> out) throws Exception {
>         if (!alreadyOutputted.value()) {
>             alreadyOutputted.update(true);
>             out.collect(value);
>         }
>     }
>
>     @Override
>     public void open(Configuration config) {
>         ValueStateDescriptor<Boolean> descriptor =
>                 new ValueStateDescriptor<>(
>                         "alreadyOutputted", // the state name
>                         TypeInformation.of(new TypeHint<Boolean>() {}), // type information
>                         false); // default value of the state, if nothing was set
>         alreadyOutputted = getRuntimeContext().getState(descriptor);
>     }
> }
>
>
> All of my inputs have this watermark strategy.  In the
> Flink UI, early in the job run, I see "Low Watermarks" on
> each node and they increase.  After some checkpoint
> failures, low watermarks stop appearing in the UI
> 
> <https://drive.google.com/file/d/1fLnT3068g3ddlMhfMH5j__kb-gMvmVXm/view?usp=sharing>.
>
>
> .assignTimestampsAndWatermarks(
>
>    
> 

Re: Flink + Hive + Compaction + Parquet?

2021-03-04 Thread Dawid Wysakowicz
Hi,

I know Jingsong worked on Flink/Hive filesystem integration in the
Table/SQL API. Maybe he can shed some light on your questions.

Best,

Dawid

On 02/03/2021 21:03, Theo Diefenthal wrote:
> Hi there,
>
> Currently, I have a Flink 1.11 job which writes parquet files via the
> StreamingFileSink to HDFS (simply using DataStream API). I commit like
> every 3 minutes and thus have many small files in HDFS. Downstream,
> the generated table is consumed from Spark Jobs and Impala queries.
> HDFS doesn't like to have too many small files and writing to parquet
> fast but also desiring large files is a rather common problem and
> solutions were suggested like recently in the mailing list [1] or in
> flink forward talks [2]. Cloudera also posted two possible scenarios
> in their blog posts [3], [4]. Mostly, it comes down to asynchronously
> compact the many small files into larger ones, at best non blocking
> and in an occasionally running batch job.
>
> I am now about to implement something like suggested in the cloudera
> blog [4] but from parquet to parquet. For me, it seems to be not
> straight forward but rather involved, especially as my data is
> partitioned in eventtime and I need the compaction to be non blocking
> (my users query impala and expect near real time performance in each
> query). When starting the work on that, I noticed that Hive already
> has a compaction mechanism included and the Flink community works a
> lot in terms of integrating with hive in the latest releases. Some of
> my questions are not directly related to Flink, but I guess many of
> you have also experience with hive and writing from Flink to Hive is
> rather common nowadays.
>
> I read online that Spark should integrate nicely with Hive tables,
> i.e. instead of querying HDFS files, querying a hive table has the
> same performance [5]. We also all know that Impala integrates nicely
> with Hive so that overall, I can expect writing to Hive internal
> tables instead of HDFS parquet doesn't have any disadvantages for me.
>
> My questions:
> 1. Can I use Flink to "streaming write" to Hive?
> 2. For compaction, I need "transactional tables" and according to the
> hive docs, transactional tables must be fully managed by hive (i.e.,
> they are not external). Does Flink support writing to those out of the
> box? (I only have Hive 2 available)
> 3. Does Flink use the "Hive Streaming Data Ingest" APIs?
> 4. Do you see any downsides in writing to hive compared to writing to
> parquet directly? (Especially in my usecase only having impala and
> spark consumers)
> 5. Not Flink related: Have you ever experienced performance issues
> when using hive transactional tables over writing parquet directly? I
> guess there must be a reason why "transactional" is off by default in
> Hive? I won't use any features except for compaction, i.e. there are
> only streaming inserts, no updates, no deletes. (Delete only after
> given retention and always delete entire partitions)
>
>
> Best regards
> Theo
>
> [1]
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Streaming-data-to-parquet-td38029.html
> [2] https://www.youtube.com/watch?v=eOQ2073iWt4
> [3]
> https://blog.cloudera.com/how-to-ingest-and-query-fast-data-with-impala-without-kudu/
> [4]
> https://blog.cloudera.com/transparent-hierarchical-storage-management-with-apache-kudu-and-impala/
> [5]
> https://stackoverflow.com/questions/51190646/spark-dataset-on-hive-vs-parquet-file




Re: State Schema Evolution within SQL API

2021-03-04 Thread Dawid Wysakowicz
Hi Jan,

As of now Flink does not give any guarantees for Table/SQL API savepoint
compatibility if you change the query or Flink version. Flink Table/SQL
API uses an optimizer that can apply different optimizations or
operations reordering based on the queried fields or computations that
can result in a completely different physical plan.

Generally speaking you should be fine when adding/removing fields in a
projection. I'd say it is the only somewhat safe change, but it is not
guaranteed in all cases nevertheless.

Best,

Dawid

On 01/03/2021 17:41, Jan Oelschlegel wrote:
>
> Hi all,
>
> I would like to know to what extent state schema evolution is possible
> when using the SQL API of Flink. Which query changes can I make without
> breaking compatibility with my savepoint?
>
> The documentation describes the do's and don'ts of safe schema
> evolution only for the DataStream API. [1]
>
>  
>
>  
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html
>
>  
>
>  
>
>  
>
> Best,
>
> Jan
>
>  
>




Re: timeWindow()s and queryable state

2021-03-04 Thread Dawid Wysakowicz
Hey Ron,

I am pretty sure the queryable state will not do any pruning. It will
keep the state for all windows seen so far. The allowedLateness applies
to the window computation not the queryable state part. The
`asQueryableState` will create a downstream operator that will keep
updating a state with results of the window operator.

If you want to have a more fine grained control over what and how long
is kept in a queryable state you can write your own process function
with a state that you configure to be queryable via[1]:

ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any",
        IntSerializer.INSTANCE);
desc.setQueryable("vanilla");
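
As a rough illustration of the "keep results only for a limited time" idea, the following plain-Java sketch models the retention logic such a custom function could apply to its own state. It is not Flink code: the class RetainedWindowResults and its methods are hypothetical, and in a real job the map would be keyed state and prune would be driven by a timer.

```java
import java.util.TreeMap;

// Hypothetical model of retention for window results; not a Flink API.
final class RetainedWindowResults<V> {

    private final long retentionMs;
    // Results indexed by the end timestamp of the window that produced them.
    private final TreeMap<Long, V> resultsByWindowEnd = new TreeMap<>();

    RetainedWindowResults(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    void put(long windowEndMs, V result) {
        resultsByWindowEnd.put(windowEndMs, result);
    }

    /** Drops every result whose window ended more than retentionMs before currentTimeMs. */
    void prune(long currentTimeMs) {
        resultsByWindowEnd.headMap(currentTimeMs - retentionMs, false).clear();
    }

    V get(long windowEndMs) {
        return resultsByWindowEnd.get(windowEndMs);
    }

    int size() {
        return resultsByWindowEnd.size();
    }
}
```

With a retention of 90 minutes, a result stays readable until the current time has moved more than 90 minutes past the end of its window.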

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html#managed-keyed-state

On 01/03/2021 17:39, Ron Crocker wrote:
> Hi all -
>
> I’m trying to keep some state around for a little while after a window
> fires to use as queryable state. I am intending on using something like:
>
> .keyBy()
> .timeWindow(Time.minutes(1)).allowedLateness(Time.minutes(90))
> .aggregate(…)
> .keyBy()
> .asQueryableState(...)
>
> My intent is to keep that window available for 90 minutes. I’m not
> sure how I feel about this pattern - it feels more side-effect-y than
> intentional.
>
> My questions:
> a) Is that actually going to keep the window (and, by implication, the
> downstream state) around?
> b) Is there a “more correct” way to do this? Maybe it would be better
> to use some kind of time-aware reducing state that will provide some
> lingering state?
>
> Before you ask, no, I haven’t run it to see what it does. That’s next,
> but I figured I’d ask for your advice first




Re: Producer Configuration

2021-03-04 Thread Dawid Wysakowicz
Hey Claude.

Alexey is right about the page. The page from your screenshot shows only
the entries passed via
StreamExecutionEnvironment#getConfig#setGlobalJobParameters.
Configuration for individual connectors or other operators is not
displayed there.

If you need help debugging your timeout issue, could you post your
configuration and the stack trace you are getting?

Best,

Dawid

On 28/02/2021 03:14, Alexey Trenikhun wrote:
> They are picked up, otherwise you would not be able to write any messages
> at all. I believe the page you are referring to is not for displaying Kafka
> properties (in my case it is empty as well, but Kafka works). Check logs.
>
> 
> *From:* Claude M 
> *Sent:* Saturday, February 27, 2021 4:00:23 PM
> *To:* Alexey Trenikhun 
> *Cc:* user 
> *Subject:* Re: Producer Configuration
>  
> Yes, the flink job also works in producing messages.  It's just that
> after a short period of time, it fails w/ a timeout.  That is why I'm
> trying to set a longer timeout period but it doesn't seem like the
> properties are being picked up. 
>
> On Sat, Feb 27, 2021 at 1:17 PM Alexey Trenikhun <yen...@msn.com> wrote:
>
> Can you produce messages using the Kafka console producer with
> the same properties?
>
> *From:* Claude M <claudemur...@gmail.com>
> *Sent:* Saturday, February 27, 2021 8:05 AM
> *To:* Alexey Trenikhun <yen...@msn.com>
> *Cc:* user <user@flink.apache.org>
> *Subject:* Re: Producer Configuration
>  
> Thanks for your reply, yes it was specified.  Sorry I forgot to
> include it:
>  properties.setProperty("bootstrap.servers", "localhost:9092");
>
> On Fri, Feb 26, 2021 at 7:56 PM Alexey Trenikhun <yen...@msn.com> wrote:
>
> I believe bootstrap.servers is a mandatory Kafka property, but
> it looks like you didn't set it.
>
> *From:* Claude M <claudemur...@gmail.com>
> *Sent:* Friday, February 26, 2021 12:02:10 PM
> *To:* user <user@flink.apache.org>
> *Subject:* Producer Configuration
>  
> Hello,
>  
> I created a simple Producer and when the job ran, it was
> getting the following error: 
> Caused by: org.apache.kafka.common.errors.TimeoutException
>
> I read about increasing request.timeout.ms. Thus, I added the
> following properties.
>
> Properties properties = new Properties();
> properties.setProperty("request.timeout.ms", "3");
> properties.setProperty("retries", "20");
> DataStream<String> stream = env.addSource(new
> SimpleStringGenerator());
> stream.addSink(new FlinkKafkaProducer<>("flink-test", new
> SimpleStringSchema(), properties));
>
> However, after the job is submitted, the User Configuration is
> empty, please see attached.
> Therefore, it seems these properties are not taking effect,
> since I still have the same problem.
> Any help on these issues is appreciated, thanks.
>




Re: Flink CEP: can't process PatternStream (v 1.12, EventTime mode)

2021-02-26 Thread Dawid Wysakowicz
Hi,

What exactly is the problem? Is it that no pattern matches are being
generated?

Usually the problem is idle parallel instances[1]. You need to have
data flowing in each of the parallel instances for the watermark to
progress. You can also read about it in the context of Kafka partitions[2].
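
The effect of an idle instance can be shown with a small plain-Java model. This is a sketch, not Flink code: it assumes that a downstream operator advances to the minimum watermark of its non-idle inputs and that an input which has never seen data still reports Long.MIN_VALUE. The class WatermarkMergeSketch and its method are hypothetical. In Flink itself, the linked docs describe marking sources as idle, for example via WatermarkStrategy#withIdleness.

```java
import java.util.OptionalLong;

// Hypothetical model of how per-input watermarks are combined; not a Flink API.
final class WatermarkMergeSketch {

    /**
     * Returns the minimum watermark over all inputs that are not marked idle,
     * or empty if every input is idle.
     */
    static OptionalLong combined(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (idle[i]) {
                continue; // idle inputs do not hold back the combined watermark
            }
            anyActive = true;
            min = Math.min(min, watermarks[i]);
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }
}
```

If one of three inputs never receives data and is not marked idle, the combined watermark stays at Long.MIN_VALUE, so event-time logic such as CEP never fires. Once that input is treated as idle, the watermark follows the slowest active input.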

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#dealing-with-idle-sources

[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#watermark-strategies-and-the-kafka-connector

On 26/02/2021 13:21, Люльченко Юрий Николаевич wrote:
> Hello,
>
> I already asked a question today and got a
> solution:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-CEP-can-t-process-PatternStream-td41722.html,
> and it is now clear to me how PatternStream works with processing time.
>
> But I need help again: I can't write proper code to execute a
> PatternStream in event-time mode.
> I think the problem is in how I assign the watermark strategy.
>
> My code is below; the Flink version is 1.12:
>
>
> public class Main {
>
>     public static void main(String[] args) throws Exception {
>
>         Properties properties = new Properties();
>         properties.put("group.id", "Flink");
>         properties.put("bootstrap.servers", "broker:9092");
>
>         StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>         FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
>                 "test",
>                 new SimpleStringSchema(),
>                 properties);
>
>         DataStream<String> stream = env
>                 .addSource(consumer)
>                 .map((MapFunction<String, String>) s -> {
>                     // Just getting an object model
>                     return model.toString();
>                 })
>                 .assignTimestampsAndWatermarks(
>                         WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(20))
>                                 .withTimestampAssigner((event, timestamp) -> {
>                                     Model model = new Gson().fromJson(event, Model.class);
>                                     return model.getServerTime();
>                                 }));
>
>         stream.print("Stream");
>
>         Pattern<String, ?> firstPattern = Pattern
>                 .<String>begin("first")
>                 .where(new IterativeCondition<String>() {
>                     @Override
>                     public boolean filter(String s, Context<String> context) throws Exception {
>                         return s.contains("Start");
>                     }
>                 });
>
>         DataStream<String> result = CEP
>                 .pattern(stream, firstPattern)
>                 .inEventTime() // default TimeCharacteristic for 1.12
>                 .process(new PatternProcessFunction<String, String>() {
>                     @Override
>                     public void processMatch(Map<String, List<String>> map,
>                             Context context, Collector<String> collector) throws Exception {
>                         collector.collect(map.get("first").get(0));
>                     }
>                 });
>
>         result.print("Result");
>
>         env.execute();
>     }
> }
>
>  
> Please help me correct the code.
>
> Thanks,
> Yuri L.
>
>  
>  
>  




Re: Flink CEP: can't process PatternStream

2021-02-26 Thread Dawid Wysakowicz
Hi Yuri,

Which Flink version are you using? Is it 1.12? In 1.12 we changed the
default TimeCharacteristic to EventTime. Therefore you need watermarks
and timestamps[1] for your program to work correctly. If you want to
apply your pattern in processing time, you can do:

PatternStream patternStream = CEP.pattern(stream,
pattern).inProcessingTime();

Basically you are facing exactly the same problem as described in the
stackoverflow entry you posted.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/event_timestamps_watermarks.html#generating-watermarks

On 26/02/2021 09:18, Люльченко Юрий Николаевич wrote:
> Hello everyone.
>  
> I'm trying to use the Flink CEP library and I want to fetch some events by
> pattern. At first I created a simple HelloWorld project. But I have
> a problem exactly like the one described
> here:
> https://stackoverflow.com/questions/39575991/flink-cep-no-results-printed
>
> You can see my code on this
> page: https://gist.github.com/Maminspapin/07615706f4ce975eb3cf5f0b407b0644
>
> Nothing happens in this block:
>  
>         DataStream<String> alerts = patternStream
>                 .process(new PatternProcessFunction<String, String>() {
>                     @Override
>                     public void processMatch(Map<String, List<String>> map,
>                             Context context, Collector<String> collector)
>                             throws Exception {
>
>                         String first = map.get("first").get(0);
>                         System.out.println("First: " + first);
>                     }
>                 });
>         alerts.print();
>  
> Can someone help me understand the cause?
>  
> Thanks,
> Yuri L.
>  




[UPDATE] Release 1.13 feature freeze

2021-02-24 Thread Dawid Wysakowicz
Hi all,

The agreed feature freeze date is about a month away. Therefore
we thought it would be a good time to give an update on the current
progress.

From the information we gathered there are currently no known obstacles
or foreseeable delays. We are still aiming for the end of March as the
date for the feature freeze.

An update on the progress of particular issues: most of the issues
listed at https://cwiki.apache.org/confluence/x/jAhRCg are well on
track. We will list only the issues that we feel might not end up in the
release or might end up there only partially:

  * Tolerate temporarily suspended ZooKeeper connections (FLINK-10052)
  * Support shipping local/remote files for yarn/k8s
    integrations (FLINK-20681 / FLINK-20811 / FLINK-20867)
  * Consistent Flink SQL time function behavior (FLIP-162)

These are long-running efforts that are planned to be delivered on a
best-effort basis, therefore they might not be fully functional in 1.13:

  * Generalized incremental checkpoints
  * Incremental snapshots for heap-based state backend

We have 4 release blockers as of today:

Checkpointing:

  * https://issues.apache.org/jira/browse/FLINK-21453
BoundedOneInput.endInput is NOT called when doing stop with
savepoint WITH drain

Dependencies:

  * https://issues.apache.org/jira/browse/FLINK-21152 Bump flink-shaded
to 13.0

Coordination:

  * https://issues.apache.org/jira/browse/FLINK-21030 Broken job restart
for job with disjoint graph

Connectors:

  * https://issues.apache.org/jira/browse/FLINK-20188 Add Documentation
for new File Source

We have 100 test instabilities across all versions, out of which 37
occurred on the master (1.13) branch.

Let me know if we made a mistake somewhere or something is inaccurate.
Moreover, everyone is welcome to give a more thorough update if they
like.

Your release managers

Guowei & Dawid






Re: 回复: DataStream problem

2021-02-17 Thread Dawid Wysakowicz
I am sure you can achieve that with a ProcessFunction[1].
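
For the requirement quoted below (flag a user's second order if it arrives within 10 minutes of the previous one), the per-user logic could look roughly like this plain-Java sketch. It is not Flink code and the names are hypothetical: in a real job the map would be a keyed ValueState inside a process function keyed by user id, and a timer or state TTL would clear old entries. The sketch also assumes that each user's events arrive in timestamp order.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the per-user check; not a Flink API.
final class RepeatOrderSketch {

    static final long WINDOW_MS = 10 * 60 * 1000L; // 10 minutes

    // Stands in for keyed state: last order timestamp per user id.
    private final Map<String, Long> lastOrderTime = new HashMap<>();

    /** Returns true if the same user placed the previous order at most 10 minutes earlier. */
    boolean isRepeat(String userId, long eventTimeMs) {
        Long previous = lastOrderTime.put(userId, eventTimeMs);
        return previous != null && eventTimeMs - previous <= WINDOW_MS;
    }
}
```

Each incoming order message would be emitted together with the boolean returned by isRepeat, so only the second message of a close pair is marked true.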

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/process_function.html#process-function

On 16/02/2021 07:28, Jiazhi wrote:
> Hi Dawid
>
> For example, if user 001 places an order and generates an order
> message 1, I need to monitor whether user 001 places another order and
> generates a new order message 2 within 10 minutes. If user 001
> produces an order message 2 within 10 minutes, I need to mark
> true in message 2 and output it.
>
>
> -- Original Mail --
> *From:* "Dawid Wysakowicz"
> *Sent:* 2021-02-15 6:59
> *To:* "Jiazhi"; "user"
> *Subject:* Re: DataStream problem
>
> Hi Jiazhi,
>
> Could you elaborate on what exactly you want to achieve? What have you
> tried so far?
>
> Best,
>
> Dawid
>
> On 15/02/2021 11:11, Jiazhi wrote:
> > Hi all
> > Using DataStream, how can I detect that the same
> > message appears again within 10 minutes?
> > Thanks,
> > Jiazhi
>
>




Re: Failed to register Protobuf Kryo serialization

2021-02-15 Thread Dawid Wysakowicz
Improvements to the documentation are always welcome.

In this particular case we actually need to be really careful, as it is
not always the expected behavior. As you are registering your own Kryo
serializer, it is expected in your case.

More often, however, you don't want to use a GenericType
but a PojoType, and this message helps you identify a problem with
your POJO declaration.
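
As a rough aid for spotting such POJO problems, the following plain-Java sketch approximates the rules from the Flink docs: the class is public, has a public no-argument constructor, and every non-static, non-transient field is either public or has a getter and a setter. It is a simplified, hypothetical check and not Flink's TypeExtractor; for example, it ignores `isX` style getters and inherited fields.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Hypothetical, simplified approximation of the POJO rules; not Flink's TypeExtractor.
final class PojoCheckSketch {

    static boolean looksLikePojo(Class<?> type) {
        if (!Modifier.isPublic(type.getModifiers())) {
            return false;
        }
        try {
            type.getConstructor(); // requires a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field field : type.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || Modifier.isPublic(mods)) {
                continue;
            }
            if (!hasAccessors(type, field)) {
                return false; // a non-public field without getter and setter
            }
        }
        return true;
    }

    private static boolean hasAccessors(Class<?> type, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            type.getMethod("get" + suffix);
            type.getMethod("set" + suffix, field.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    /** Example type that passes the check. */
    public static class GoodEvent {
        public int id;
        private String name;
        public GoodEvent() {}
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    /** Example type that fails: private field without accessors, like payloadCase_ in the logs. */
    public static class BadEvent {
        private int payloadCase_;
        public BadEvent() {}
    }
}
```

A generated Protobuf class fails this kind of check for the same reason as BadEvent, which is why it falls back to a GenericType and, with a registered serializer, goes through Kryo.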

Best,

Dawid

On 15/02/2021 11:50, Svend Vanderveken wrote:
> Oh!
>
> Indeed, my program was just not starting because I omitted the
> flink.execute() part! I confirm it works now.
>
> Thanks for the quick response.
>
> Do you mind if I submit a small PR to the Flink doc to clarify that
> those INFO logs are indeed the expected behavior? For example
> here: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html
>
> Svend
>
>
>
> On Mon, Feb 15, 2021 at 10:03 AM Dawid Wysakowicz
> <dwysakow...@apache.org> wrote:
>
> Hey,
>
> Why do you say the way you did it does not work? The logs you
> posted say the classes cannot be handled by Flink's built-in
> mechanism for serializing POJOs and it falls back to a GenericType
> which is serialized with Kryo and should go through your
> registered serializer.
>
> Best,
>
> Dawid
>
>
> On 14/02/2021 11:44, Svend Vanderveken wrote:
>>
>>
>> Hi all,
>>
>> I'm failing to set up an example of wire serialization with
>> Protobuf, could you help me figure out what I'm doing wrong?
>>
>> I'm using a simple protobuf schema:
>> ```
>> syntax = "proto3";
>>
>> import "google/protobuf/wrappers.proto";
>> option java_multiple_files = true;
>> message DemoUserEvent {
>>   Metadata metadata = 1;
>>   oneof payload {
>>     Created created = 10;
>>     Updated updated = 11;
>>   }
>>   message Created {...}
>>   message Updated {...}
>>   ...
>> }
>> ```
>>
>> From which I'm generating Java with this Gradle plugin:
>>
>>
>> ```
>> plugins { id "com.google.protobuf" version "0.8.15"}
>> ```
>> And I'm generating DemoUserEvent instances with a Java Iterator
>> looking like this:
>> ```
>> public class UserEventGenerator implements Iterator<DemoUserEvent>,
>>         Serializable {
>>     transient public final static Faker faker = new Faker();
>>     ...
>>     @Override
>>     public DemoUserEvent next() {
>>         return randomCreatedEvent();
>>     }
>>     ...
>> ```
>>
>> I read those two pieces of documentation:
>> * 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html
>> * 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html
>>
>> And tried the demo app below:
>>
>> ```
>> import com.twitter.chill.protobuf.ProtobufSerializer;
>> ...
>> public static void main(String[] args) {
>>     final StreamExecutionEnvironment flinkEnv =
>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>     flinkEnv.getConfig().registerTypeWithKryoSerializer(
>>         DemoUserEvent.class, ProtobufSerializer.class);
>>     flinkEnv.fromCollection(new UserEventGenerator(), DemoUserEvent.class).print();
>> }
>> ```
>> But the serialization mechanism still fails to handle my protobuf class:
>> 11:22:45,822 INFO  org.apache.flink.api.java.typeutils.TypeExtractor     
>>        [] - class live.schema.event.user.v1.DemoUserEvent does not contain a 
>> getter for field payloadCase_
>> 11:22:45,822 INFO  org.apache.flink.api.java.typeutils.TypeExtractor     
>>        [] - class live.schema.event.user.v1.DemoUserEvent does not contain a 
>> setter for field payloadCase_
>> 11:22:45,822 INFO  org.apache.flink.api.java.typeutils.TypeExtractor     
>>        [] - Class class live.schema.event.user.v1.DemoUserEvent cannot be 
>> used as a POJO type because not all fields are valid POJO fields, and must 
>> be processed as GenericType. Please read the Flink documentation on "Data 
>> Types & Serialization" for details of the effect on performance.
>>
>> I've also tried this, without success:
>>
>> ```
>> flinkEnv.getConfig().addDefaultKryoSerializer(DemoUserEvent.class, 
>> ProtobufSerializer.class);
>> ```

Re: DataStream problem

2021-02-15 Thread Dawid Wysakowicz
Hi Jiazhi,

Could you elaborate on what exactly you want to achieve? What have you
tried so far?

Best,

Dawid

On 15/02/2021 11:11, Jiazhi wrote:
> Hi all
> Using DataStream, how do I implement the following: a message arrives and
> the same message appears again 10 minutes later?
> Thanks,
> Jiazhi



signature.asc
Description: OpenPGP digital signature


Re: Performance issues when RocksDB block cache is full

2021-02-15 Thread Dawid Wysakowicz
Hey Yaroslav,

Unfortunately I don't have enough knowledge to give you an educated
reply. The first part certainly does make sense to me, but I am not sure
how to mitigate the issue. I am ccing Yun Tang who worked more on the
RocksDB state backend (It might take him a while to answer though, as he
is on vacation right now).
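
For readers following the LRU-versus-TTL question below, here is a
minimal plain-Java sketch of the difference between the two eviction
policies. It is not RocksDB code and does not model how the RocksDB
block cache actually works; CacheEvictionSketch, LruCache and TtlCache
are hypothetical names, and the clock is passed in explicitly only to
keep the example deterministic.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative comparison of LRU and TTL eviction; not related to RocksDB internals. */
final class CacheEvictionSketch {

    /** LRU: an entry is evicted only when the cache grows beyond its capacity. */
    static final class LruCache<K, V> extends LinkedHashMap<K, V> {
        private final int capacity;

        LruCache(int capacity) {
            super(16, 0.75f, true); // true = iterate in access order, least recent first
            this.capacity = capacity;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > capacity;
        }
    }

    /** TTL: an entry is evicted once it has not been touched for longer than ttlMillis. */
    static final class TtlCache<K, V> {
        private final long ttlMillis;
        private final Map<K, V> values = new HashMap<>();
        private final Map<K, Long> lastAccess = new HashMap<>();

        TtlCache(long ttlMillis) {
            this.ttlMillis = ttlMillis;
        }

        void put(K key, V value, long nowMillis) {
            values.put(key, value);
            lastAccess.put(key, nowMillis);
        }

        V get(K key, long nowMillis) {
            V value = values.get(key);
            if (value != null) {
                lastAccess.put(key, nowMillis); // a read refreshes the entry
            }
            return value;
        }

        /** Drops every entry whose last access is older than the TTL. */
        void expire(long nowMillis) {
            Iterator<Map.Entry<K, Long>> it = lastAccess.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<K, Long> entry = it.next();
                if (nowMillis - entry.getValue() > ttlMillis) {
                    values.remove(entry.getKey());
                    it.remove();
                }
            }
        }

        boolean contains(K key) {
            return values.containsKey(key);
        }

        int size() {
            return values.size();
        }
    }
}
```

With LRU, cold entries stay cached until newer entries push them out,
so the cache always fills up; with a TTL they disappear after the idle
period even if there is room left.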

Best,

Dawid

On 14/02/2021 06:57, Yaroslav Tkachenko wrote:
> Hello,
>
> I observe throughput degradation when my pipeline reaches the maximum
> of the allocated block cache. 
>
> The pipeline is consuming from a few Kafka topics at a high rate
> (100k+ rec/s). Almost every processed message results in a (keyed)
> state read with an optional write. I've enabled native RocksDB metrics
> and noticed that everything stays stable until the block cache usage
> reaches maximum. If I understand correctly, this makes sense: this
> cache is used for all reads and cache misses could mean reading data
> on disk, which is much slower (I haven't switched to SSDs yet). Does
> it make sense? 
>
> One thing I know about the messages I consume: I expect very few keys
> to be active simultaneously, most of them can be treated as cold. So
> I'd love RocksDB block cache to have a TTL option (say, 30 minutes),
> which, I imagine, could solve this issue by guaranteeing to only keep
> active keys in memory. I don't feel like LRU is doing a very good job
> here... I couldn't find any option like that, but I'm wondering if
> someone could recommend something similar.
>
> Thank you!
>
> -- 
> Yaroslav Tkachenko
> sap1ens.com <https://sap1ens.com>




Re: Failed to register Protobuf Kryo serialization

2021-02-15 Thread Dawid Wysakowicz
Hey,

Why do you say the way you did it does not work? The logs you posted
say the classes cannot be handled by Flink's built-in mechanism for
serializing POJOs and it falls back to a GenericType which is serialized
with Kryo and should go through your registered serializer.

Best,

Dawid


On 14/02/2021 11:44, Svend Vanderveken wrote:
>
>
> Hi all,
>
> I'm failing to set up an example of wire serialization with Protobuf,
> could you help me figure out what I'm doing wrong?
>
> I'm using a simple protobuf schema:
> ```
> syntax = "proto3";
>
> import "google/protobuf/wrappers.proto";
> option java_multiple_files = true;
> message DemoUserEvent {
>   Metadata metadata = 1;
>   oneof payload {
>     Created created = 10;
>     Updated updated = 11;
>   }
>   message Created {...}
>   message Updated {...}
>   ...
> }
> ```
>
> From it I'm generating Java with this Gradle plugin:
>
>
> ```
> plugins { id "com.google.protobuf" version "0.8.15"}
> ```
> And I'm generating DemoUserEvent instances with a Java Iterator looking
> like this:
> ```
> public class UserEventGenerator implements Iterator<DemoUserEvent>, Serializable {
>     transient public final static Faker faker = new Faker();
>     ...
>     @Override
>     public DemoUserEvent next() {
>         return randomCreatedEvent();
>     }
>     ...
> ```
>
> I read those two pieces of documentation:
> * 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html
> * 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html
>
> And tried the demo app below:
>
> ```
> import com.twitter.chill.protobuf.ProtobufSerializer;
> ...
> public static void main(String[] args) {
>     final StreamExecutionEnvironment flinkEnv =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     flinkEnv.getConfig().registerTypeWithKryoSerializer(
>         DemoUserEvent.class, ProtobufSerializer.class);
>     flinkEnv.fromCollection(new UserEventGenerator(), DemoUserEvent.class).print();
> }
> ```
> But the serialization mechanism still fails to handle my protobuf class:
> 11:22:45,822 INFO  org.apache.flink.api.java.typeutils.TypeExtractor          
>   [] - class live.schema.event.user.v1.DemoUserEvent does not contain a 
> getter for field payloadCase_
> 11:22:45,822 INFO  org.apache.flink.api.java.typeutils.TypeExtractor          
>   [] - class live.schema.event.user.v1.DemoUserEvent does not contain a 
> setter for field payloadCase_
> 11:22:45,822 INFO  org.apache.flink.api.java.typeutils.TypeExtractor          
>   [] - Class class live.schema.event.user.v1.DemoUserEvent cannot be used as 
> a POJO type because not all fields are valid POJO fields, and must be 
> processed as GenericType. Please read the Flink documentation on "Data Types 
> & Serialization" for details of the effect on performance.
>
> I've also tried this, without success:
>
> ```
> flinkEnv.getConfig().addDefaultKryoSerializer(DemoUserEvent.class, 
> ProtobufSerializer.class);
> ```
>
> I'm using those versions:
>
> ```
> ext {
>     javaVersion = '11'
>     flinkVersion = '1.12.1'
>     scalaBinaryVersion = '2.12'
> }
> dependencies {
>     compileOnly "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"
>     implementation("com.twitter:chill-protobuf:0.9.5") {
>         exclude group: 'com.esotericsoftware.kryo', module: 'kryo'
>     }
>     implementation "com.google.protobuf:protobuf-java:3.14.0"
>     implementation 'com.github.javafaker:javafaker:1.0.2'
> }
> ```
>
> Any idea what I should try next?
>
> Thanks in advance!




Re: Generated SinkConversion code ignores incoming StreamRecord timestamp

2021-02-14 Thread Dawid Wysakowicz
The best I can do is point you to the thread[1].

I am also cc'ing Yuan who is the release manager for 1.12.2.

Best,

Dawid

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Apache-Flink-1-12-2-td48603.html

On 15/02/2021 08:51, Yuval Itzchakov wrote:
> Hi Dawid,
> Yes, looks like it. Thanks!
>
> Is there an ETA on 1.12.2 yet?
>
> On Mon, Feb 15, 2021 at 9:48 AM Dawid Wysakowicz
> <dwysakow...@apache.org> wrote:
>
> Hey Yuval,
>
> Could it be that you are hitting this bug[1], which has been fixed
> recently?
>
> Best,
>
> Dawid
>
> [1] https://issues.apache.org/jira/browse/FLINK-21013
>
> On 15/02/2021 08:20, Yuval Itzchakov wrote:
>> Hi,
>>
>> I have a source that generates events with timestamps. These flow
>> nicely, until encountering a conversion from Table ->
>> DataStream[Row]:
>>
>>     def toRowRetractStream(implicit ev: TypeInformation[Row]):
>> DataStream[Row] =
>>       table
>>         .toRetractStream[Row]
>>         .flatMap { (row, collector: Collector[Row]) =>
>>           if (row._1)
>>             collector.collect(row._2)
>>         }
>>
>> The transformation causes a SinkConversion to be generated with
>> the following code:
>>
>>         @Override
>>         public void
>> 
>> processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>> element) throws Exception {
>>           org.apache.flink.table.data.RowData in1 =
>> (org.apache.flink.table.data.RowData) element.getValue();
>>            
>>           Object[] fields$12 = new Object[2];
>>           fields$12[0] =
>> org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg(in1);
>>           fields$12[1] = (org.apache.flink.types.Row)
>> converter$9.toExternal((org.apache.flink.table.data.RowData) in1);
>>           scala.Tuple2 result$10 = (scala.Tuple2)
>> serializer$11.createInstance(fields$12);
>>           output.collect(outElement.replace(result$10));         
>>         }
>>
>> The code receives an element of type StreamRecord, which does
>> have a timestamp attached to it, but fails to forward it to the
>> new element (outElement) which is initialized as:
>>
>>         private final
>> org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>> outElement = new
>> org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
>>
>> Am I missing anything in the Table -> DataStream[Row] conversion
>> that should make the timestamp follow through? or is this a bug?
>>
>> -- 
>> Best Regards,
>> Yuval Itzchakov.
>
>
>
> -- 
> Best Regards,
> Yuval Itzchakov.




Re: Generated SinkConversion code ignores incoming StreamRecord timestamp

2021-02-14 Thread Dawid Wysakowicz
Hey Yuval,

Could it be that you are hitting this bug[1], which has been fixed recently?

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-21013

On 15/02/2021 08:20, Yuval Itzchakov wrote:
> Hi,
>
> I have a source that generates events with timestamps. These flow
> nicely, until encountering a conversion from Table -> DataStream[Row]:
>
>     def toRowRetractStream(implicit ev: TypeInformation[Row]):
> DataStream[Row] =
>       table
>         .toRetractStream[Row]
>         .flatMap { (row, collector: Collector[Row]) =>
>           if (row._1)
>             collector.collect(row._2)
>         }
>
> The transformation causes a SinkConversion to be generated with the
> following code:
>
>         @Override
>         public void
> processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord
> element) throws Exception {
>           org.apache.flink.table.data.RowData in1 =
> (org.apache.flink.table.data.RowData) element.getValue();
>            
>           Object[] fields$12 = new Object[2];
>           fields$12[0] =
> org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg(in1);
>           fields$12[1] = (org.apache.flink.types.Row)
> converter$9.toExternal((org.apache.flink.table.data.RowData) in1);
>           scala.Tuple2 result$10 = (scala.Tuple2)
> serializer$11.createInstance(fields$12);
>           output.collect(outElement.replace(result$10));         
>         }
>
> The code receives an element of type StreamRecord, which does have a
> timestamp attached to it, but fails to forward it to the new element
> (outElement) which is initialized as:
>
>         private final
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord
> outElement = new
> org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
>
> Am I missing anything in the Table -> DataStream[Row] conversion that
> should make the timestamp follow through? or is this a bug?
>
> -- 
> Best Regards,
> Yuval Itzchakov.



