Re: Kubernetes HA - Reusing storage dir for different clusters

2021-10-08 Thread Yang Wang
Yes, if you delete the deployment directly, all the HA data will be
retained. And you could recover the Flink job by creating a new deployment.

You could also find this description in the documentation[1].


[1].
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/ha/kubernetes_ha/#high-availability-data-clean-up
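
For reference, the Kubernetes HA setup itself only needs a few options in the
Flink configuration; a minimal sketch (the cluster id and storage path are
placeholders):

kubernetes.cluster-id: my-batch-job
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: s3://my-bucket/flink-ha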

Best,
Yang

Alexis Sarda-Espinosa  wrote on Friday, October 8, 2021 at 10:47 PM:

> Hi Yang,
>
> thanks for the confirmation. If I manually stop the job by deleting the
> Kubernetes deployment before it completes, I suppose the files will not be
> cleaned up, right? That's a somewhat non-standard scenario, so I wouldn't
> expect Flink to clean up, I just want to be sure.
>
> Regards,
> Alexis.
>
> --
> *From:* Yang Wang 
> *Sent:* Friday, October 8, 2021 5:24 AM
> *To:* Alexis Sarda-Espinosa 
> *Cc:* Flink ML 
> *Subject:* Re: Kubernetes HA - Reusing storage dir for different clusters
>
> When the Flink job reaches a global terminal state (FAILED, CANCELED,
> FINISHED), all the HA-related data (including pointers in ConfigMaps and
> concrete data in DFS) will be cleaned up automatically.
>
> Best,
> Yang
>
> Alexis Sarda-Espinosa  wrote on Monday, October 4, 2021 at 3:59 PM:
>
> Hello,
>
>
>
> If I deploy a Flink-Kubernetes application with HA, I need to set
> high-availability.storageDir. If my application is a batch job that may run
> multiple times with the same configuration, do I need to manually clean up
> the storage dir between each execution?
>
>
>
> Regards,
>
> Alexis.
>
>
>
>


how to view doc of flink-1.10 in Chinese

2021-10-08 Thread 杨浩
Our company uses release-1.10; can we see the zh (Chinese) docs?


English doc: https://ci.apache.org/projects/flink/flink-docs-release-1.10/
Chinese doc (only the latest version is available): https://flink.apache.org/zh/flink-architecture.html

Re: OVER IGNORE NULLS support

2021-10-08 Thread Caizhi Weng
Hi!

Currently all built-in aggregate functions ignore null input values, so I
guess that is why Flink doesn't support this syntax.

I'm somewhat curious about this syntax. Does it come from the SQL standard?
What's the opposite of IGNORE NULLS? Is there a NOT IGNORE NULLS, and if the
user specifies it, will an exception be thrown when a null value is
encountered?

Adrian Bednarz  wrote on Friday, October 8, 2021 at 9:22 PM:

> Hi,
>
> we've been trying to run a query similar to
> SELECT id, type, LAG(id) IGNORE NULLS OVER (PARTITION BY type ORDER BY ts)
> AS lastId
>   FROM Events
>
> A query without IGNORE NULLS clause executes just fine. This syntax is
> supported by Calcite and our clients expect it to work. Our platform uses
> FlinkSQL to execute certain types of queries and currently such syntax
> causes jobs to fail with NPE. Here's a stack trace
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> SQL validation failed. null
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:164)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:215)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
> at com.example.OverIgnoreNullsJob.main(OverIgnoreNullsJob.java:37)
> Caused by: java.lang.NullPointerException
> at java.base/java.util.Objects.requireNonNull(Objects.java:221)
> at org.apache.calcite.sql.SqlBasicCall.setOperator(SqlBasicCall.java:67)
> at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:530)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5710)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5697)
> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
> at org.apache.calcite.sql.type.SqlTypeUtil.deriveType(SqlTypeUtil.java:178)
> at
> org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkSingleOperandType(FamilyOperandTypeChecker.java:71)
> at
> org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkOperandTypes(FamilyOperandTypeChecker.java:122)
> at
> org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679)
> at
> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
> at
> org.apache.calcite.sql.SqlOverOperator.deriveType(SqlOverOperator.java:86)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5710)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5697)
> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
> at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5710)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5697)
> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:421)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4061)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3347)
> at
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
> at
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
> at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.

Re: Flink production configuration

2021-10-08 Thread Caizhi Weng
Hi!

The number of task slots needed is the total parallelism of all the jobs
running at the same time (if all the jobs are streaming jobs). The number of
task managers is the total number of task slots divided by the number of
task slots per task manager (taskmanager.numberOfTaskSlots). So it really
depends on the jobs you're running.

If you're struggling to determine the parallelism a job needs, start with a
relatively small number like 8 or 16 and see if that meets your needs; if
not, double it and try again. There are also many optimizations that help
you increase performance if you're using the Table / SQL API, see
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/
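
As a rough worked example (hypothetical numbers, assuming all 5 pipelines are
streaming jobs and each one runs with parallelism 8):

  total task slots = 5 pipelines * 8 parallelism = 40
  task managers    = 40 slots / 4 slots per TM   = 10  (with taskmanager.numberOfTaskSlots: 4)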

Sigalit Eliazov  wrote on Friday, October 8, 2021 at 9:44 PM:

>
> Hello
> New to Flink, and I am struggling with defining the correct configuration.
> Currently we have decided to have only one job manager.
> We have 5 different pipelines.
> We are not sure how many task managers need to be defined, or how many slots.
>
> Can you please send a reference, or is there any calculator that assists in
> such decisions?
>
> Thanks
>
>
> -- Forwarded message -
> From: Sigalit B 
> Date: Thursday, October 7, 2021, 12:40
> Subject: Flink production configuration
> To: 
>
>
> Hello
> New to Flink, and I am struggling with defining the correct configuration.
> Currently we have decided to have only one job manager.
> We have 5 different pipelines.
> We are not sure how many task managers need to be defined, or how many slots.
>
> Can you please send a reference, or is there any calculator that assists in
> such decisions?
>
> Thanks
>


Re: Installing signalfx on flink image

2021-10-08 Thread Caizhi Weng
Hi!

Just as you said, you need to build your own custom image based on the
official Flink image and copy the signalfx jar into the lib directory.
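
A minimal Dockerfile along these lines should be enough (the Flink image tag
and the reporter jar version are placeholders, adjust them to the
distribution you actually run):

# extend the official image and drop the reporter jar into lib/
FROM flink:1.13.2-scala_2.12
COPY flink-metrics-signalfx-1.13.2.jar /opt/flink/lib/

You can then push the resulting image to your private registry and point your
deployments at it.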

Deniz Koçak  wrote on Saturday, October 9, 2021 at 3:58 AM:

> Hi,
>
> In order to install SignalFX on Flink it says on Flink Packages page [1]
> `In order to use this reporter you must copy
> flink-metrics-signalfx-{{version}}.jar into the /lib folder of your
> Flink distribution.` We have been using official/original Docker
> images of Flink distributions supplied by Ververica, so I wonder
> what's the possible way(s) of installing signalfx on it. Should we
> create our own custom image using the one supplied by Ververica and
> keep it in a private registry? What would be the best approach to use
> SignalFX in all projects we have been developing?
>
> Thanks,
>
> [1] https://flink-packages.org/packages/flink-metrics-signalfx
>


Re: Pyflink data stream API to Table API conversion with multiple sinks.

2021-10-08 Thread Dian Fu
Hi Kamil,

I guess `statement_set.execute()` should be enough. You could also check
whether the job graph is expected via one of the following ways:
- Call `print(statement_set.explain())`
- Check the Flink web ui to see the job graph of the running job

For your problem, could you double-check whether the FileSystem sinks are
configured correctly? You could refer to [1] for more details.

Regards,
Dian

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/filesystem/#rolling-policy


On Fri, Oct 8, 2021 at 8:47 PM Kamil ty  wrote:

> Hello,
> In my pyflink job I have such flow:
>
> 1. Use table API to get messages from Kafka
> 2. Convert the table to a data stream
> 3. Convert the data stream back to the table API
> 4. Use a statement set to write the data to two filesystem sinks (avro and
> parquet)
>
> I'm able to run the job and everything seems to be working but the files
> are not filling with data and are stuck in progress.
>
> I'm suspecting that I'm doing something wrong with how i run .execute().
>
> Currently at the end of my script I use:
> statement_set.execute()
> streaming_environment.execute("My job")
>
> My question is what would be the correct way to run a job with the flow
> specified. I can share the code if needed.
>
> I would appreciate any help.
>
> Kind regards
> Kamil
>


Re: Flink S3 Presto Checkpointing Permission Forbidden

2021-10-08 Thread Rommel Holmes
You already have the S3 request ID, so you can easily reach out to AWS tech
support to find out which account was used to write to S3. I guess that
account probably doesn't have permission to perform the following actions:

"s3:GetObject",
"s3:PutObject",
"s3:DeleteObject",
"s3:ListBucket"

Then grant the account those permissions in k8s, and you should be good
to go.
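
For reference, a minimal policy statement along those lines might look like
this (the bucket name is a placeholder):

{
  "Effect": "Allow",
  "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject", "s3:ListBucket"],
  "Resource": ["arn:aws:s3:::my-bucket", "arn:aws:s3:::my-bucket/*"]
}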




On Fri, Oct 8, 2021 at 6:06 AM Denis Nutiu  wrote:

> Hello,
>
> I'm trying to deploy my Flink cluster inside of an AWS EKS using Flink
> Native. I want to use S3 as a filesystem for checkpointing, and giving the
> following options related to flink-s3-fs-presto:
>
> "-Dhive.s3.endpoint": "https://s3.eu-central-1.amazonaws.com";
> "-Dhive.s3.iam-role": "arn:aws:iam::xxx:role/s3-flink"
> "-Dhive.s3.use-instance-credentials": "true"
> "-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS":
> "flink-s3-fs-presto-1.13.2.jar"
> "-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS":
> "flink-s3-fs-presto-1.13.2.jar"
> "-Dstate.backend": "rocksdb"
> "-Dstate.backend.incremental": "true"
> "-Dstate.checkpoints.dir": "s3://bucket/checkpoints/"
> "-Dstate.savepoints.dir": "s3://bucket/savepoints/"
>
> But my job fails with:
>
> 2021-10-08 11:38:49,771 WARN
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Could
> not properly dispose the private states in the pending checkpoint 45 of job
> 75bdd6fb6e689961ef4e096684e867bc.
> com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException:
> com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service:
> Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID:
> JEZ3X8YPDZ2TF4T9; S3 Extended Request ID:
> u2RBcDpifTnzO4hIOGqgTOKDY+nw6iSeSepd4eYThITCPCpVddIUGMU7jY5DpJBg1LkPuYXiH9c=;
> Proxy: null), S3 Extended Request ID:
> u2RBcDpifTnzO4hIOGqgTOKDY+nw6iSeSepd4eYThITCPCpVddIUGMU7jY5DpJBg1LkPuYXiH9c=
> (Path: s3://bucket/checkpoints/75bdd6fb6e689961ef4e096684e867bc/chk-45)
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:573)
> ~[?:?]
> at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
> ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
> ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.directory(PrestoS3FileSystem.java:450)
> ~[?:?]
> at
> com.facebook.presto.hive.s3.PrestoS3FileSystem.delete(PrestoS3FileSystem.java:427)
> ~[?:?]
> at
> org.apache.flink.fs.s3presto.common.HadoopFileSystem.delete(HadoopFileSystem.java:160)
> ~[?:?]
> at
> org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.delete(PluginFileSystemFactory.java:155)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.disposeOnFailure(FsCheckpointStorageLocation.java:117)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.checkpoint.PendingCheckpoint.discard(PendingCheckpoint.java:588)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:60)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> at
> org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanup$2(CheckpointsCleaner.java:85)
> ~[flink-dist_2.11-1.13.2.jar:1.13.2]
> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> [?:?]
> at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
> Source) [?:?]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
> at java.lang.Thread.run(Unknown Source) [?:?]
> Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden
> (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request
> ID: JEZ3X8YPDZ2TF4T9; S3 Extended Request ID:
> u2RBcDpifTnzO4hIOGqgTOKDY+nw6iSeSepd4eYThITCPCpVddIUGMU7jY5DpJBg1LkPuYXiH9c=;
> Proxy: null)
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1811)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1395)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1371)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
> ~[?:?]
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
> ~[?:?]

Flink streaming file sink to s 3 cannot recover from failure

2021-10-08 Thread shamit jain
Hello Experts,

We have a Flink streaming job which reads data from Kafka and sinks it to S3.
We are using Flink's internal Table API to achieve this, and I have a case
where I am not able to recover my job from a snapshot.

I am getting the below exception. Can you please help?


2021-10-08 18:43:13
java.io.IOException: Recovering commit failed for object 
topics/testtopic/YEAR=2021/MONTH=10/DAY=08/HOUR=21/MIN=34/.uncompacted-part-3ae64209-a2b5-4f42-8f70-1e46e7e0a83c-1-2.
 Object does not exist and MultiPart Upload 
JsbsI_mYvHTkB2GgPpuyfD49JuEfGjtjQ6I6LQJ0CZMEI0XCcpcVtMLIWCU635jvh0nOlbNrqhmfSjUdc2hrEId.eqP2P93O44s5rBEUZWwehcfzrIlQb.U0GfZeb1.wOp2tkD5TTNzd8waWBOr7sw--
 is not valid.
at 
org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:123)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedPendingFile.commitAfterRecovery(OutputStreamBasedPartFileWriter.java:218)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:160)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:466)
at org.apache.flink.streaming.api.functions.sink.filesystem.Def

regards,
Shamit Jain



Installing signalfx on flink image

2021-10-08 Thread Deniz Koçak
Hi,

In order to install SignalFX on Flink it says on Flink Packages page [1]
`In order to use this reporter you must copy
flink-metrics-signalfx-{{version}}.jar into the /lib folder of your
Flink distribution.` We have been using official/original Docker
images of Flink distributions supplied by Ververica, so I wonder
what's the possible way(s) of installing signalfx on it. Should we
create our own custom image using the one supplied by Ververica and
keep it in a private registry? What would be the best approach to use
SignalFX in all projects we have been developing?

Thanks,

[1] https://flink-packages.org/packages/flink-metrics-signalfx


Impossible to get pending file names/paths on checkpoint?

2021-10-08 Thread Preston Price
I am trying to implement a File Sink that persists files to Azure Data
Lake, and then on commit I want to ingest these files to Azure Data
Explorer. Persisting the files is pretty trivial using the ABFS connector.

However, it does not appear to be possible to get any details about
names/paths to the pending files when they're committed. There are very few
details exposed in FileSinkCommittable, so I am currently blocked. The
paths to the files are needed when issuing ingest commands to the Azure
Data Explorer API. I have considered using automated ingestion for Azure
Data Explorer with EventHub but I need more control over the ingestion
commands for my use case.

I'm finding it very difficult to extend the functionality of the FileSink
as many public classes and interfaces have private constructors, or package
protected return types so I have to re-implement a significant amount of
these features to make minor changes.

Perhaps I'm pursuing this solution in the wrong way?
Thanks for any clues or guidance.


Re: jdbc connector configuration

2021-10-08 Thread Qihua Yang
It is pretty clear. Thanks Caizhi!
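
For reference, the partitioned scan described in the quoted explanation below
would look roughly like this as a DDL (a sketch; the connection settings,
table and column names are placeholders, and tableEnv is assumed to be an
existing TableEnvironment):

tableEnv.executeSql(
    "CREATE TABLE my_source (" +
    "  k INT," +
    "  payload STRING" +
    ") WITH (" +
    "  'connector' = 'jdbc'," +
    "  'url' = 'jdbc:postgresql://db-host:5432/mydb'," +
    "  'table-name' = 'my_table'," +
    "  'scan.partition.column' = 'k'," +
    "  'scan.partition.num' = '10'," +
    "  'scan.partition.lower-bound' = '0'," +
    "  'scan.partition.upper-bound' = '999'" +
    ")");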

On Thu, Oct 7, 2021 at 7:27 PM Caizhi Weng  wrote:

> Hi!
>
> These configurations are not required to merely read from a database. They
> are here to accelerate the reads by allowing sources to read data in
> parallel.
>
> This optimization works by dividing the data into several
> (scan.partition.num) partitions and each partition will be read by a task
> slot (not a task manager, as a task manager may have multiple task slots).
> You can set scan.partition.column to specify the partition key and also set
> the lower and upper bounds for the range of data.
>
> Let's say your partition key is the column "k" which ranges from 0 to 999.
> If you set the lower bound to 0, the upper bound to 999 and the number of
> partitions to 10, then all data satisfying 0 <= k < 100 will be divided
> into the first partition and read by the first task slot, all 100 <= k <
> 200 will be divided into the second partition and read by the second task
> slot and so on. So these configurations should have nothing to do with the
> number of rows you have, but should be related to the range of your
> partition key.
>
> Qihua Yang  wrote on Thursday, October 7, 2021 at 7:43 AM:
>
>> Hi,
>>
>> I am trying to read data from database with JDBC driver. From [1], I have
>> to config below parameters. I am not quite sure if I understand it
>> correctly. lower-bound is smallest value of the first partition,
>> upper-bound is largest value of the last partition. For example, if the db
>> table has 1000 rows. lower-bound is 0, upper-bound is 999. Is that correct?
>> If  setting scan.partition.num to 10, each partition read 100 row?
>> if I set scan.partition.num to 10 and I have 10 task managers. Each task
>> manager will pick a partition to read?
>>
>>- scan.partition.column: The column name used for partitioning the
>>input.
>>- scan.partition.num: The number of partitions.
>>- scan.partition.lower-bound: The smallest value of the first
>>partition.
>>- scan.partition.upper-bound: The largest value of the last partition.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/
>>
>> Thanks,
>> Qihua
>>
>


Re: Helper methods for catching unexpected key changes?

2021-10-08 Thread Dan Hill
Yes, my code should be doing that. We mostly keyBy a userId. I'm guessing
there is a subtle bug that's causing issues. My plan is to log more details
about the input causing the key issue and to look at the raw inputs that
affect that element's key. I was curious whether people have found other
tools that help.
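
For what it's worth, one low-tech check is a pass-through keyed operator that
re-derives the key from the element and compares it with the key the operator
is actually running under; a sketch (the Event type, its getUserId() accessor
and the String key type are assumptions, not from the original job):

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Re-derive the key from the element and compare it with the key Flink routed it by.
public class KeyConsistencyCheck extends KeyedProcessFunction<String, Event, Event> {
    @Override
    public void processElement(Event value, Context ctx, Collector<Event> out) {
        String expectedKey = value.getUserId();   // same logic as the KeySelector used in keyBy()
        String actualKey = ctx.getCurrentKey();   // key this element was actually routed with
        if (!expectedKey.equals(actualKey)) {
            throw new IllegalStateException(
                    "Key mismatch: expected " + expectedKey + " but operator key is " + actualKey);
        }
        out.collect(value);
    }
}

Chaining this right after the reinterpretAsKeyedStream (or after a keyBy)
should surface the offending records.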

On Thu, Oct 7, 2021 at 11:20 PM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Good morning Dan,
>
>
>
> Being short of information on how you arranged your job, I can only make
> general comments:
>
>
>
> ReinterpretAsKeyedStream *only* applies to data streams that are in fact
> partitioned by the same key, i.e. your job would look somewhat like this:
>
>
>
> DataStreamUtils.reinterpretAsKeyedStream(
>
> Stream
>
> .keyBy(keyExtractor1)
>
> .process(keyedProcessFunction1)//or any of the other keyed operators
>
> ,keyExtractor2 …
>
> )
>
> .process(keyedProcessFunction2) //or any of the other keyed operators
>
>
>
> keyExtractor1 and keyExtractor2 need to come to the same result for
> related events (input/output of keyedProcessFuntion1 resp.)
>
>
>
> I assume your exception happens in keyedProcessFunction2?
>
>
>
> reinterpretAsKeyedStream makes sense if you want to chain
> keyedProcessFunction1 and keyedProcessFunction2, otherwise keyBy() will do …
>
>
>
> I hope these hints help, otherwise feel free to get back to the mailing
> list with a more detailed description of your arrangement 😊
>
>
>
> Cheers
>
>
>
> Thias
>
>
>
>
>
>
>
>
>
>
>
> *From:* Dan Hill 
> *Sent:* Freitag, 8. Oktober 2021 06:49
> *To:* user 
> *Subject:* Helper methods for catching unexpected key changes?
>
>
>
> Hi.  I'm getting the following errors when using
> reinterpretAsKeyedStream.  I don't expect the key to change for rows in
> reinterpretAsKeyedStream.  Are there any utilities that I can use that I
> can use with reinterpetAsKeyedStream to verify that the key doesn't
> change?  E.g. some wrapper operator?
>
>
>
>
>
>
> 2021-10-02 16:38:46
> java.lang.IllegalArgumentException: key group from 154 to 156 does not
> contain 213
> at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
> at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
> at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
> at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
> at
> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
> at
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
> at
> org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:52)
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>


Re: Kubernetes HA - Reusing storage dir for different clusters

2021-10-08 Thread Alexis Sarda-Espinosa
Hi Yang,

thanks for the confirmation. If I manually stop the job by deleting the 
Kubernetes deployment before it completes, I suppose the files will not be 
cleaned up, right? That's a somewhat non-standard scenario, so I wouldn't 
expect Flink to clean up, I just want to be sure.

Regards,
Alexis.


From: Yang Wang 
Sent: Friday, October 8, 2021 5:24 AM
To: Alexis Sarda-Espinosa 
Cc: Flink ML 
Subject: Re: Kubernetes HA - Reusing storage dir for different clusters

When the Flink job reaches a global terminal state (FAILED, CANCELED,
FINISHED), all the HA-related data (including pointers in ConfigMaps and
concrete data in DFS) will be cleaned up automatically.

Best,
Yang

Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com> wrote on Monday,
October 4, 2021 at 3:59 PM:

Hello,



If I deploy a Flink-Kubernetes application with HA, I need to set 
high-availability.storageDir. If my application is a batch job that may run 
multiple times with the same configuration, do I need to manually clean up the 
storage dir between each execution?



Regards,

Alexis.




Flink production configuration

2021-10-08 Thread Sigalit Eliazov
Hello
New to Flink, and I am struggling with defining the correct configuration.
Currently we have decided to have only one job manager.
We have 5 different pipelines.
We are not sure how many task managers need to be defined, or how many slots.

Can you please send a reference, or is there any calculator that assists in
such decisions?

Thanks


-- Forwarded message -
From: Sigalit B 
Date: Thursday, October 7, 2021, 12:40
Subject: Flink production configuration
To: 


Hello
New to Flink, and I am struggling with defining the correct configuration.
Currently we have decided to have only one job manager.
We have 5 different pipelines.
We are not sure how many task managers need to be defined, or how many slots.

Can you please send a reference, or is there any calculator that assists in
such decisions?

Thanks


OVER IGNORE NULLS support

2021-10-08 Thread Adrian Bednarz
Hi,

we've been trying to run a query similar to
SELECT id, type, LAG(id) IGNORE NULLS OVER (PARTITION BY type ORDER BY ts)
AS lastId
  FROM Events

A query without the IGNORE NULLS clause executes just fine. This syntax is
supported by Calcite and our clients expect it to work. Our platform uses
Flink SQL to execute certain types of queries, and currently such syntax
causes jobs to fail with an NPE. Here's the stack trace:

Exception in thread "main" org.apache.flink.table.api.ValidationException:
SQL validation failed. null
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:164)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:107)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:215)
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:736)
at com.example.OverIgnoreNullsJob.main(OverIgnoreNullsJob.java:37)
Caused by: java.lang.NullPointerException
at java.base/java.util.Objects.requireNonNull(Objects.java:221)
at org.apache.calcite.sql.SqlBasicCall.setOperator(SqlBasicCall.java:67)
at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:530)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5710)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5697)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
at org.apache.calcite.sql.type.SqlTypeUtil.deriveType(SqlTypeUtil.java:178)
at
org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkSingleOperandType(FamilyOperandTypeChecker.java:71)
at
org.apache.calcite.sql.type.FamilyOperandTypeChecker.checkOperandTypes(FamilyOperandTypeChecker.java:122)
at
org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679)
at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
at
org.apache.calcite.sql.SqlOverOperator.deriveType(SqlOverOperator.java:86)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5710)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5697)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5710)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5697)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1736)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1727)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:421)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4061)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3347)
at
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:159)
... 5 more

By looking through the codebase I concluded that such syntax is not
implemented by the Flink engine. Have you considered adding this syntax to
Flink? Or is it a deliberate decision not to have it?

Regards,
Adrian


Flink S3 Presto Checkpointing Permission Forbidden

2021-10-08 Thread Denis Nutiu
Hello,

I'm trying to deploy my Flink cluster inside of an AWS EKS using Flink
Native. I want to use S3 as a filesystem for checkpointing, and I am passing
the following options related to flink-s3-fs-presto:

"-Dhive.s3.endpoint": "https://s3.eu-central-1.amazonaws.com"
"-Dhive.s3.iam-role": "arn:aws:iam::xxx:role/s3-flink"
"-Dhive.s3.use-instance-credentials": "true"
"-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS":
"flink-s3-fs-presto-1.13.2.jar"
"-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS":
"flink-s3-fs-presto-1.13.2.jar"
"-Dstate.backend": "rocksdb"
"-Dstate.backend.incremental": "true"
"-Dstate.checkpoints.dir": "s3://bucket/checkpoints/"
"-Dstate.savepoints.dir": "s3://bucket/savepoints/"

But my job fails with:

2021-10-08 11:38:49,771 WARN
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Could
not properly dispose the private states in the pending checkpoint 45 of job
75bdd6fb6e689961ef4e096684e867bc.
com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException:
com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service:
Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID:
JEZ3X8YPDZ2TF4T9; S3 Extended Request ID:
u2RBcDpifTnzO4hIOGqgTOKDY+nw6iSeSepd4eYThITCPCpVddIUGMU7jY5DpJBg1LkPuYXiH9c=;
Proxy: null), S3 Extended Request ID:
u2RBcDpifTnzO4hIOGqgTOKDY+nw6iSeSepd4eYThITCPCpVddIUGMU7jY5DpJBg1LkPuYXiH9c=
(Path: s3://bucket/checkpoints/75bdd6fb6e689961ef4e096684e867bc/chk-45)
at
com.facebook.presto.hive.s3.PrestoS3FileSystem.lambda$getS3ObjectMetadata$2(PrestoS3FileSystem.java:573)
~[?:?]
at com.facebook.presto.hive.RetryDriver.run(RetryDriver.java:138) ~[?:?]
at
com.facebook.presto.hive.s3.PrestoS3FileSystem.getS3ObjectMetadata(PrestoS3FileSystem.java:560)
~[?:?]
at
com.facebook.presto.hive.s3.PrestoS3FileSystem.getFileStatus(PrestoS3FileSystem.java:311)
~[?:?]
at
com.facebook.presto.hive.s3.PrestoS3FileSystem.directory(PrestoS3FileSystem.java:450)
~[?:?]
at
com.facebook.presto.hive.s3.PrestoS3FileSystem.delete(PrestoS3FileSystem.java:427)
~[?:?]
at
org.apache.flink.fs.s3presto.common.HadoopFileSystem.delete(HadoopFileSystem.java:160)
~[?:?]
at
org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.delete(PluginFileSystemFactory.java:155)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation.disposeOnFailure(FsCheckpointStorageLocation.java:117)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.checkpoint.PendingCheckpoint.discard(PendingCheckpoint.java:588)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:60)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
at
org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanup$2(CheckpointsCleaner.java:85)
~[flink-dist_2.11-1.13.2.jar:1.13.2]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden
(Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request
ID: JEZ3X8YPDZ2TF4T9; S3 Extended Request ID:
u2RBcDpifTnzO4hIOGqgTOKDY+nw6iSeSepd4eYThITCPCpVddIUGMU7jY5DpJBg1LkPuYXiH9c=;
Proxy: null)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1811)
~[?:?]
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1395)
~[?:?]
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1371)
~[?:?]
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
~[?:?]
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
~[?:?]
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
~[?:?]
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
~[?:?]
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
~[?:?]
at
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
~[?:?]
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
~[?:?]
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
~[?:?]
at
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5062)
~[?:?]
at
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5008)
~[?:?]
at
com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(Am

Pyflink data stream API to Table API conversion with multiple sinks.

2021-10-08 Thread Kamil ty
Hello,
In my pyflink job I have the following flow:

1. Use table API to get messages from Kafka
2. Convert the table to a data stream
3. Convert the data stream back to the table API
4. Use a statement set to write the data to two filesystem sinks (avro and
parquet)

I'm able to run the job and everything seems to be working, but the files
are not filling with data and are stuck in the in-progress state.

I suspect that I'm doing something wrong with how I run .execute().

Currently at the end of my script I use:
statement_set.execute()
streaming_environment.execute("My job")

My question is: what would be the correct way to run a job with the flow
specified above? I can share the code if needed.

I would appreciate any help.

Kind regards
Kamil


Re: Pyflik job data stream to table conversion declareManagedMemory exception

2021-10-08 Thread Kamil ty
Hey, you are right. The issue was a Flink and pyflink version mismatch.
It turned out Flink 1.12 was installed on the cluster. Downgrading pyflink
from 1.12.3 to 1.12 fixed the issue.

Thank you for your help.

On Fri, 8 Oct 2021, 04:04 Dian Fu,  wrote:

> Hi Kamil,
>
> I have checked that this method exists in 1.12.3:
> https://github.com/apache/flink/blob/release-1.12.3/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java#L137
>
> Could you double check whether the Flink version is 1.12.3 (not just the
> PyFlink version)?
>
> Regards,
> Dian
>
>
>
> On Tue, Oct 5, 2021 at 11:34 PM Nicolaus Weidner <
> nicolaus.weid...@ververica.com> wrote:
>
>> Hi Kamil,
>>
>> On Tue, Oct 5, 2021 at 9:03 AM Kamil ty  wrote:
>>
>>> Hello,
>>>
>>> I'm trying to run a pyflink job in cluster mode (with yarn). My job
>>> contains source and sink definitions using Table API which are converted to
>>> a datastream and back. Unfortunately I'm getting an unusual exception at:
>>> *table = t_env.from_data_stream(ds, 'user_id, first_name, last_name).*
>>>
>>
>> Just to make sure: Is the missing quotation mark just a typo in your
>> mail, or your code (right before the closing bracket)?
>> *table = t_env.from_data_stream(ds, 'user_id, first_name, last_name['])*
>>
>> Best regards,
>> Nico
>>
>


Re: Empty Kafka topics and watermarks

2021-10-08 Thread Piotr Nowojski
Hi James,

I believe you have encountered a bug that we have already fixed [1]. The
small problem is that in order to fix this bug, we had to change some
`@PublicEvolving` interfaces and thus we were not able to backport this fix
to earlier minor releases. As such, this is only fixed starting from 1.14.x.

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-18934

On Fri, 8 Oct 2021 at 11:55, James Sandys-Lumsdaine  wrote:

> Hi everyone,
>
> I'm putting together a Flink workflow that needs to merge historic data
> from a custom JDBC source with a Kafka flow (for the realtime data). I have
> successfully written the custom JDBC source that emits a watermark for the
> last event time after all the DB data has been emitted but now I face a
> problem when joining with data from the Kafka stream.
>
> I register a timer in my KeyedCoProcessFunction joining the DB stream
> with live Kafka stream so I can emit all the "batch" data from the DB in
> one go when completely read up to the watermark but the timer never fires
> as the Kafka stream is empty and therefore doesn't emit a watermark. My
> Kafka stream will allowed to be empty since all the data will have been
> retrieved from the DB call so I only expect new events to appear over
> Kafka. Note that if I replace the Kafka input with a simple
> env.fromCollection(...) empty list then the timer triggers fine as Flink
> seems to detect it doesn't need to wait for any input from stream 2. So it
> seems to be something related to the Kafka stream status that is preventing
> the watermark from advancing in the KeyedCoProcessFunction.
>
> I have tried configuring the Kafka stream timestamp and watermark
> strategies to so the source is marked as idle after 10 seconds but still it
> seems the watermark in the join operator combining these 2 streams is not
> advancing. (See example code below).
>
> Maybe this is my bad understanding but I thought if an input stream into a
> KeyedCoProcessFunction is idle then it wouldn't be considered by the
> operator for forwarding the watermark i.e. it would forward the non-idle
> input stream's watermark and not do a min(stream1WM, stream2WM). With the
> below example I never see the onTimer fire and the only effect the
> withIdleness() strategy has is to stop the print statements in
> onPeriodicEmit() happening after 5 seconds (env periodic emit is set to the
> default 200ms so I see 25 rows before it stops).
>
> The only way I can get my KeyedCoProcessFunction timer to fire is to force
> an emit of the watermark I want in the onPeriodicEmit() after x numbers of
> attempts to advance an initial watermark i.e. if onPeriodicEmit() is called
> 100 times and the "latestWatermark" is still Long.MIN_VALUE then I emit the
> watermark I want so the join can progress. This seems like a nasty hack to
> me but perhaps something like this is actually necessary?
>
> I am currently using Flink 1.12.3, a Confluent Kafka client 6.1.1. Any
> pointers would be appreciated.
>
> Thanks in advance,
>
> James.
>
> FlinkKafkaConsumer positionsFlinkKafkaConsumer = new
> FlinkKafkaConsumer<>("poc.positions",
> ConfluentRegistryAvroDeserializationSchema.forSpecific(Position.class,
> SchemaRegistryURL), kafkaConsumerProperties);
>
> positionsFlinkKafkaConsumer.setStartFromEarliest();
>
> positionsFlinkKafkaConsumer.assignTimestampsAndWatermarks(
>
>new WatermarkStrategy() {
>
>   @Override
>
>   public TimestampAssigner
> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
>
> return (event, recordTimestamp) -> {
>
> return event.getPhysicalFrom();
>
> };
>
> }
>
>
>
>   @Override
>
>   public WatermarkGenerator
> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
>
> return new WatermarkGenerator() {
>
> public long latestWatermark = Long.MIN_VALUE;
>
>
>
> @Override
>
> public void onEvent(Position event, long
> timestamp, WatermarkOutput output) {
>
> long eventWatermark =
> event.getPhysicalFrom();
>
> if (eventWatermark > latestWatermark)
>
> latestWatermark = eventWatermark;
>
> }
>
>
>
> @Override
>
> public void onPeriodicEmit(WatermarkOutput
> output) {
>
> System.out.printf("Emitting watermark
> %d\n", latestWatermark);
>
> output.emitWatermark(new
> Watermark(latestWatermark));
>
> }
>
> };
>
> }
>
> }.withIdleness(Duration.ofSeconds(5)));
>
>
>
> DataStream positionKafkaInputStream =
> env.addSource(positionsFli

Empty Kafka topics and watermarks

2021-10-08 Thread James Sandys-Lumsdaine
Hi everyone,

I'm putting together a Flink workflow that needs to merge historic data from a 
custom JDBC source with a Kafka flow (for the realtime data). I have 
successfully written the custom JDBC source that emits a watermark for the last 
event time after all the DB data has been emitted but now I face a problem when 
joining with data from the Kafka stream.

I register a timer in my KeyedCoProcessFunction joining the DB stream with live 
Kafka stream so I can emit all the "batch" data from the DB in one go when 
completely read up to the watermark but the timer never fires as the Kafka 
stream is empty and therefore doesn't emit a watermark. My Kafka stream is
allowed to be empty since all the data will have been retrieved from the DB
call, so I only expect new events to appear over Kafka. Note that if I replace
the Kafka input with a simple env.fromCollection(...) empty list then the timer 
triggers fine as Flink seems to detect it doesn't need to wait for any input 
from stream 2. So it seems to be something related to the Kafka stream status 
that is preventing the watermark from advancing in the KeyedCoProcessFunction.

I have tried configuring the Kafka stream timestamp and watermark strategies
so that the source is marked as idle after 10 seconds, but still it seems the
watermark in the join operator combining these 2 streams is not advancing. (See 
example code below).

Maybe this is my bad understanding but I thought if an input stream into a 
KeyedCoProcessFunction is idle then it wouldn't be considered by the operator 
for forwarding the watermark i.e. it would forward the non-idle input stream's 
watermark and not do a min(stream1WM, stream2WM). With the below example I 
never see the onTimer fire and the only effect the withIdleness() strategy has 
is to stop the print statements in onPeriodicEmit() happening after 5 seconds 
(env periodic emit is set to the default 200ms so I see 25 rows before it 
stops).

The only way I can get my KeyedCoProcessFunction timer to fire is to force an 
emit of the watermark I want in the onPeriodicEmit() after x numbers of 
attempts to advance an initial watermark i.e. if onPeriodicEmit() is called 100 
times and the "latestWatermark" is still Long.MIN_VALUE then I emit the 
watermark I want so the join can progress. This seems like a nasty hack to me 
but perhaps something like this is actually necessary?

I am currently using Flink 1.12.3, a Confluent Kafka client 6.1.1. Any pointers 
would be appreciated.

Thanks in advance,

James.


FlinkKafkaConsumer<Position> positionsFlinkKafkaConsumer = new FlinkKafkaConsumer<>(
        "poc.positions",
        ConfluentRegistryAvroDeserializationSchema.forSpecific(Position.class, SchemaRegistryURL),
        kafkaConsumerProperties);

positionsFlinkKafkaConsumer.setStartFromEarliest();

positionsFlinkKafkaConsumer.assignTimestampsAndWatermarks(
        new WatermarkStrategy<Position>() {
            @Override
            public TimestampAssigner<Position> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                return (event, recordTimestamp) -> {
                    return event.getPhysicalFrom();
                };
            }

            @Override
            public WatermarkGenerator<Position> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new WatermarkGenerator<Position>() {
                    public long latestWatermark = Long.MIN_VALUE;

                    @Override
                    public void onEvent(Position event, long timestamp, WatermarkOutput output) {
                        long eventWatermark = event.getPhysicalFrom();
                        if (eventWatermark > latestWatermark)
                            latestWatermark = eventWatermark;
                    }

                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {
                        System.out.printf("Emitting watermark %d\n", latestWatermark);
                        output.emitWatermark(new Watermark(latestWatermark));
                    }
                };
            }
        }.withIdleness(Duration.ofSeconds(5)));

DataStream<Position> positionKafkaInputStream =
        env.addSource(positionsFlinkKafkaConsumer, "Kafka-Source");

DataStream<Position> otherPositionStream =
        env.fromCollection(Lists.newArrayList(new Position(..., timestamp.getMillis())),
                TypeInformation.of(Position.class));

otherPositionStream.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((e, t) -> e.getPhysicalFrom()));

KeyedStream keyedPositionKafkaInputStream =
        positionKafkaInputStream.keyBy(p -> p.getMarketName());

KeyedStream keyedOtherPositionStream =
        otherPositionStream.keyBy(p -> p

Re: How to ugrade JobManagerCommunicationUtils from FLink 1.4 to Flink 1.5?

2021-10-08 Thread Piotr Nowojski
Hi,

`JobManagerCommunicationUtils` was never part of Flink's API. It was an
internal class, for our internal unit tests. Note that Flink's public API
is annotated with `@Public`, `@PublicEvolving` or `@Experimental`. Anything
else by default is internal (sometimes to avoid confusion we are annotating
internal classes with `@Internal`).

JobManagerCommunicationUtils seems to be replaced with
`MiniClusterResource` [1] as part of [2]. Note that MiniClusterResource is
also not a public API, so it's subject to change or to be completely
removed without warning. You can read about how to test your applications
here [3], and about `MiniClusterResource` in particular here [4].
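
For example, the JUnit rule from the testing docs looks roughly like this in
recent versions (the class is called MiniClusterWithClientResource there, and
it lives in the flink-test-utils dependency; slot/TM counts are just an
example):

// JUnit 4 class rule that spins up a small embedded Flink cluster for tests
@ClassRule
public static MiniClusterWithClientResource flinkCluster =
        new MiniClusterWithClientResource(
                new MiniClusterResourceConfiguration.Builder()
                        .setNumberSlotsPerTaskManager(2)
                        .setNumberTaskManagers(1)
                        .build());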

Piotrek

PS: Flink 1.5 has also not been supported for something like 2 years now.
The currently officially supported Flink versions are 1.13 and 1.14, so I
would encourage you to upgrade to one of those.

[1]
https://github.com/apache/flink/commits/release-1.5/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java
[2] https://issues.apache.org/jira/browse/FLINK-8703
[3]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/testing/
[4]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/testing/#testing-flink-jobs

On Fri, 8 Oct 2021 at 10:11, Felipe Gutierrez  wrote:

> Hello there,
>
> what is the replacement from Flink 1.4 to Flink 1.5 of the class
> JobManagerCommunicationUtils.java [1] below?
>
> JobManagerCommunicationUtils.cancelCurrentJob
> JobManagerCommunicationUtils.waitUntilNoJobIsRunning
>
> I want to upgrade Flink from 1.4 to 1.5 but I cant find this class in the
> docs of the previous version [2] neither on the next version [3]. My plan
> is also to upgrade Flink to the lates version. But if I cannot find a way
> to the next version 1.4 -> 1.5, I suppose that for a greater it will be
> even more difficult.
>
> [1]
> https://github.com/a0x8o/flink/blob/master/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java#L38-L60
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/connectors/kafka/package-summary.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/streaming/connectors/kafka/package-summary.html
>
> Thanks in advance,
> Felipe
>
> *--*
> *-- Felipe Gutierrez*
>


How to ugrade JobManagerCommunicationUtils from FLink 1.4 to Flink 1.5?

2021-10-08 Thread Felipe Gutierrez
Hello there,

what is the replacement from Flink 1.4 to Flink 1.5 of the class
JobManagerCommunicationUtils.java [1] below?

JobManagerCommunicationUtils.cancelCurrentJob
JobManagerCommunicationUtils.waitUntilNoJobIsRunning

I want to upgrade Flink from 1.4 to 1.5, but I can't find this class in the
docs of the previous version [2] nor in the next version [3]. My plan is
ultimately to upgrade Flink to the latest version. But if I cannot find a way
for the step to the next version, 1.4 -> 1.5, I suppose that for a greater
jump it will be even more difficult.

[1]
https://github.com/a0x8o/flink/blob/master/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/JobManagerCommunicationUtils.java#L38-L60
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/connectors/kafka/package-summary.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/streaming/connectors/kafka/package-summary.html

Thanks in advance,
Felipe

*--*
*-- Felipe Gutierrez*


Re: Start Flink cluster, k8s pod behavior

2021-10-08 Thread Yang Wang
Did you use the "jobmanager.sh start-foreground" in your own
"run-job-manager.sh", just like what Flink has done
in the docker-entrypoint.sh[1]?

I strongly suggest to start the Flink session cluster with official
yamls[2].

[1].
https://github.com/apache/flink-docker/blob/master/1.13/scala_2.11-java11-debian/docker-entrypoint.sh#L114
[2].
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/standalone/kubernetes/#starting-a-kubernetes-cluster-session-mode
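
For comparison, the reference session-cluster YAML keeps the JobManager
process in the foreground roughly like this (the image tag is a placeholder):

      containers:
        - name: jobmanager
          image: flink:1.13.2-scala_2.12
          # docker-entrypoint.sh maps this argument to "jobmanager.sh start-foreground"
          args: ["jobmanager"]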

Best,
Yang

Qihua Yang  wrote on Friday, October 1, 2021 at 2:59 AM:

> Looks like after script *flink-daemon.sh *complete, it return exit 0.
> Kubernetes regard it as done. Is that expected?
>
> Thanks,
> Qihua
>
> On Thu, Sep 30, 2021 at 11:11 AM Qihua Yang  wrote:
>
>> Thank you for your reply.
>> From the log, exit code is 0, and reason is Completed.
>> Looks like the cluster is fine. But why kubenetes restart the pod. As you
>> said, from perspective of Kubernetes everything is done. Then how to
>> prevent the restart?
>> It didn't even give chance to upload and run a jar
>>
>> Ports: 8081/TCP, 6123/TCP, 6124/TCP, 6125/TCP
>> Host Ports:0/TCP, 0/TCP, 0/TCP, 0/TCP
>> Command:
>>   /opt/flink/bin/entrypoint.sh
>> Args:
>>   /opt/flink/bin/run-job-manager.sh
>> State:  Waiting
>>   Reason:   CrashLoopBackOff
>> Last State: Terminated
>>   Reason:   Completed
>>   Exit Code:0
>>   Started:  Wed, 29 Sep 2021 20:12:30 -0700
>>   Finished: Wed, 29 Sep 2021 20:12:45 -0700
>> Ready:  False
>> Restart Count:  131
>>
>> Thanks,
>> Qihua
>>
>> On Thu, Sep 30, 2021 at 1:00 AM Chesnay Schepler 
>> wrote:
>>
>>> Is the run-job-manager.sh script actually blocking?
>>> Since you (apparently) use that as an entrypoint, if that scripts exits
>>> after starting the JM then from the perspective of Kubernetes everything is
>>> done.
>>>
>>> On 30/09/2021 08:59, Matthias Pohl wrote:
>>>
>>> Hi Qihua,
>>> I guess, looking into kubectl describe and the JobManager logs would
>>> help in understanding what's going on.
>>>
>>> Best,
>>> Matthias
>>>
>>> On Wed, Sep 29, 2021 at 8:37 PM Qihua Yang  wrote:
>>>
 Hi,
 I deployed flink in session mode. I didn't run any jobs. I saw below
 logs. That is normal, same as Flink menual shows.

 + /opt/flink/bin/run-job-manager.sh
 Starting HA cluster with 1 masters.
 Starting standalonesession daemon on host job-manager-776dcf6dd-xzs8g.
 Starting taskexecutor daemon on host job-manager-776dcf6dd-xzs8g.


 But when I check kubectl, it shows status is Completed. After a while,
 status changed to CrashLoopBackOff, and pod restart.
 NAME  READY
   STATUS RESTARTS   AGE
 job-manager-776dcf6dd-xzs8g   0/1 Completed  5
  5m27s

 NAME  READY
   STATUS RESTARTS   AGE
 job-manager-776dcf6dd-xzs8g   0/1 CrashLoopBackOff   5
  7m35s

 Anyone can help me understand why?
 Why do kubernetes regard this pod as completed and restart? Should I
 config something? either Flink side or Kubernetes side? From the Flink
 manual, after the cluster is started, I can upload a jar to run the
 application.

 Thanks,
 Qihua

>>>
>>>


AW: How to create backpressure with a Statefun remote function?

2021-10-08 Thread Christian Krudewig (Corporate Development)
Hello Igal,

 

Thanks, these are two valuable tips I will try. I had misunderstood the
maxNumBatchRequests parameter: I thought it was the maximum number of
“function calls” sent within one web request. I’ll experiment a bit with it.
It would still be static for the Flink cluster’s lifetime, but at least we
could have a different setting per remote function definition.

And yes, autoscaling is also an option that makes sense, but I’ll have to test
whether it reacts fast enough, because a newly spawned container might need to
deserialize some data, connect to a database, etc., so scaling can be slow. And
of course at some point the limits of the cluster might be reached. As it is,
Flink doesn’t slow down sending messages until the service is ready.

 

I guess with the two tips together we can achieve a fairly robust setup.

 

For a future design, a simple solution would be to allow defining an HTTP
status code in the module definition which signals backpressure. One could
then define that a status 429 or 503 makes Flink create backpressure just as
if the “statefun.async.max-per-task” limit had been reached. Of course, more
sophisticated rules such as dynamic concurrency limiting based on latencies
are also thinkable, but more complicated.

 

Best,

 

Christian 

 

 

 

 

From: Igal Shilman  
Sent: Thursday, October 7, 2021, 21:32
To: Christian Krudewig (Corporate Development) 
Cc: user@flink.apache.org
Subject: Re: How to create backpressure with a Statefun remote function?

 

Hello Christian,

 

The challenge with generic back pressure and remote functions, is that StateFun 
doesn't know if it targets a single process or a fleet of processes behind a 
load balancer and an autoscaler.

Triggering back pressure too early might never kick in the autoscaling.

 

Indeed that parameter you have found will trigger back pressure when the total 
number of requests per task slot exceeds that value. There is an additional 
param that will trigger back pressure per function address.

This is called: maxNumBatchRequests

And is more fine-grained than the per-task slot parameter. Reducing this value 
might be recommend if the total processing time of a single message is 
potentially high (CPU intensive/ or a long IO)

 

I think that this use case is valid, and we need to think about the case where 
the set of remote functions is static (up to a manual scale up)

I don't have a good idea at the moment as deciding to rather to back pressure 
or not requires some sort of a global knowledge.

 

What I would recommend is, if it fits your infra, is to consider an auto scaler 
for the remote functions according to a metric that makes sense to you, and use 
the max-in-flight parameter as a high safety net.

 

Cheers,

Igal

 

On Thu 7. Oct 2021 at 14:03, Christian Krudewig (Corporate Development)
<christian.krude...@dpdhl.com> wrote:

Hello fellow Flink users,

How do you create backpressure with Statefun remote functions? I'm using an
asynchronous web server for the remote function (Python aiohttp on uvicorn)
which accepts more requests than its CPU bound backend can handle. That can
make the requests time out and can trigger a restart loop of the whole Flink
pipeline. Of course only in situations where so many requests are coming
into the ingress kafka stream that the service cannot handle it anymore.

Desired behavior: Flink only consumes as many records from the input stream
as the pipeline can handle instead of overloading the remote functions.

What I tried so far:
1. Set "statefun.async.max-per-task" in flink.conf to a low number. This
works. But that is one global static config for all function which cannot be
changed without restarting the cluster when the remote functions are scaled
up or down.
2. Add concurrency limiting to the remote function service. If the function
service returns failure codes (500, 503, 429) that doesn't seem to create
backpressure but is handled like a normal failure by flink with retries
until finally the whole pipeline gets restarted.
3. Try the new "io.statefun.transports.v1/async" transport type for the
endpoints with a low "pool_size" parameter. But reaching the pool size seems
to be treated like an error instead of creating backpressure. Same effect as
option 2.

Is there some other option? How should it be done by design?

Thanks,

Christian







Re: Deploying Jobs in Application Mode on Kubernetes native

2021-10-08 Thread Yang Wang
1) In application mode, the lifecycle of the K8s resources (e.g. deployment,
pods, ConfigMaps) is bound to the Flink job, so it is impossible to stop the
job and leave the Flink cluster running in application mode. For a K8s
session cluster, you could do this. It will help to save the time of
launching a new Flink cluster.

2) Flink has some internal ConfigMaps and volumes; they cannot be
overwritten. But I think you could create other arbitrary ConfigMaps, which
contain the encoded secrets. Of course, you could decode them in the
init-container and create a new flink-conf.yaml. Then use the following
config options to let the JobManager and TaskManager start with your
generated Flink configuration.

containerized.master.env.FLINK_CONF_DIR=/path/of/your/flink-conf
containerized.taskmanager.env.FLINK_CONF_DIR=/path/of/your/flink-conf


Best,
Yang

Sangarshanan Veeraraghavan  wrote on Monday, October 4, 2021 at 2:56 PM:

> Hi,
>
> I am currently working shipping flink jobs to production and have two
> general queries/ concerns regarding the deployment that I wanted to list
> down
>
> 1) Currently we deploy jobs on Kubernetes on Application mode and want to
> do a blue-green deployment but Stopping a job with Savepoint terminates the
> deployment rather than suspend/ Finish the Job so that it can be later
> drained or Restarted, Is there any better way to do a safe upgrade for jobs
> running in Application mode on Kubernetes?
>
> 2) The pod-template used in Native deployment currently allows adding of
> new volume and configmaps but is there is a way we can override the
> existing volumes, configmaps used for flink-conf.yaml without depending on
> the cli arguments for more complex usecases like encoding secrets in
> configmaps and decoding them in our init-containers
>
> Thank you
>