Re: Random incorrect checkpoint existence check

2022-04-25 Thread Hangxiang Yu
Hi Chen-Che,
I think it may be similar to FLINK-12381.
You could adopt the suggestion from the comments below that ticket, such as
setting the job id explicitly.
It would also help if you shared your environment details in that ticket so
we have more information.
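For illustration only, a hedged sketch of the kind of change that ticket's comments point at (all keys, paths, and flags below are placeholders or assumptions to verify against FLINK-12381 and your deployment mode, not a prescription):

```
# flink-conf.yaml (sketch): give each deployment its own checkpoint root so a
# restarted job cannot collide on .../chk-<n>/_metadata in the bucket.
state.checkpoints.dir: gs://bucket/flink-app/<unique-deployment-id>/checkpoints

# For standalone application clusters, an explicit job id can be passed instead
# (e.g. standalone-job.sh ... --job-id <32-hex-chars>); verify the exact flag
# for your deployment mode.
```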

On Mon, Apr 25, 2022 at 9:58 AM Chen-Che Huang  wrote:

> Hi all,
>
> We recently encountered a random issue. When our Flink application is
> creating a checkpoint, it occasionally fails because it thinks the
> metadata file of the checkpoint already exists. However, the metadata file
> does not actually exist. We use Flink version 1.14.4 and the checkpoints are
> stored in Google Cloud Storage. Has anyone encountered the same problem? Any
> comment is appreciated.
>
> Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: 
> 'gs://bucket/flink-app//chk-5812/_metadata' 
> already exists
>
> Best wishes,
>
> Chen-Che Huang
>
>


Re: A few small questions about the Flink 1.15 documentation

2022-04-25 Thread 林影
Thanks for the reply.
I see. I had assumed that "intermediate savepoints" referred to checkpoints;
our Flink platform is implementing state recovery from checkpoints, which is
why I asked.

Jiangang Liu  wrote on Fri, Apr 22, 2022 at 19:52:

> Intermediate savepoints are savepoints other than those taken with
> stop-with-savepoint, i.e. savepoints taken without stopping the job. My
> understanding is that such a savepoint does not commit side effects (for
> example, a sink writing to an external system); if the job fails, it will
> recover from the most recent checkpoint. If another job happens to be
> restored from that savepoint and the two jobs run at the same time, the
> results may be duplicated or inconsistent, in which case it is best to
> discard the sink state (by changing its uid). For the case where only one
> job is running, for example stopping the job and then restoring it, there
> is no problem.
>
> 林影  wrote on Fri, Apr 22, 2022 at 17:05:
>
> > The savepoint page of the Flink documentation contains the following passage:
> > Starting from Flink 1.15 intermediate savepoints (savepoints other than
> > created with stop-with-savepoint
> > <
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#stopping-a-job-with-savepoint
> > >)
> > are not used for recovery and do not commit any side effects.
> >
> > What exactly do intermediate savepoints refer to?
> >
>
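For reference, the two kinds of savepoints map onto the standard CLI roughly as follows (job id and target directory are placeholders):

```
# Intermediate savepoint: the job keeps running. From Flink 1.15 on it is not
# used for recovery and does not commit side effects.
./bin/flink savepoint <jobId> /tmp/flink-savepoints

# Stop-with-savepoint: the job is stopped gracefully and side effects are
# committed as part of the final savepoint.
./bin/flink stop --savepointPath /tmp/flink-savepoints <jobId>
```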


Re: Unit testing PyFlink SQL project

2022-04-25 Thread Dian Fu
Great to hear!

Regards,
Dian

On Tue, Apr 26, 2022 at 4:11 AM John Tipper  wrote:

> Hi Dian,
>
> I've tried this and it works nicely, on both MacOS and Windows, thank you
> very much indeed for your help.
>
> Kind regards,
>
> John
> --
> *From:* Dian Fu 
> *Sent:* 25 April 2022 02:42
> *To:* John Tipper 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Unit testing PyFlink SQL project
>
> Hi John,
>
> I'm also using MacOS. These are the steps I'm following, which I have run
> successfully:
> 1) python3 -m venv .venv
> 2) source .venv/bin/activate
> 3) pip install apache-flink==1.14.4
> 4) python -c "import pyflink;import
> os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
> It will print something like this:
> "/Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/pyflink/log"
> 5) check the structure of the installed package:
> ```
> (.venv) (base) dianfu@B-7174MD6R-1908 testing % ls -lh
> /Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/pyflink/
> total 136
> -rw-r--r--   1 dianfu  staff   1.3K Apr 25 09:26 README.txt
> -rw-r--r--   1 dianfu  staff   1.9K Apr 25 09:26 __init__.py
> drwxr-xr-x  11 dianfu  staff   352B Apr 25 09:26 __pycache__
> drwxr-xr-x  25 dianfu  staff   800B Apr 25 09:26 bin
> drwxr-xr-x  21 dianfu  staff   672B Apr 25 09:26 common
> drwxr-xr-x  13 dianfu  staff   416B Apr 25 09:26 conf
> drwxr-xr-x  20 dianfu  staff   640B Apr 25 09:26 datastream
> drwxr-xr-x   4 dianfu  staff   128B Apr 25 09:26 examples
> -rw-r--r--   1 dianfu  staff   3.2K Apr 25 09:26 find_flink_home.py
> drwxr-xr-x  25 dianfu  staff   800B Apr 25 09:26 fn_execution
> -rw-r--r--   1 dianfu  staff   9.1K Apr 25 09:26 gen_protos.py
> -rw-r--r--   1 dianfu  staff   7.6K Apr 25 09:26 java_gateway.py
> drwxr-xr-x  11 dianfu  staff   352B Apr 25 09:26 lib
> drwxr-xr-x  28 dianfu  staff   896B Apr 25 09:26 licenses
> drwxr-xr-x   4 dianfu  staff   128B Apr 25 09:26 log
> drwxr-xr-x   5 dianfu  staff   160B Apr 25 09:26 metrics
> drwxr-xr-x   4 dianfu  staff   128B Apr 25 09:26 opt
> drwxr-xr-x  11 dianfu  staff   352B Apr 25 09:26 plugins
> -rw-r--r--   1 dianfu  staff   1.3K Apr 25 09:26 pyflink_callback_server.py
> -rw-r--r--   1 dianfu  staff12K Apr 25 09:26 pyflink_gateway_server.py
> -rw-r--r--   1 dianfu  staff   5.3K Apr 25 09:26 serializers.py
> -rw-r--r--   1 dianfu  staff   7.9K Apr 25 09:26 shell.py
> drwxr-xr-x  31 dianfu  staff   992B Apr 25 09:26 table
> drwxr-xr-x   6 dianfu  staff   192B Apr 25 09:26 util
> -rw-r--r--   1 dianfu  staff   1.1K Apr 25 09:26 version.py
> ```
> 6) Execute command `python3 -m unittest
> test_table_api.TableTests.test_scalar_function`
> The output is as follows, and you can see that it executes successfully:
> ```
> (.venv) (base) dianfu@B-7174MD6R-1908 testing % python3 -m unittest
> test_table_api.TableTests.test_scalar_function
> Using %s as FLINK_HOME...
> /Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/pyflink
> Skipped download
> /Users/dianfu/code/src/github/pyflink-faq/testing/flink-python_2.11-1.14.4-tests.jar
> since it already exists.
> /Users/dianfu/miniconda3/lib/python3.8/subprocess.py:946: ResourceWarning:
> subprocess 71018 is still running
>   _warn("subprocess %s is still running" % self.pid,
> ResourceWarning: Enable tracemalloc to get the object allocation traceback
> Downloading jar org.apache.flink:flink-table-planner_2.11:1.14.4:jar:tests
> /Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/pyflink/table/table_environment.py:538:
> DeprecationWarning: Deprecated in 1.10. Use create_table instead.
>   warnings.warn("Deprecated in 1.10. Use create_table instead.",
> DeprecationWarning)
> .
> --
> Ran 1 test in 32.746s
>
> OK
> ```
>
> I have also tried your commands and run into the same error. I believe the
> difference comes from `python setup.py install` vs `pip install
> apache-flink==1.14.4`. When installing with command `python setup.py
> install`, the structure of the installed package is a little different
> from `pip install apache-flink==1.14.4`. I will dig into this and share the
> results when I have some findings.
>
> Before that, could you try to create a new clean virtual environment and
> see if the steps I'm following work for you?
>
> Regards,
> Dian
>
> On Mon, Apr 25, 2022 at 6:04 AM John Tipper 
> wrote:
>
> And now when I add further dependencies to the classpath to remove all
> ClassNotFound exceptions, I get a different error which I don't understand 
> (*TypeError:
> Could not found the Java class 'EnvironmentSettings.inStreamingMode'.*),
> see the logs below:
>
> $ python test_table_api.py TableTests.test_scalar_function
>
> Using %s as FLINK_HOME...
> /Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink
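For orientation, a minimal, self-contained sketch of the kind of test being run in the steps above (an assumed layout, not the actual test_table_api.py from the pyflink-faq repository):

```
import unittest

from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col
from pyflink.table.udf import udf


class TableTests(unittest.TestCase):

    def setUp(self):
        # Local, in-process table environment; no external cluster required.
        self.t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
        self.t_env.get_config().get_configuration().set_string("parallelism.default", "1")

    def test_scalar_function(self):
        # A simple Python scalar UDF applied through the Table API.
        add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
        table = self.t_env.from_elements([(1,), (2,), (3,)], ['x'])
        result = [row[0] for row in table.select(add_one(col('x'))).execute().collect()]
        self.assertEqual(sorted(result), [2, 3, 4])


if __name__ == '__main__':
    unittest.main()
```

Such a test would be run exactly as in the steps above, e.g. `python3 -m unittest test_table_api.TableTests.test_scalar_function`.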

Re: AvroRowDeserializationSchema

2022-04-25 Thread Dian Fu
Hi Quynh,

The same code in my last reply showed how to set the UID for the source
operator generated using the Table API. I meant that you could first create a
source using the Table API, then convert it to a DataStream and set the uid
for the source operator using the same code above, and then perform
operations with the DataStream API.

Regards,
Dian
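For illustration, a rough sketch of that flow (the table name, connector options, and uid value are placeholders; `set_uid_for_source` is the helper quoted further down in this thread):

```
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Assume a Kafka-backed table has been declared via DDL, e.g.:
# t_env.execute_sql("CREATE TABLE src_table (...) WITH ('connector' = 'kafka', ...)")
src_table = t_env.from_path("src_table")

# Convert the Table to a DataStream, then pin a stable uid on the
# underlying source transformation using the quoted helper.
ds = t_env.to_data_stream(src_table)
set_uid_for_source(ds, "kafka-source")  # helper defined in the snippet below
```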

On Mon, Apr 25, 2022 at 9:27 PM lan tran  wrote:

> Hi Dian,
>
> Thanks again for the fast response.
>
> Following your suggestion above, we can set the UID only for the DataStream
> part (by converting from Table to DataStream as you suggested).
>
> However, in the first phase, which collects the data from Kafka (in
> Debezium format), the UID cannot be set since we are using the Table API
> (which auto-generates the UID).
>
> Therefore, if there is a crash or we need to revert using a savepoint, we
> cannot do so for the first phase since we cannot set its UID, so how can we
> revert it?
>
> As a result, we want to use DebeziumAvroRowDeserializationSchema
> and DebeziumJsonRowDeserializationSchema in the DataStream job to be able
> to use savepoints for the whole flow.
>
> Best,
> Quynh
>
>
>
> Sent from Mail  for
> Windows
>
>
>
> *From: *Dian Fu 
> *Sent: *Monday, April 25, 2022 7:46 PM
> *To: *lan tran 
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> Hi Quynh,
>
> You could try the following code (also it may be a little hacky):
> ```
>
> def set_uid_for_source(ds: DataStream, uid: str):
>
> transformation = ds._j_data_stream.getTransformation()
>
>
>
> source_transformation = transformation
>
> while not source_transformation.getInputs().isEmpty():
>
> source_transformation = source_transformation.getInputs().get(0)
>
>
>
> source_transformation.setUid(uid)
>
> ```
>
> Besides, could you describe your use case a bit and also how you want to
> use DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in the DataStream job? Note that for
> the sources with these formats, it will send UPDATE messages to downstream
> operators.
>
> Regards
> Dian
>
>
>
> On Mon, Apr 25, 2022 at 12:31 PM lan tran 
> wrote:
>
> Yeah, I already tried that way. However, if we do not use DataStream from
> the start, we cannot rely on savepoints: according to the docs, when using
> the Table API (SQL API) the uid is generated automatically, which means we
> cannot revert if the system crashes.
>
> Best,
> Quynh
>
>
>
> Sent from Mail  for
> Windows
>
>
>
> *From: *Dian Fu 
> *Sent: *Monday, April 25, 2022 11:04 AM
> *To: *lan tran 
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema are still not supported in
> Python DataStream API.
>
> Taking a further look at the Java implementations of
> DebeziumAvroDeserializationSchema and DebeziumJsonDeserializationSchema,
> the result type is RowData instead of Row, so it would not be that
> easy to support them directly in the Python DataStream API. However,
> conversion between the Table API and the DataStream API is supported[1]. Could you
> first create a Table which consumes data from Kafka and then convert it
> to a DataStream?
>
> Regards,
> Dian
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/intro_to_datastream_api/#create-using-table--sql-connectors
>
>
>
> On Mon, Apr 25, 2022 at 11:48 AM Dian Fu  wrote:
>
> Yes, we should support them.
>
> For now, if you want to use them, you could create ones in your own
> project. You could refer to AvroRowDeserializationSchema[1] as an example.
> It should not be complicated as it's simply a wrapper of the
> Java implementation.
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/e11a5c52c613e121f7a7868cbbfd9e7c21551394/flink-python/pyflink/common/serialization.py#L308
>
>
>
> On Mon, Apr 25, 2022 at 11:27 AM lan tran 
> wrote:
>
> Thanks Dian!! I really appreciate this.
>
> However, I have another question related to this. In the current version,
> or in any future update, does the DataStream API support
> DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in PyFlink? Looking at the
> documentation, it seems they are not supported yet.
>
> Best,
> Quynh
>
> Sent from Mail  for
> Windows
>
>
>
> *From: *Dian Fu 
> *Sent: *Friday, April 22, 2022 9:36 PM
> *To: *lan tran 
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> Hi Quynh,
>
> I have added an example on how to use AvroRowDeserializationSchema in
> Python DataStream API in [1]. Please take a look and see if that helps for you~
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/release-1.15/flink-python/pyflink/examples/datastream/formats/avro_format.py
>
>
>
> On Fri, Apr 22, 2022 at 7:24 PM 

RE: UUID on TableAPI

2022-04-25 Thread lan tran
Ok, thanks for the clarification.

Sent from Mail for Windows

From: Francis Conroy
Sent: Tuesday, April 26, 2022 7:26 AM
To: lan tran
Cc: user@flink.apache.org
Subject: Re: UUID on TableAPI

Hi Quynh,

My understanding is mostly based on the documentation I linked in the first
reply. If the flink version and the query both remain the same then you can
restart a job from a savepoint; this means that it might be workable for
running a low-criticality job on, say, an AWS spot instance. That's about all
I know.

On Tue, 26 Apr 2022 at 10:17, lan tran  wrote:

Hi Francis,
Thanks for the reply. However, can you elaborate more on the part ‘work for
cases where you wish to pause/resume a job’? Is there another way besides
savepoints that provides this mechanism and can be used with the Table API?

Best,
Quynh

Sent from Mail for Windows

From: Francis Conroy
Sent: Tuesday, April 26, 2022 7:07 AM
To: lan tran
Cc: user@flink.apache.org
Subject: Re: UUID on TableAPI

Hi Quynh,

Set the savepoint UUID? Is that what you mean? The savepoint UUIDs are issued
dynamically when you request them; flink won't know automatically what the
last savepoint was, but you can start a new job and restore from a savepoint
by passing in the UUID. All that said, there are limitations around using
savepoints and Flink SQL because of the way the planner works:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution.
However it might work for cases where you wish to pause/resume a job.

On Fri, 22 Apr 2022 at 13:54, lan tran  wrote:

Hi team,
Currently, I want to use savepoints in Flink. However, one of my concerns is
whether there is any way we can set the UUID while using the Table API (SQL
API). If not, is there any mechanism by which, when we start Flink again, it
will know that it was that UUID?

Best,
Quynh.

Sent from Mail for Windows

This email and any attachments are proprietary and confidential and are
intended solely for the use of the individual to whom it is addressed. Any
views or opinions expressed are solely those of the author and do not
necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
received this email in error, please let us know immediately by reply email
and delete it from your system. You may not use, disseminate, distribute or
copy this message nor disclose its contents to anyone.
SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia


Re: UUID on TableAPI

2022-04-25 Thread Francis Conroy
 Hi Quynh,

My understanding is mostly based on the documentation I linked in the first
reply. If the flink version and the query both remain the same then you can
restart a job from a savepoint, this means that it might be workable for
running a low-criticality job on say an AWS spot instance. That's about all
I know.

On Tue, 26 Apr 2022 at 10:17, lan tran  wrote:

> Hi Francis,
>
> Thanks for the reply. However, can you elaborate more on the part ‘work
> for cases where you wish to pause/resume a job’? Is there another way
> besides savepoints that provides this mechanism and can be used with the Table API?
>
> Best,
> Quynh
>
>
>
> Sent from Mail  for
> Windows
>
>
>
> *From: *Francis Conroy 
> *Sent: *Tuesday, April 26, 2022 7:07 AM
> *To: *lan tran 
> *Cc: *user@flink.apache.org
> *Subject: *Re: UUID on TableAPI
>
>
>
> Hi  Quynh,
>
>
>
> Set the savepoint UUID? Is that what you mean? The savepoint UUIDs are
> issued dynamically when you request them, flink won't know automatically
> what the last savepoint was, but you can start a new job and restore from a
> savepoint by passing in the UUID. All that said there are limitations
> around using savepoints and Flink SQL because of the way the planner works
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution.
> However it might work for cases where you wish to pause/resume a job.
>
>
>
> On Fri, 22 Apr 2022 at 13:54, lan tran  wrote:
>
> Hi team,
> Currently, I want to use savepoints in Flink. However, one of my concerns
> is whether there is any way we can set the UUID while using the
> Table API (SQL API). If not, is there any mechanism by which, when
> we start Flink again, it will know that it was that UUID?
>
> Best,
> Quynh.
>
>
>
> Sent from Mail  for
> Windows
>
>
>
>
>
> This email and any attachments are proprietary and confidential and are
> intended solely for the use of the individual to whom it is addressed. Any
> views or opinions expressed are solely those of the author and do not
> necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
> received this email in error, please let us know immediately by reply email
> and delete it from your system. You may not use, disseminate, distribute or
> copy this message nor disclose its contents to anyone.
>
> SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300
> Australia
>
>
>

-- 
This email and any attachments are proprietary and confidential and are 
intended solely for the use of the individual to whom it is addressed. Any 
views or opinions expressed are solely those of the author and do not 
necessarily reflect or represent those of SwitchDin Pty Ltd. If you have 
received this email in error, please let us know immediately by reply email 
and delete it from your system. You may not use, disseminate, distribute or 
copy this message nor disclose its contents to anyone. 
SwitchDin Pty Ltd 
(ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia


RE: UUID on TableAPI

2022-04-25 Thread lan tran
Hi Francis,
Thanks for the reply. However, can you elaborate more on the part ‘work for
cases where you wish to pause/resume a job’? Is there another way besides
savepoints that provides this mechanism and can be used with the Table API?

Best,
Quynh

Sent from Mail for Windows

From: Francis Conroy
Sent: Tuesday, April 26, 2022 7:07 AM
To: lan tran
Cc: user@flink.apache.org
Subject: Re: UUID on TableAPI

Hi Quynh,

Set the savepoint UUID? Is that what you mean? The savepoint UUIDs are issued
dynamically when you request them; flink won't know automatically what the
last savepoint was, but you can start a new job and restore from a savepoint
by passing in the UUID. All that said, there are limitations around using
savepoints and Flink SQL because of the way the planner works:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution.
However it might work for cases where you wish to pause/resume a job.

On Fri, 22 Apr 2022 at 13:54, lan tran  wrote:

Hi team,
Currently, I want to use savepoints in Flink. However, one of my concerns is
whether there is any way we can set the UUID while using the Table API (SQL
API). If not, is there any mechanism by which, when we start Flink again, it
will know that it was that UUID?

Best,
Quynh.

Sent from Mail for Windows

This email and any attachments are proprietary and confidential and are
intended solely for the use of the individual to whom it is addressed. Any
views or opinions expressed are solely those of the author and do not
necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
received this email in error, please let us know immediately by reply email
and delete it from your system. You may not use, disseminate, distribute or
copy this message nor disclose its contents to anyone.
SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia


Re: UUID on TableAPI

2022-04-25 Thread Francis Conroy
Hi  Quynh,

Set the savepoint UUID? Is that what you mean? The savepoint UUIDs are
issued dynamically when you request them, flink won't know automatically
what the last savepoint was, but you can start a new job and restore from a
savepoint by passing in the UUID. All that said there are limitations
around using savepoints and Flink SQL because of the way the planner works
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution.
However it might work for cases where you wish to pause/resume a job.
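For reference, a rough sketch of that pause/resume workflow with the standard CLI (job id, savepoint path, and jar name are placeholders):

```
# Trigger a savepoint for the running job; the CLI prints the savepoint path.
./bin/flink savepoint <jobId> /tmp/flink-savepoints

# Later, start a new job from that savepoint path (same Flink version and query).
./bin/flink run -s /tmp/flink-savepoints/savepoint-<jobIdPrefix>-<random> my-job.jar
```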

On Fri, 22 Apr 2022 at 13:54, lan tran  wrote:

> Hi team,
> Currently, I want to use savepoints in Flink. However, one of my concerns
> is whether there is any way we can set the UUID while using the
> Table API (SQL API). If not, is there any mechanism by which, when
> we start Flink again, it will know that it was that UUID?
>
> Best,
> Quynh.
>
>
>
> Sent from Mail  for
> Windows
>
>
>

-- 
This email and any attachments are proprietary and confidential and are 
intended solely for the use of the individual to whom it is addressed. Any 
views or opinions expressed are solely those of the author and do not 
necessarily reflect or represent those of SwitchDin Pty Ltd. If you have 
received this email in error, please let us know immediately by reply email 
and delete it from your system. You may not use, disseminate, distribute or 
copy this message nor disclose its contents to anyone. 
SwitchDin Pty Ltd 
(ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia


Re: flink-stop command fails with ` Operation not found under key`

2022-04-25 Thread Harsh Shah
Hello Huweihua (sorry for dup email, ended up clicking reply instead of
reply-all),

Thanks for reaching out and having a look at it. The job managers have the
same logs I shared before, another example below.

{"instant":{"epochSecond":1650920964,"nanoOfSecond":64800},"thread":"flink-akka.actor.default-dispatcher-15","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobMaster","message":"Triggering
stop-with-savepoint for job
.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{},"threadId":124,"threadPriority":5}
{"instant":{"epochSecond":1650920964,"nanoOfSecond":77700},"thread":"Checkpoint
Timer","level":"INFO","loggerName":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","message":"Triggering
checkpoint 724 (type=SAVEPOINT_SUSPEND) @ 1650920964650 for job
.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{},"threadId":120,"threadPriority":5}

*{"instant":{"epochSecond":1650920964,"nanoOfSecond":84600},"thread":"flink-rest-server-netty-worker-thread-1","level":"ERROR","loggerName":"org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler","message":"Exception
occurred in REST handler: Operation not found under key:
org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@382cf973","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{},"threadId":203,"threadPriority":5}*{"instant":{"epochSecond":1650920966,"nanoOfSecond":41700},"thread":"jobmanager-io-thread-1","level":"INFO","loggerName":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","message":"Completed
checkpoint 724 for job  (3268755 bytes,
checkpointDuration=1443 ms, finalizationTime=324
ms).","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{},"threadId":105,"threadPriority":5}
{"instant":{"epochSecond":1650920966,"nanoOfSecond":58100},"thread":"flink-akka.actor.default-dispatcher-16","level":"INFO","loggerName":"org.apache.flink.runtime.executiongraph.ExecutionGraph","message":"Source:
line-items -> line-items-filter-valid-records -> Filter -> Map -> Filter ->
Filter -> Filter -> Timestamps/Watermarks (5/8)
(5ffc8a66e16b10c66a9eaef73bff538e) switched from RUNNING to
FINISHED.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{},"threadId":136,"threadPriority":5}
. // Bunch of 'RUNNING to FINISHED.' messages
{"instant":{"epochSecond":1650920967,"nanoOfSecond":95100},"thread":"flink-akka.actor.default-dispatcher-13","level":"INFO","loggerName":"org.apache.flink.runtime.executiongraph.ExecutionGraph","message":"Job
JOB_NAME () switched from state RUNNING to
FINISHED.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{},"threadId":97,"threadPriority":5}
{"instant":{"epochSecond":1650920967,"nanoOfSecond":95100},"thread":"flink-akka.actor.default-dispatcher-19","level":"INFO","loggerName":"org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager","message":"Clearing
resource requirements of job
","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{},"threadId":139,"threadPriority":5}
{"instant":{"epochSecond":1650920967,"nanoOfSecond":95100},"thread":"flink-akka.actor.default-dispatcher-13","level":"INFO","loggerName":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","message":"Stopping
checkpoint coordinator for job
.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{},"threadId":97,"threadPriority":5}
{"instant":{"epochSecond":1650920967,"nanoOfSecond":97400},"thread":"flink-akka.actor.default-dispatcher-13","level":"INFO","loggerName":"org.apache.flink.runtime.dispatcher.StandaloneDispatcher","message":"Job
 reached terminal state
FINISHED.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{},"threadId":97,"threadPriority":5}
{"instant":{"epochSecond":1650920968,"nanoOfSecond":3200},"thread":"cluster-io-thread-1","level":"INFO","loggerName":"org.apache.flink.runtime.jobmanager.DefaultJobGraphStore","message":"Removed
job graph  from
KubernetesStateHandleStore{configMapName='CONFIGMAP_NAME'}.","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{},"threadId":82,"threadPriority":5}
{"instant":{"epochSecond":1650920968,"nanoOfSecond":3500},"thread":"flink-akka.actor.default-dispatcher-13","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobMaster","message":"Stopping
the JobMaster for job
'JOB_NAME' 

Re: Unit testing PyFlink SQL project

2022-04-25 Thread John Tipper
Hi Dian,

I've tried this and it works nicely, on both MacOS and Windows, thank you very 
much indeed for your help.

Kind regards,

John

From: Dian Fu 
Sent: 25 April 2022 02:42
To: John Tipper 
Cc: user@flink.apache.org 
Subject: Re: Unit testing PyFlink SQL project

Hi John,

I'm also using MacOS. These are the steps I'm following, which I have run
successfully:
1) python3 -m venv .venv
2) source .venv/bin/activate
3) pip install apache-flink==1.14.4
4) python -c "import pyflink;import 
os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
It will print something like this: 
"/Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/pyflink/log"
5) check the structure of the installed package:
```
(.venv) (base) dianfu@B-7174MD6R-1908 testing % ls -lh 
/Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/pyflink/
total 136
-rw-r--r--   1 dianfu  staff   1.3K Apr 25 09:26 README.txt
-rw-r--r--   1 dianfu  staff   1.9K Apr 25 09:26 __init__.py
drwxr-xr-x  11 dianfu  staff   352B Apr 25 09:26 __pycache__
drwxr-xr-x  25 dianfu  staff   800B Apr 25 09:26 bin
drwxr-xr-x  21 dianfu  staff   672B Apr 25 09:26 common
drwxr-xr-x  13 dianfu  staff   416B Apr 25 09:26 conf
drwxr-xr-x  20 dianfu  staff   640B Apr 25 09:26 datastream
drwxr-xr-x   4 dianfu  staff   128B Apr 25 09:26 examples
-rw-r--r--   1 dianfu  staff   3.2K Apr 25 09:26 find_flink_home.py
drwxr-xr-x  25 dianfu  staff   800B Apr 25 09:26 fn_execution
-rw-r--r--   1 dianfu  staff   9.1K Apr 25 09:26 gen_protos.py
-rw-r--r--   1 dianfu  staff   7.6K Apr 25 09:26 java_gateway.py
drwxr-xr-x  11 dianfu  staff   352B Apr 25 09:26 lib
drwxr-xr-x  28 dianfu  staff   896B Apr 25 09:26 licenses
drwxr-xr-x   4 dianfu  staff   128B Apr 25 09:26 log
drwxr-xr-x   5 dianfu  staff   160B Apr 25 09:26 metrics
drwxr-xr-x   4 dianfu  staff   128B Apr 25 09:26 opt
drwxr-xr-x  11 dianfu  staff   352B Apr 25 09:26 plugins
-rw-r--r--   1 dianfu  staff   1.3K Apr 25 09:26 pyflink_callback_server.py
-rw-r--r--   1 dianfu  staff12K Apr 25 09:26 pyflink_gateway_server.py
-rw-r--r--   1 dianfu  staff   5.3K Apr 25 09:26 serializers.py
-rw-r--r--   1 dianfu  staff   7.9K Apr 25 09:26 shell.py
drwxr-xr-x  31 dianfu  staff   992B Apr 25 09:26 table
drwxr-xr-x   6 dianfu  staff   192B Apr 25 09:26 util
-rw-r--r--   1 dianfu  staff   1.1K Apr 25 09:26 version.py
```
6) Execute command `python3 -m unittest 
test_table_api.TableTests.test_scalar_function`
The output is as follows, and you can see that it executes successfully:
```
(.venv) (base) dianfu@B-7174MD6R-1908 testing % python3 -m unittest 
test_table_api.TableTests.test_scalar_function
Using %s as FLINK_HOME... 
/Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/pyflink
Skipped download 
/Users/dianfu/code/src/github/pyflink-faq/testing/flink-python_2.11-1.14.4-tests.jar
 since it already exists.
/Users/dianfu/miniconda3/lib/python3.8/subprocess.py:946: ResourceWarning: 
subprocess 71018 is still running
  _warn("subprocess %s is still running" % self.pid,
ResourceWarning: Enable tracemalloc to get the object allocation traceback
Downloading jar org.apache.flink:flink-table-planner_2.11:1.14.4:jar:tests
/Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/pyflink/table/table_environment.py:538:
 DeprecationWarning: Deprecated in 1.10. Use create_table instead.
  warnings.warn("Deprecated in 1.10. Use create_table instead.", 
DeprecationWarning)
.
--
Ran 1 test in 32.746s

OK
```

I have also tried your commands and run into the same error. I believe the 
difference comes from `python setup.py install` vs `pip install 
apache-flink==1.14.4`. When installing with command `python setup.py install`, 
the structure of the installed package is a little different from `pip install 
apache-flink==1.14.4`. I will dig into this and share the results when I have 
some findings.

Before that, could you try to create a new clean virtual environment and see if 
the steps I'm following work for you?

Regards,
Dian

On Mon, Apr 25, 2022 at 6:04 AM John Tipper 
mailto:john_tip...@hotmail.com>> wrote:
And now when I add further dependencies to the classpath to remove all 
ClassNotFound exceptions, I get a different error which I don't understand 
(TypeError: Could not found the Java class 
'EnvironmentSettings.inStreamingMode'.), see the logs below:


$ python test_table_api.py TableTests.test_scalar_function

Using %s as FLINK_HOME... 
/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink

Skipped download 
/Users/john/PycharmProjects/pyflink-faq/testing/flink-python_2.11-1.14.4-tests.jar
 since it already exists.

Skipped download 
/Users/john/PycharmProjects/pyflink-faq/testing/flink-python_2.11-1.14.4.jar 
since it already 

Re: Jobmanager trying to be registered for Zombie Job

2022-04-25 Thread Peter Schrott
Hi Matthias,

You are welcome & thanks a lot for your help too!

It's not quite clear to me: was the bug already there since 1.13.6 but just
not reported until now (FLINK-27354 is a new ticket)?

Best, Peter


On Mon, Apr 25, 2022 at 5:48 PM Matthias Pohl 
wrote:

> Thanks again, Peter for sharing your logs. I looked into the issue with
> the help of Chesnay. Essentially, it's FLINK-27354 [1] that is causing this
> issue. We couldn't come up with a reason why it should have popped up just
> now with 1.15. The bug itself is already present in 1.14. You can find more
> details on the investigation in FLINK-27354 [1] itself.
>
> Best,
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-27354
>
> On Mon, Apr 25, 2022 at 2:00 PM Matthias Pohl 
> wrote:
>
>> Thanks Peter, we're looking into it...
>>
>> On Mon, Apr 25, 2022 at 11:54 AM Peter Schrott 
>> wrote:
>>
>>> Hi,
>>>
>>> sorry for the late reply. It took me quite some time to get the logs out
>>> of the system. I have attached them now.
>>>
>>> These are logs from 2 jobmanagers and 2 taskmanagers. It can be seen on jm 1
>>> that the job starts crashing and recovering a few times. This happens
>>> until 2022-04-20 12:12:14,607. After that the behavior described above can
>>> be seen.
>>>
>>> I hope this helps.
>>>
>>> Best, Peter
>>>
>>> On Fri, Apr 22, 2022 at 12:06 PM Matthias Pohl 
>>> wrote:
>>>
 FYI: I created FLINK-27354 [1] to cover the issue of retrying to
 connect to the RM while shutting down the JobMaster.

 This doesn't explain your issue though, Peter. It's still unclear why
 the JobMaster is still around as stated in my previous email.

 Matthias

 [1] https://issues.apache.org/jira/browse/FLINK-27354

 On Fri, Apr 22, 2022 at 11:54 AM Matthias Pohl 
 wrote:

> Just by looking through the code, it appears that these logs could be
> produced while stopping the job. The ResourceManager sends a confirmation
> of the JobMaster being disconnected at the end back to the JobMaster. If
> the JobMaster is still around to process the request, it would try to
> reconnect (I'd consider that a bug because the JobMaster is in shutdown
> mode already and wouldn't need to re-establish the connection). This 
> method
> would have been swallowed otherwise if the JobMaster was already 
> terminated.
>
> The only explanation I can come up with right now (without having any
> logs) is that stopping the JobMaster didn't finish for some reason. For
> that it would be helpful to look at the logs to see whether there is some
> other issue that causes the JobMaster to stop entirely.
>
> On Fri, Apr 22, 2022 at 10:14 AM Matthias Pohl 
> wrote:
>
>> ...if possible it would be good to get debug rather than only info
>> logs. Did you encounter anything odd in the TaskManager logs as well.
>> Sharing those might be of value as well.
>>
>> On Fri, Apr 22, 2022 at 8:57 AM Matthias Pohl 
>> wrote:
>>
>>> Hi Peter,
>>> thanks for sharing. That doesn't sound right. May you provide the
>>> entire jobmanager logs?
>>>
>>> Best,
>>> Matthias
>>>
>>> On Thu, Apr 21, 2022 at 6:08 PM Peter Schrott 
>>> wrote:
>>>
 Hi Flink-Users,

 I am not sure if this does something to my cluster or not. But
 since updating to Flink 1.15 (atm rc4) I find the following logs:

 INFO: Registering job manager ab7db9ff0ebd26b3b89c3e2e56684762
 @akka.tcp://
 fl...@flink-jobmanager-xxx.com:40015/user/rpc/jobmanager_2 for job
 5566648d9b1aac6c1a1b78187fd56975.

 as many times as number of parallelisms (here 10 times). These logs
 are triggered every 5 minutes.

 Then they are followed by:

 INFO: Registration of job manager ab7db9ff0ebd26b3b89c3e2e56684762
 @akka.tcp://
 fl...@flink-jobmanager-xxx.com:40015/user/rpc/jobmanager_2 failed.

 also 10 log entries.

 I followed the lifetime of the job (
 5566648d9b1aac6c1a1b78187fd56975), it was a long-running sql
 streaming job, started on Apr 13th on a standalone cluster. After some
 recovery attempts it finally failed (using the failover strategy) on 
 the
 20th Apr (yesterday) for good. Then those logs started to appear. Now 
 there
 was no other job running on my cluster anymore but the logs appeared 
 every
 5 minutes until I restarted this jobmanager service.

 This job was just an example, it happens to other jobs too.

 It's just INFO logs but it does not look healthy either.

 Thanks & Best
 Peter

>>>
>
> --
>
> Matthias Pohl | Engineer
>
> Follow us @VervericaData Ververica 
>
> --
>
> Join Flink Forward 

Re: Jobmanager trying to be registered for Zombie Job

2022-04-25 Thread Matthias Pohl
Thanks again, Peter for sharing your logs. I looked into the issue with the
help of Chesnay. Essentially, it's FLINK-27354 [1] that is causing this
issue. We couldn't come up with a reason why it should have popped up just
now with 1.15. The bug itself is already present in 1.14. You can find more
details on the investigation in FLINK-27354 [1] itself.

Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-27354

On Mon, Apr 25, 2022 at 2:00 PM Matthias Pohl 
wrote:

> Thanks Peter, we're looking into it...
>
> On Mon, Apr 25, 2022 at 11:54 AM Peter Schrott 
> wrote:
>
>> Hi,
>>
>> sorry for the late reply. It took me quite some time to get the logs out
>> of the system. I have attached them now.
>>
>> These are logs from 2 jobmanagers and 2 taskmanagers. It can be seen on jm 1
>> that the job starts crashing and recovering a few times. This happens
>> until 2022-04-20 12:12:14,607. After that the behavior described above can
>> be seen.
>>
>> I hope this helps.
>>
>> Best, Peter
>>
>> On Fri, Apr 22, 2022 at 12:06 PM Matthias Pohl 
>> wrote:
>>
>>> FYI: I created FLINK-27354 [1] to cover the issue of retrying to connect
>>> to the RM while shutting down the JobMaster.
>>>
>>> This doesn't explain your issue though, Peter. It's still unclear why
>>> the JobMaster is still around as stated in my previous email.
>>>
>>> Matthias
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-27354
>>>
>>> On Fri, Apr 22, 2022 at 11:54 AM Matthias Pohl 
>>> wrote:
>>>
 Just by looking through the code, it appears that these logs could be
 produced while stopping the job. The ResourceManager sends a confirmation
 of the JobMaster being disconnected at the end back to the JobMaster. If
 the JobMaster is still around to process the request, it would try to
 reconnect (I'd consider that a bug because the JobMaster is in shutdown
 mode already and wouldn't need to re-establish the connection). This method
 would have been swallowed otherwise if the JobMaster was already 
 terminated.

 The only explanation I can come up with right now (without having any
 logs) is that stopping the JobMaster didn't finish for some reason. For
 that it would be helpful to look at the logs to see whether there is some
 other issue that causes the JobMaster to stop entirely.

 On Fri, Apr 22, 2022 at 10:14 AM Matthias Pohl 
 wrote:

> ...if possible it would be good to get debug rather than only info
> logs. Did you encounter anything odd in the TaskManager logs as well?
> Sharing those might be of value too.
>
> On Fri, Apr 22, 2022 at 8:57 AM Matthias Pohl 
> wrote:
>
>> Hi Peter,
>> thanks for sharing. That doesn't sound right. May you provide the
>> entire jobmanager logs?
>>
>> Best,
>> Matthias
>>
>> On Thu, Apr 21, 2022 at 6:08 PM Peter Schrott 
>> wrote:
>>
>>> Hi Flink-Users,
>>>
>>> I am not sure if this does something to my cluster or not. But since
>>> updating to Flink 1.15 (atm rc4) I find the following logs:
>>>
>>> INFO: Registering job manager ab7db9ff0ebd26b3b89c3e2e56684762
>>> @akka.tcp://
>>> fl...@flink-jobmanager-xxx.com:40015/user/rpc/jobmanager_2 for job
>>> 5566648d9b1aac6c1a1b78187fd56975.
>>>
>>> as many times as number of parallelisms (here 10 times). These logs
>>> are triggered every 5 minutes.
>>>
>>> Then they are followed by:
>>>
>>> INFO: Registration of job manager ab7db9ff0ebd26b3b89c3e2e56684762
>>> @akka.tcp://
>>> fl...@flink-jobmanager-xxx.com:40015/user/rpc/jobmanager_2 failed.
>>>
>>> also 10 log entries.
>>>
>>> I followed the lifetime of the job (5566648d9b1aac6c1a1b78187fd56975),
>>> it was a long-running sql streaming job, started on Apr 13th on a
>>> standalone cluster. After some recovery attempts it finally failed 
>>> (using
>>> the failover strategy) on the 20th Apr (yesterday) for good. Then those
>>> logs started to appear. Now there was no other job running on my cluster
>>> anymore but the logs appeared every 5 minutes until I restarted this
>>> jobmanager service.
>>>
>>> This job was just an example, it happens to other jobs too.
>>>
>>> It's just INFO logs but it does not look healthy either.
>>>
>>> Thanks & Best
>>> Peter
>>>
>>

-- 

Matthias Pohl | Engineer

Follow us @VervericaData Ververica 

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
Wehner


Re: OOM errors caused by the new KafkaSink API

2022-04-25 Thread Hua Wei Chen
Hi Huweihua,

Thanks for the reply. Yes, we increased memory first.
But we are still curious about why memory usage increases with the new Kafka
APIs/serializers.


On Mon, Apr 25, 2022 at 8:38 PM huweihua  wrote:

> Hi,
>
> You can try to increase the memory of TaskManager.
> If there is persistent OOM, you can dump the memory and check which part
> is taking up memory.
>
>
> On Apr 25, 2022 at 11:44 AM, Hua Wei Chen  wrote:
>
> Hi all,
>
> Since FlinkKafkaConsumer and FlinkKafkaProducer will be deprecated in
> Flink 1.15*[1]*, we are trying to migrate to the KafkaSource and
> KafkaSink APIs*[2]*. At the same time, we also modified the serializers*[3]*.
> Our Kafka settings are unchanged*[4]*.
>
> The services were very stable before the migration. However, we get OOM
> errors*[5]* after migrating the APIs.
>
> Has anyone encountered the same issue? Or can anyone give us suggestions
> about the settings?
>
> Many Thanks!
>
> [1] Kafka | Apache Flink
> 
> [2] new Kafka APIs
> ```
>
> def getKafkaSource[T: TypeInformation](config: Config,
>topic: String,
>parallelism: Int,
>uid: String,
>env: StreamExecutionEnvironment,
>deserializer: 
> DeserializationSchema[T]): DataStream[T] = {
>   val properties = getKafkaCommonProperties(config)
>
>   properties.put(ConsumerConfig.GROUP_ID_CONFIG, 
> config.getString("kafka.group.id"))
>   properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
> config.getString("kafka.session.timeout.ms"))
>   properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 
> config.getString("kafka.receive.buffer.bytes"))
>
>   
> properties.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
>  "360")
>
>   val source = KafkaSource.builder[T]()
> .setProperties(properties)
> .setTopics(topic)
> .setValueOnlyDeserializer(deserializer)
> 
> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
> .build()
>
>   env
> .fromSource(source, WatermarkStrategy.noWatermarks[T], uid)
> .uid(uid)
> .setParallelism(math.min(parallelism, env.getParallelism))
> .setMaxParallelism(parallelism)
> }
>
> def getKafkaSink[T: TypeInformation](config: Config,
>  serializer: 
> KafkaRecordSerializationSchema[T]): KafkaSink[T] = {
>   val properties = getKafkaCommonProperties(config)
>
>   properties.put(ProducerConfig.LINGER_MS_CONFIG, 
> config.getString("kafka.linger.ms"))
>   properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 
> config.getString("kafka.batch.size"))
>   properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, 
> config.getString("kafka.compression.type"))
>
>   KafkaSink.builder[T]()
> .setKafkaProducerConfig(properties)
> .setBootstrapServers(config.getString("kafka.bootstrap.servers"))
> .setRecordSerializer(serializer)
> .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
> .build()
> }
>
> ```
> [3] New Serializer
>
> import java.lang
> import java.nio.charset.StandardCharsets
> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
> import org.apache.kafka.clients.producer.ProducerRecord
> import com.appier.rt.short_term_score.model.UserSTState
>
> class UserSTStateSerializer(topic: String) extends 
> KafkaRecordSerializationSchema[UserSTState] {
>   override def serialize(element: UserSTState, context: 
> KafkaRecordSerializationSchema.KafkaSinkContext, timestamp: lang.Long): 
> ProducerRecord[Array[Byte], Array[Byte]] = {
> new ProducerRecord(topic, 
> element.toString.getBytes(StandardCharsets.UTF_8))
>   }
> }
>
> [4] Kafka Settings
>
> # Common
> retries = "15"
> retry.backoff.ms = "500"
> reconnect.backoff.ms = "1000"
>
> # Producer
> linger.ms = "5"
> batch.size = "1048576"
> compression.type = "gzip"
>
> # Consumer
> group.id = ""
> session.timeout.ms = "10"
> receive.buffer.bytes = "8388608"
>
> [5] *Error Message*
> ```
> java.lang.OutOfMemoryError
>
>   at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
>   at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
>   at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown 
> Source)
>   at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
>   at 
> java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown 
> Source)
>   at 
> java.base/java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown
>  Source)
>   at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
>   at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
>   at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
>   at 

RE: AvroRowDeserializationSchema

2022-04-25 Thread lan tran
Hi Dian,

Thanks again for the fast response.

Following your suggestion above, we can set the UID only for the DataStream
part (by converting from Table to DataStream as you suggested).

However, in the first phase, which collects the data from Kafka (in Debezium
format), the UID cannot be set since we are using the Table API (which
auto-generates the UID).

Therefore, if there is a crash or we need to revert using a savepoint, we
cannot do so for the first phase since we cannot set its UID, so how can we
revert it?

As a result, we want to use DebeziumAvroRowDeserializationSchema and
DebeziumJsonRowDeserializationSchema in the DataStream job to be able to use
savepoints for the whole flow.

Best,
Quynh

Sent from Mail for Windows

From: Dian Fu
Sent: Monday, April 25, 2022 7:46 PM
To: lan tran
Cc: user@flink.apache.org
Subject: Re: AvroRowDeserializationSchema

Hi Quynh,

You could try the following code (also it may be a little hacky):
```
def set_uid_for_source(ds: DataStream, uid: str):
    transformation = ds._j_data_stream.getTransformation()

    source_transformation = transformation
    while not source_transformation.getInputs().isEmpty():
        source_transformation = source_transformation.getInputs().get(0)

    source_transformation.setUid(uid)
```

Besides, could you describe your use case a bit and also how you want to use
DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema
in the DataStream job? Note that for the sources with these formats, it will
send UPDATE messages to downstream operators.

Regards
Dian

On Mon, Apr 25, 2022 at 12:31 PM lan tran  wrote:

Yeah, I already tried that way. However, if we do not use DataStream from the
start, we cannot rely on savepoints: according to the docs, when using the
Table API (SQL API) the uid is generated automatically, which means we cannot
revert if the system crashes.

Best,
Quynh

Sent from Mail for Windows

From: Dian Fu
Sent: Monday, April 25, 2022 11:04 AM
To: lan tran
Cc: user@flink.apache.org
Subject: Re: AvroRowDeserializationSchema

DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema
are still not supported in the Python DataStream API.

Taking a further look at the Java implementations of
DebeziumAvroDeserializationSchema and DebeziumJsonDeserializationSchema, the
result type is RowData instead of Row, so it would not be that easy to support
them directly in the Python DataStream API. However, conversion between the
Table API and the DataStream API is supported[1]. Could you first create a
Table which consumes data from Kafka and then convert it to a DataStream?

Regards,
Dian

[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/intro_to_datastream_api/#create-using-table--sql-connectors

On Mon, Apr 25, 2022 at 11:48 AM Dian Fu  wrote:

Yes, we should support them.

For now, if you want to use them, you could create ones in your own project.
You could refer to AvroRowDeserializationSchema[1] as an example. It should
not be complicated as it's simply a wrapper of the Java implementation.

Regards,
Dian

[1] https://github.com/apache/flink/blob/e11a5c52c613e121f7a7868cbbfd9e7c21551394/flink-python/pyflink/common/serialization.py#L308

On Mon, Apr 25, 2022 at 11:27 AM lan tran  wrote:

Thanks Dian!! I really appreciate this.

However, I have another question related to this. In the current version, or
in any future update, does the DataStream API support
DebeziumAvroRowDeserializationSchema and DebeziumJsonRowDeserializationSchema
in PyFlink? Looking at the documentation, it seems they are not supported yet.

Best,
Quynh

Sent from Mail for Windows

From: Dian Fu
Sent: Friday, April 22, 2022 9:36 PM
To: lan tran
Cc: user@flink.apache.org
Subject: Re: AvroRowDeserializationSchema

Hi Quynh,

I have added an example on how to use AvroRowDeserializationSchema in the
Python DataStream API in [1]. Please take a look and see if that helps for you~

Regards,
Dian

[1] https://github.com/apache/flink/blob/release-1.15/flink-python/pyflink/examples/datastream/formats/avro_format.py

On Fri, Apr 22, 2022 at 7:24 PM Dian Fu  wrote:

Hi Quynh,

Could you show some sample code on how you use it?

Regards,
Dian

On Fri, Apr 22, 2022 at 1:42 PM lan tran  wrote:

Wondering if this is a bug or not, but if I use AvroRowDeserializationSchema,
the error still occurs in PyFlink:

py4j.protocol.Py4JError: An error occurred while calling
None.org.apache.flink.formats.avro.AvroRowDeserializationSchema. Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor
org.apache.flink.formats.avro.AvroRowDeserializationSchema([class
org.apache.avro.Schema$RecordSchema]) does not exist

Therefore, please help check. Thanks
Best,
Quynh

Sent from Mail for Windows

From: lan tran
Sent: Thursday, April 21, 2022 1:43 PM
To: user@flink.apache.org
Subject: AvroRowDeserializationSchema

Hi team,

I want to implement AvroRowDeserializationSchema when consuming data from
Kafka, however

Re: AvroRowDeserializationSchema

2022-04-25 Thread Dian Fu
Hi Quynh,

You could try the following code (also it may be a little hacky):
```
def set_uid_for_source(ds: DataStream, uid: str):
transformation = ds._j_data_stream.getTransformation()

source_transformation = transformation
while not source_transformation.getInputs().isEmpty():
source_transformation = source_transformation.getInputs().get(0)

source_transformation.setUid(uid)
```

Besides, could you describe your use case a bit and also how you want to
use DebeziumAvroRowDeserializationSchema and
DebeziumJsonRowDeserializationSchema in the DataStream job? Note that for
the sources with these formats, it will send UPDATE messages to downstream
operators.

Regards
Dian

On Mon, Apr 25, 2022 at 12:31 PM lan tran  wrote:

> Yeah, I already tried that way. However, if we do not use DataStream from
> the start, we cannot rely on savepoints: according to the docs, when using
> the Table API (SQL API) the uid is generated automatically, which means we
> cannot revert if the system crashes.
>
> Best,
> Quynh
>
>
>
> Sent from Mail  for
> Windows
>
>
>
> *From: *Dian Fu 
> *Sent: *Monday, April 25, 2022 11:04 AM
> *To: *lan tran 
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema are still not supported in
> Python DataStream API.
>
> Taking a further look at the Java implementations of
> DebeziumAvroDeserializationSchema and DebeziumJsonDeserializationSchema,
> the result type is RowData instead of Row, so it would not be that
> easy to support them directly in the Python DataStream API. However,
> conversion between the Table API and the DataStream API is supported[1]. Could you
> first create a Table which consumes data from Kafka and then convert it
> to a DataStream?
>
> Regards,
> Dian
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/datastream/intro_to_datastream_api/#create-using-table--sql-connectors
>
>
>
> On Mon, Apr 25, 2022 at 11:48 AM Dian Fu  wrote:
>
> Yes, we should support them.
>
> For now, if you want to use them, you could create ones in your own
> project. You could refer to AvroRowDeserializationSchema[1] as an example.
> It should not be complicated as it's simply a wrapper of the
> Java implementation.
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/e11a5c52c613e121f7a7868cbbfd9e7c21551394/flink-python/pyflink/common/serialization.py#L308
>
>
>
> On Mon, Apr 25, 2022 at 11:27 AM lan tran 
> wrote:
>
> Thanks Dian!! I really appreciate this.
>
> However, I have another question related to this. In the current version,
> or in any future update, does the DataStream API support
> DebeziumAvroRowDeserializationSchema and
> DebeziumJsonRowDeserializationSchema in PyFlink? Looking at the
> documentation, it seems they are not supported yet.
>
> Best,
> Quynh
>
> Sent from Mail  for
> Windows
>
>
>
> *From: *Dian Fu 
> *Sent: *Friday, April 22, 2022 9:36 PM
> *To: *lan tran 
> *Cc: *user@flink.apache.org
> *Subject: *Re: AvroRowDeserializationSchema
>
>
>
> Hi Quynh,
>
> I have added an example on how to use AvroRowDeserializationSchema in
> Python DataStream API in [1]. Please take a look and see if that helps for you~
>
> Regards,
> Dian
>
> [1]
> https://github.com/apache/flink/blob/release-1.15/flink-python/pyflink/examples/datastream/formats/avro_format.py
>
>
>
> On Fri, Apr 22, 2022 at 7:24 PM Dian Fu  wrote:
>
> Hi Quynh,
>
> Could you show some sample code on how you use it?
>
> Regards,
> Dian
>
>
>
> On Fri, Apr 22, 2022 at 1:42 PM lan tran  wrote:
>
> Wondering if this is a bug or not, but if I use
> *AvroRowDeserializationSchema*,
>
> the error still occurs in PyFlink:
>
> py4j.protocol.Py4JError: An error occurred while calling
> None.org.apache.flink.formats.avro.AvroRowDeserializationSchema. Trace:
>
> org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor
> org.apache.flink.formats.avro.AvroRowDeserializationSchema([class
> org.apache.avro.Schema$RecordSchema]) does not exist
>
> Therefore, please help check. Thanks
> Best,
> Quynh
>
>
>
>
>
> Sent from Mail  for
> Windows
>
>
>
> *From: *lan tran 
> *Sent: *Thursday, April 21, 2022 1:43 PM
> *To: *user@flink.apache.org
> *Subject: *AvroRowDeserializationSchema
>
>
>
> Hi team,
>
> I want to implement AvroRowDeserializationSchema when consuming data from
> Kafka. However, from the documentation I did not understand what
> avro_schema_string and record_class are. It would be great if you could
> give me an example of this (I only have the example in Java, whereas I am
> doing it using PyFlink).
>
> In my understanding, is avro_schema_string the schema_registry_url? Does it
> support
>  'debezium-avro-confluent.schema-registry.url'='{schema_registry_url}' like
> in the Table API?
>
> Best,
> Quynh.
>
> Sent from Mail 
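For what it's worth, a rough sketch of how AvroRowDeserializationSchema is typically wired up in PyFlink, based on the avro_schema_string parameter discussed above. The Avro schema, topic, and Kafka properties are placeholders; the import path is taken from the serialization.py module linked in [1] above and may differ between Flink versions (newer releases may expose it elsewhere). Note that avro_schema_string is a plain Avro schema string, not a schema-registry URL.

```
from pyflink.common.serialization import AvroRowDeserializationSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

# Plain Avro schema (JSON), passed as a string; placeholder record definition.
avro_schema_string = """
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}
"""

env = StreamExecutionEnvironment.get_execution_environment()
# The Kafka and Avro jars must be on the classpath, e.g. via env.add_jars(...).

deserialization_schema = AvroRowDeserializationSchema(
    avro_schema_string=avro_schema_string)

consumer = FlinkKafkaConsumer(
    topics='user-topic',                               # placeholder topic
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'demo'})

env.add_source(consumer).print()
env.execute('avro-row-deserialization-demo')
```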

Re: OOM errors caused by the new KafkaSink API

2022-04-25 Thread huweihua
Hi, 

You can try to increase the memory of TaskManager.
If there is persistent OOM, you can dump the memory and check which part is 
taking up memory.
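For example, a minimal sketch of both suggestions (values and paths are placeholders; adjust to your memory configuration model):

```
# flink-conf.yaml: raise overall TaskManager memory, or only the task heap.
taskmanager.memory.process.size: 4g
# taskmanager.memory.task.heap.size: 2g

# Heap dump of a running TaskManager JVM for offline analysis (JDK jmap):
# jmap -dump:live,format=b,file=/tmp/taskmanager-heap.hprof <taskmanager-pid>
```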


> On Apr 25, 2022 at 11:44 AM, Hua Wei Chen  wrote:
> 
> Hi all,
> 
> Since FlinkKafkaConsumer and FlinkKafkaProducer will be deprecated in Flink
> 1.15[1], we are trying to migrate to the KafkaSource and KafkaSink APIs[2].
> At the same time, we also modified the serializers[3]. Our Kafka settings are
> unchanged[4].
> 
> The services were very stable before the migration. However, we get OOM errors[5]
> after migrating the APIs.
> 
> Has anyone encountered the same issue? Or can anyone give us suggestions about
> the settings?
> 
> Many Thanks!
> 
> [1] Kafka | Apache Flink 
> 
> [2] new Kafka APIs
> ```
> def getKafkaSource[T: TypeInformation](config: Config,
>topic: String,
>parallelism: Int,
>uid: String,
>env: StreamExecutionEnvironment,
>deserializer: 
> DeserializationSchema[T]): DataStream[T] = {
>   val properties = getKafkaCommonProperties(config)
> 
>   properties.put(ConsumerConfig.GROUP_ID_CONFIG, 
> config.getString("kafka.group.id "))
>   properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
> config.getString("kafka.session.timeout.ms 
> "))
>   properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 
> config.getString("kafka.receive.buffer.bytes"))
> 
>   
> properties.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
>  "360")
> 
>   val source = KafkaSource.builder[T]()
> .setProperties(properties)
> .setTopics(topic)
> .setValueOnlyDeserializer(deserializer)
> 
> .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
> .build()
> 
>   env
> .fromSource(source, WatermarkStrategy.noWatermarks[T], uid)
> .uid(uid)
> .setParallelism(math.min(parallelism, env.getParallelism))
> .setMaxParallelism(parallelism)
> }
> 
> def getKafkaSink[T: TypeInformation](config: Config,
>  serializer: 
> KafkaRecordSerializationSchema[T]): KafkaSink[T] = {
>   val properties = getKafkaCommonProperties(config)
> 
>   properties.put(ProducerConfig.LINGER_MS_CONFIG, 
> config.getString("kafka.linger.ms"))
>   properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 
> config.getString("kafka.batch.size"))
>   properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, 
> config.getString("kafka.compression.type"))
> 
>   KafkaSink.builder[T]()
> .setKafkaProducerConfig(properties)
> .setBootstrapServers(config.getString("kafka.bootstrap.servers"))
> .setRecordSerializer(serializer)
> .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
> .build()
> }
> ```
> [3] New Serializer
> import java.lang
> import java.nio.charset.StandardCharsets
> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
> import org.apache.kafka.clients.producer.ProducerRecord
> import com.appier.rt.short_term_score.model.UserSTState
> 
> class UserSTStateSerializer(topic: String) extends 
> KafkaRecordSerializationSchema[UserSTState] {
>   override def serialize(element: UserSTState, context: 
> KafkaRecordSerializationSchema.KafkaSinkContext, timestamp: lang.Long): 
> ProducerRecord[Array[Byte], Array[Byte]] = {
> new ProducerRecord(topic, 
> element.toString.getBytes(StandardCharsets.UTF_8))
>   }
> }
> [4] Kafka Settings
> # Common
> retries = "15"
> retry.backoff.ms = "500"
> reconnect.backoff.ms = "1000"
> 
> # Producer
> linger.ms = "5"
> batch.size = "1048576"
> compression.type = "gzip"
> 
> # Consumer
> group.id = ""
> session.timeout.ms = "10"
> receive.buffer.bytes = "8388608"
> [5] Error Message
> ```
> java.lang.OutOfMemoryError
>   at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
>   at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
>   at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown 
> Source)
>   at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
>   at 
> java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown 
> Source)
>   at 
> java.base/java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown
>  Source)
>   at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
>   at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
>   at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
>   at 

Re: Installing the Flink Kubernetes Operator with helm fails

2022-04-25 Thread Biao Geng
Hi,
The error looks like cert-manager could not be found. Did you follow the QuickStart
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/
and first run kubectl create -f
https://github.com/jetstack/cert-manager/releases/download/v1.7.1/cert-manager.yaml
? Alternatively, if you do not need the webhook, you can run helm install flink-kubernetes-operator
flink-operator-repo/flink-kubernetes-operator --set webhook.create=false
to skip installing the webhook and its cert-manager dependency (see the command sketch below).
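
For reference, the two alternatives above as plain commands (the cert-manager version is the one the QuickStart pins; adjust as needed):

```
# Option 1: install cert-manager first, then the operator with the webhook enabled
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.7.1/cert-manager.yaml
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

# Option 2: skip the webhook (and its cert-manager dependency)
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
  --set webhook.create=false
```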

Best,
Biao Geng


陈卓宇 <2572805...@qq.com.invalid> wrote on Mon, Apr 25, 2022 at 19:46:

> Error:
> [streamx@localhost ~]$ helm install flink-kubernetes-operator
> flink-operator-repo/flink-kubernetes-operator
> WARNING: Kubernetes configuration file is group-readable. This is
> insecure. Location: /home/streamx/.kube/config
> WARNING: Kubernetes configuration file is world-readable. This is
> insecure. Location: /home/streamx/.kube/config
> Error: INSTALLATION FAILED: unable to build kubernetes objects from
> release manifest: [unable to recognize "": no matches for kind
> "Certificate" in version "cert-manager.io/v1", unable to recognize "": no
> matches for kind "Issuer" in version "cert-manager.io/v1"]
>
> Could anyone advise how to solve this?
>
> 陈卓宇
>
>
> 


Re: Jobmanager trying to be registered for Zombie Job

2022-04-25 Thread Matthias Pohl
Thanks Peter, we're looking into it...

On Mon, Apr 25, 2022 at 11:54 AM Peter Schrott 
wrote:

> Hi,
>
> sorry for the late reply. It took me quite some time to get the logs out
> of the system. I have attached them now.
>
> It's the logs of 2 jobmanagers and 2 taskmanagers. It can be seen on jm 1 that
> the job starts crashing and recovering a few times. This happens
> until 2022-04-20 12:12:14,607. After that, the behavior described above can
> be seen.
>
> I hope this helps.
>
> Best, Peter
>
> On Fri, Apr 22, 2022 at 12:06 PM Matthias Pohl 
> wrote:
>
>> FYI: I created FLINK-27354 [1] to cover the issue of retrying to connect
>> to the RM while shutting down the JobMaster.
>>
>> This doesn't explain your issue though, Peter. It's still unclear why the
>> JobMaster is still around as stated in my previous email.
>>
>> Matthias
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-27354
>>
>> On Fri, Apr 22, 2022 at 11:54 AM Matthias Pohl 
>> wrote:
>>
>>> Just by looking through the code, it appears that these logs could be
>>> produced while stopping the job. The ResourceManager sends a confirmation
>>> of the JobMaster being disconnected at the end back to the JobMaster. If
>>> the JobMaster is still around to process the request, it would try to
>>> reconnect (I'd consider that a bug because the JobMaster is in shutdown
>>> mode already and wouldn't need to re-establish the connection). This message
>>> would otherwise have been swallowed if the JobMaster was already terminated.
>>>
>>> The only explanation I can come up with right now (without having any
>>> logs) is that stopping the JobMaster didn't finish for some reason. For
>>> that it would be helpful to look at the logs to see whether there is some
>>> other issue that prevents the JobMaster from stopping entirely.
>>>
>>> On Fri, Apr 22, 2022 at 10:14 AM Matthias Pohl 
>>> wrote:
>>>
 ...if possible it would be good to get debug rather than only info
 logs. Did you encounter anything odd in the TaskManager logs as well?
 Sharing those might be of value too.
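
Debug logging is usually enabled by raising the root logger level in conf/log4j.properties on the JobManager and TaskManagers and restarting them; a minimal sketch, assuming the default log4j2 properties layout shipped with Flink:

```
# conf/log4j.properties sketch; only the level lines change
rootLogger.level = DEBUG
# or, to keep third-party libraries quieter, raise only Flink's own loggers:
# logger.flink.name = org.apache.flink
# logger.flink.level = DEBUG
```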

 On Fri, Apr 22, 2022 at 8:57 AM Matthias Pohl 
 wrote:

> Hi Peter,
> thanks for sharing. That doesn't sound right. Could you provide the
> entire jobmanager logs?
>
> Best,
> Matthias
>
> On Thu, Apr 21, 2022 at 6:08 PM Peter Schrott 
> wrote:
>
>> Hi Flink-Users,
>>
>> I am not sure if this does something to my cluster or not. But since
>> updating to Flink 1.15 (atm rc4) I find the following logs:
>>
>> INFO: Registering job manager ab7db9ff0ebd26b3b89c3e2e56684762
>> @akka.tcp://
>> fl...@flink-jobmanager-xxx.com:40015/user/rpc/jobmanager_2 for job
>> 5566648d9b1aac6c1a1b78187fd56975.
>>
>> as many times as the parallelism (here 10 times). These log entries
>> are triggered every 5 minutes.
>>
>> Then they are followed by:
>>
>> INFO: Registration of job manager ab7db9ff0ebd26b3b89c3e2e56684762
>> @akka.tcp://
>> fl...@flink-jobmanager-xxx.com:40015/user/rpc/jobmanager_2 failed.
>>
>> also 10 log entries.
>>
>> I followed the lifetime of the job (5566648d9b1aac6c1a1b78187fd56975):
>> it was a long-running SQL streaming job, started on Apr 13th on a
>> standalone cluster. After some recovery attempts it finally failed for good
>> (per the failover strategy) on Apr 20th (yesterday). Then those
>> logs started to appear. There was no other job running on my cluster
>> anymore, but the logs kept appearing every 5 minutes until I restarted this
>> jobmanager service.
>>
>> This job is just one example; it happens to other jobs too.
>>
>> These are just INFO logs, but it does not look healthy either.
>>
>> Thanks & Best
>> Peter
>>
>


Installing the Flink Kubernetes Operator with helm fails

2022-04-25 Thread 陈卓宇
Error:
[streamx@localhost ~]$ helm install flink-kubernetes-operator 
flink-operator-repo/flink-kubernetes-operator
WARNING: Kubernetes configuration file is group-readable. This is insecure. 
Location: /home/streamx/.kube/config
WARNING: Kubernetes configuration file is world-readable. This is insecure. 
Location: /home/streamx/.kube/config
Error: INSTALLATION FAILED: unable to build kubernetes objects from release 
manifest: [unable to recognize "": no matches for kind "Certificate" in version 
"cert-manager.io/v1", unable to recognize "": no matches for kind "Issuer" in 
version "cert-manager.io/v1"]



Could anyone advise how to solve this?

陈卓宇




Re: how to setup working dir in Flink operator

2022-04-25 Thread 陳昌倬
On Mon, Apr 25, 2022 at 05:15:58PM +0800, Yang Wang wrote:
> Using the pod template to configure the local SSD(via host-path or local
> PV) is the correct way.
> After that, either "java.io.tmpdir" or "process.taskmanager.working-dir" in
> CR should take effect.
> 
> Maybe you need to share the complete pod yaml and logs of failed
> TaskManager.

In our test results, `process.working-dir` and `process.taskmanager.working-dir`
seem to be ignored. Only `io.tmps.dir` changes where Flink stores
`flink-io-*`, `localState/`, etc.


The following is our test environment:

- configuration:
io.tmps.dir: /srv/working-dir
  result:
flink-io-*, localState/ are in /srv/working-dir

- configuration:
process.working-dir: /srv/working-dir
  result:
flink-io-*, localState/ are in /tmp

- configuration:
process.taskmanager.working-dir: /srv/working-dir
  result:
flink-io-*, localState/ are in /tmp

all other configuration are the same.


> nit: if the TaskManager pod crashed and was deleted too fast, you could
> kill the JobManager first, then you will have enough time to get the logs
> and yamls.

Thanks for the tip.


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B


signature.asc
Description: PGP signature


Managing third-party jars for Flink jobs on k8s

2022-04-25 Thread 天道酬勤
My Flink is deployed in kubernetes session mode ... jar ... flink/lib ... jar ... flink/lib ... Thanks!

Re: Managing third-party jars for Flink jobs on k8s

2022-04-25 Thread Yang Wang
* If you submit jobs with the flink run command to a running Session cluster, only local jars can be used

* You can also submit via the REST API: first upload the jar to the JobManager [1], then run the uploaded jar [2] (see the curl sketch below)

* Finally, you could try the flink-kubernetes-operator project; its Session job currently supports remote jars [3], and the project is still being improved

[1].
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/rest_api/#jars-upload
[2].
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/rest_api/#jars-jarid-run
[3].
https://github.com/apache/flink-kubernetes-operator/blob/main/e2e-tests/data/sessionjob-cr.yaml
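
As an illustration of the REST-based option [1][2] above, roughly (host, port, jar name, and entry class are placeholders):

```
# Upload a local jar to the JobManager; the response contains the server-side jar id
curl -X POST -F "jarfile=@/path/to/my-job.jar" http://<jobmanager-host>:8081/jars/upload

# Run the uploaded jar, replacing <jar-id> with the id returned by the upload call
curl -X POST http://<jobmanager-host>:8081/jars/<jar-id>/run \
  -H "Content-Type: application/json" \
  -d '{"entryClass": "com.example.MyJob", "parallelism": 2}'
```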

Best,
Yang

天道酬勤 <1262420...@qq.com.invalid> wrote on Mon, Apr 25, 2022 at 16:51:

> My Flink is deployed as a kubernetes session cluster. When submitting a job, I would like
> to be able to dynamically specify third-party jars to run my own job, but I have not found
> a usable configuration option in the official docs. I hope you can give me some suggestions!


Re: Managing third-party jars for Flink jobs on k8s

2022-04-25 Thread zns
https://blog.csdn.net/ifenggege/article/details/113731793

This one is about YARN, for reference.

> On Apr 25, 2022, at 16:50, 天道酬勤 <1262420...@qq.com.INVALID> wrote:
> 
> My Flink is deployed as a kubernetes session cluster. When submitting a job, I would like
> to be able to dynamically specify third-party jars to run my own job, but I have not found
> a usable configuration option in the official docs. I hope you can give me some suggestions!



Re: how to setup working dir in Flink operator

2022-04-25 Thread Yang Wang
Using the pod template to configure the local SSD(via host-path or local
PV) is the correct way.
After that, either "java.io.tmpdir" or "process.taskmanager.working-dir" in
CR should take effect.
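
A minimal sketch of what that could look like in the FlinkDeployment CR; the API version matches current operator releases, while the volume type, paths and names are assumptions to adapt to your cluster:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-deployment                  # placeholder
spec:
  flinkConfiguration:
    io.tmp.dirs: /srv/working-dir      # point Flink's local working data at the mount
  taskManager:
    podTemplate:
      apiVersion: v1
      kind: Pod
      spec:
        containers:
          - name: flink-main-container # the operator merges its settings into this container
            volumeMounts:
              - name: local-ssd
                mountPath: /srv/working-dir
        volumes:
          - name: local-ssd
            hostPath:
              path: /mnt/disks/ssd0    # placeholder for the node-local SSD path
              type: Directory
```

Other required fields of the CR (image, flinkVersion, resources, and so on) are omitted for brevity.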

Maybe you need to share the complete pod yaml and logs of failed
TaskManager.

nit: if the TaskManager pod crashed and was deleted too fast, you could
kill the JobManager first, then you will have enough time to get the logs
and yamls.

Best,
Yang

ChangZhuo Chen (陳昌倬)  wrote on Mon, Apr 25, 2022 at 10:19:

> Hi,
>
> We try to migrate our application from `Flink on standalone Kubernetes`
> to `Application mode on Flink operator`. However, we cannot configure it to
> use the local SSD for RocksDB state successfully. Any thoughts?
>
>
> Detail:
>
> In original `Flink on standalone Kubernetes`:
> - set `io.tmp.dirs` to local SSD and Flink uses local SSD for its data.
>
> In new `Application mode on Flink operator`:
> - set `io.tmp.dirs` to the local SSD causes a taskmanager crashloop. We are
>   still trying to get the exact error message since it disappears very
>   fast.
> - set `workingDir` in pod template does not work. Flink still uses /tmp
>   to store its data.
> - set `process.taskmanager.working-dir` does not work. Flink still uses
>   /tmp to store its data.
>
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
>


How to debug the Java code generated from Flink SQL

2022-04-25 Thread zhiyezou

How can I debug it in Idea?

Re: FlinkSQL job submission on k8s

2022-04-25 Thread ??????
You could take a look at dlink for submitting FlinkSQL:
https://github.com/DataLinkDC/dlink


------------------ Original Message ------------------
From: "user-zh"

https://issues.apache.org/jira/browse/FLINK-26541

吕宴全 <1365976...@qq.com.invalid> wrote on Apr 24, 2022 at 14:45:

I plan to use Kyuubi with FlinkSQL and run the jobs in a k8s environment; the jobs may
include batch jobs (Application mode) and ad-hoc queries (Session mode). In Application
mode, the jobgraph is built from a jar. Example SQL:

CREATE TABLE T (
id INT
) WITH (
'connector.type' = 'filesystem',
'connector.path' = 'file:///tmp/tmp.csv',
'format.type' = 'csv',
'format.derive-schema' = 'true'
);

insert into T values(1);

I would like the insert statement to be executed in Application mode, but I cannot just
pass this single SQL statement as a parameter, because it may lack context information
(catalog). So I would have to record the previous SQL statements and pass them along with
every job as parameters, which is cumbersome. On the other hand, the table Executor can
actually build the jobgraph, so submitting the jobgraph as a parameter would avoid passing
the context SQL; however, there is no per-job mode implementation on k8s anymore.

I am confused about how to submit batch jobs in this scenario. I hope you can give me some
advice, or is there a problem with my understanding?


 


Managing third-party jars for Flink jobs on k8s

2022-04-25 Thread 天道酬勤
My Flink is deployed as a kubernetes session cluster. When submitting a job, I would like to be able to
dynamically specify third-party jars to run my own job, but I have not found a usable configuration option
in the official docs. I hope you can give me some suggestions!

Re: FlinkSQL job submission on k8s

2022-04-25 Thread Yang Wang
Application mode currently cannot run an already generated JobGraph. A workaround I can
think of is to write a user jar that submits the JobGraph directly to the local cluster.

Like the following:

public class JobGraphRunner {

    private static final Logger LOG =
            LoggerFactory.getLogger(JobGraphRunner.class);

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);

        final String restServerAddress = "http://localhost:8081";
        LOG.info("Creating RestClusterClient({})", restServerAddress);

        Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
        try (ClusterClient<String> clusterClient =
                new RestClusterClient<>(
                        flinkConfig,
                        flinkConfig.toMap().get("kubernetes.cluster-id"),
                        (c, e) -> new StandaloneClientHAServices(restServerAddress))) {
            final String jobGraphPath = params.get("jobgraph");
            Preconditions.checkNotNull(jobGraphPath, "--jobgraph should be configured.");

            LOG.info("Loading jobgraph from {}", jobGraphPath);
            FileInputStream fileInputStream = new FileInputStream(jobGraphPath);
            ObjectInputStream objectInputStream = new ObjectInputStream(fileInputStream);
            JobGraph jobGraph = (JobGraph) objectInputStream.readObject();
            objectInputStream.close();

            final JobID jobID = clusterClient.submitJob(jobGraph).get();
            LOG.info("Job {} is submitted successfully", jobID);
        }
    }
}
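
A rough sketch of how such a runner jar could then be started in Application mode on Kubernetes; the cluster id, image and paths are placeholders:

```
# Package JobGraphRunner into a user jar, bake it and the serialized JobGraph into the
# image (or mount them), then launch it in Application mode.
./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-flink-cluster \
    -Dkubernetes.container.image=my-registry/jobgraph-runner:latest \
    local:///opt/flink/usrlib/jobgraph-runner.jar \
    --jobgraph /opt/flink/usrlib/job.graph
```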


Best,
Yang

吕宴全 <1365976...@qq.com.invalid> wrote on Sun, Apr 24, 2022 at 14:45:

> I plan to use Kyuubi with FlinkSQL and run the jobs in a k8s environment; the jobs may include batch jobs (Application
> mode) and ad-hoc queries (Session mode). In Application mode, the jobgraph is built from a jar. Example SQL:
>
>
> CREATE TABLE T (
> id INT
> ) WITH (
> 'connector.type' = 'filesystem',
> 'connector.path' = 'file:///tmp/tmp.csv',
> 'format.type' = 'csv',
> 'format.derive-schema' = 'true'
> );
>
>
>
> insert into T values(1);
>
>
>
>
> I would like the insert statement to be executed in Application mode, but I cannot just pass this single SQL statement as a parameter,
> because it may lack context information (catalog). So I would have to record the previous SQL statements and pass them along with every job as parameters, which is cumbersome.
> On the other hand, the table Executor can actually build the jobgraph, so submitting the jobgraph as a parameter would avoid passing the context SQL; however, there is no
> per-job mode implementation on k8s anymore.
>
>
> I am confused about how to submit batch jobs in this scenario. I hope you can give me some advice, or is there a problem with my understanding?


Re: FlinkSQL job submission on k8s

2022-04-25 Thread LuNing Wang
The SQL Client's Application mode is not supported yet; the design is in progress.
https://issues.apache.org/jira/browse/FLINK-26541

吕宴全 <1365976...@qq.com.invalid> wrote on Sun, Apr 24, 2022 at 14:45:

> I plan to use Kyuubi with FlinkSQL and run the jobs in a k8s environment; the jobs may include batch jobs (Application
> mode) and ad-hoc queries (Session mode). In Application mode, the jobgraph is built from a jar. Example SQL:
>
>
> CREATE TABLE T (
> id INT
> ) WITH (
> 'connector.type' = 'filesystem',
> 'connector.path' = 'file:///tmp/tmp.csv',
> 'format.type' = 'csv',
> 'format.derive-schema' = 'true'
> );
>
>
>
> insert into T values(1);
>
>
>
>
> I would like the insert statement to be executed in Application mode, but I cannot just pass this single SQL statement as a parameter,
> because it may lack context information (catalog). So I would have to record the previous SQL statements and pass them along with every job as parameters, which is cumbersome.
> On the other hand, the table Executor can actually build the jobgraph, so submitting the jobgraph as a parameter would avoid passing the context SQL; however, there is no
> per-job mode implementation on k8s anymore.
>
>
> I am confused about how to submit batch jobs in this scenario. I hope you can give me some advice, or is there a problem with my understanding?


Re: Write to Aliyun OSS via FileSystem connector hang Job Master on Finishing

2022-04-25 Thread Yi Tang
Thanks Guowei.

Good to know it may have been fixed in another way. I'll have a try.
BTW, which version is this kind of sink supported from? Does it also work
for batch mode?



On Mon, Apr 25, 2022 at 2:42 PM Guowei Ma  wrote:

> Hi
>
> Afaik, the file commit action happens in the committer operator instead of
> on the JM side after the new sink API [1].
>
> It means this would not happen if you use the new `FileSink` [2].
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
>
> [2]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
>
> Best,
> Guowei
>
>
> On Sun, Apr 24, 2022 at 11:39 AM Yi Tang  wrote:
>
>>
>>
>> -- Forwarded message -
>> From: Yi Tang 
>> Date: Sun, Apr 24, 2022 at 11:29 AM
>> Subject: Write to Aliyun OSS via FileSystem connector hang Job Master on
>> Finishing
>> To: 
>>
>>
>> Hi team;
>>
>> I'm trying to write to aliyun oss via FileSystem connector. The job
>> master always hangs on finishing.
>>
>> Looks like it is because the FileSystem connector commits the files by
>> #finalizeGlobal while the Job is finishing, which includes some rename
>> operations. However, the aliyun oss FileSystem renames files by copying,
>> which seems expensive.
>>
>> Any suggestions about this scenario?
>>
>> Thanks and regards.
>>
>>


Re: Write to Aliyun OSS via FileSystem connector hang Job Master on Finishing

2022-04-25 Thread Guowei Ma
Hi

Afaik, the file commit action happens in the committer operator instead of
on the JM side after the new sink API [1].

It means this would not happen if you use the new `FileSink` [2].

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API

[2]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java

Best,
Guowei


On Sun, Apr 24, 2022 at 11:39 AM Yi Tang  wrote:

>
>
> -- Forwarded message -
> From: Yi Tang 
> Date: Sun, Apr 24, 2022 at 11:29 AM
> Subject: Write to Aliyun OSS via FileSystem connector hang Job Master on
> Finishing
> To: 
>
>
> Hi team;
>
> I'm trying to write to aliyun oss via FileSystem connector. The job master
> always hangs on finishing.
>
> Looks like it is because the FileSystem connector commits the files by
> #finalizeGlobal while the Job is finishing, which includes some rename
> operations. However, the aliyun oss FileSystem renames files by copying,
> which seems expensive.
>
> Any suggestions about this scenario?
>
> Thanks and regards.
>
>


Re: [State Processor API] unable to load the state of a trigger attached to a session window

2022-04-25 Thread Dongwon Kim
Can anyone help me with this?

Thanks in advance,

On Tue, Apr 19, 2022 at 4:28 PM Dongwon Kim  wrote:

> Hi,
>
> I'm using Flink 1.14.4 and failed to load, in a WindowReaderFunction, the
> state of a stateful trigger attached to a session window.
> I found that the following data become available in WindowReaderFunction:
> - the state defined in the ProcessWindowFunction
> - the registered timers of the stateful trigger attached to the session
> window
> - all the elements of the window
> , but the state of the stateful trigger attached to the session window is
> not available when using State Processor API.
> In other words, the following code always returns null when used with
> session windows:
>
>> ReducingState state =
>> context.triggerState(triggerCountDesc);
>> Long val = state.get();
>>
> On the other hand, the above code snippet returns expected data when used
> with sliding and tumbling windows.
>
> To explain the problem, I made up an example in a similar spirit to
> o.a.f.state.api.SavepointWindowReaderITCase.
> Here you can find three test cases each with different types of event-time
> windows: Session, Sliding, and Tumbling.
> With sliding and tumbling windows, I can read the state of the trigger
> attached to the windows in WindowReaderFunction.
> However, with a session window, I cannot read the state of the trigger in
> WindowReaderFunction.
>
> Is it a bug, or did I miss something?
>
> Best,
>
> Dongwon
>
>