Re: Handling non-transient exceptions

2022-04-19 Thread Guowei Ma
Hi Jose

In some scenarios I think it would make sense to optionally allow the job
to keep going even if there are some exceptions.

But IMHO that scenario is more likely to come up while debugging than in
production. And in my own limited experience most users can actually handle
this themselves, i.e. by dealing with the exception inside their `MapFunction`,
since only the developer can decide which default "value(s)" to give to the system.
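For example, something as simple as this (just a rough sketch; the fallback
value is whatever makes sense for the specific job):

// import org.apache.flink.api.common.functions.MapFunction;
MapFunction<String, Long> safeParse = value -> {
    try {
        return Long.parseLong(value);
    } catch (NumberFormatException e) {
        return -1L; // the developer-chosen default "value"
    }
};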


So I would like to leave it outside of Flink. But please correct me if I
missed something.

Best,
Guowei


On Mon, Apr 18, 2022 at 5:54 PM Jose Brandao  wrote:

> Hello,
>
>
>
> Thank you for your answer. Yes, we are using the DataStream API.
>
>
> I agree that exceptions are developer’s responsibility but errors can
> still happen and I would like to have a progressive approach in case they
> happen instead of a blocking one.
>
>
>
> I will take a look at your suggestion. Wouldn't it make sense to
> optionally allow moving on to the next message when an unpredicted
> exception happens, instead of only killing the tasks and waiting for a restart?
> I know that in some cases those exceptions might cause irreparable damage
> to applications, but it could be configured per exception.
>
>
>
>
>
> Regards,
>
> José Brandão
>
>
>
> *From: *Guowei Ma 
> *Date: *Friday, 15 April 2022 at 11:04
> *To: *Jose Brandao 
> *Cc: *user@flink.apache.org 
> *Subject: *Re: Handling non-transient exceptions
>
>
>
>
>
>
>
>
>
>
>
>
> Hi, Jose
>
>
>
> I assume you are using the DataStream API.
>
>
>
> In general, for any UDF exception in a DataStream job, only the
> developer of the DataStream job knows whether the exception can be
> tolerated, because in some cases tolerating exceptions can cause errors in
> the final result. So you still have to handle each UDF exception yourself.
>
>
>
> However, there are indeed some points that can be optimized:
>
> 1. If you do have a lot of DataStream jobs, you can use some Java lambda
> tricks to simplify these things, which may make the whole process easier.
> For example, you can define a
> `sideOutputTheElementCausedTheException(processFunctionX, ...other
> parameters)` helper; once processFunctionX throws any exception, you output
> the offending element to a side output (see the sketch after point 2).
>
> 2. As for the differences in the types you mentioned, I tend to normalize
> them all into JSON or use the Avro format.
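> A rough sketch of such a helper (names and signatures are only illustrative;
> depending on the types you may also need an explicit returns(...) on the result):
>
> // needs: MapFunction, ProcessFunction, DataStream, SingleOutputStreamOperator, OutputTag, Collector
> public static <IN, OUT> SingleOutputStreamOperator<OUT> sideOutputTheElementCausedTheException(
>         DataStream<IN> input, MapFunction<IN, OUT> logic, OutputTag<IN> deadLetterTag) {
>     return input.process(new ProcessFunction<IN, OUT>() {
>         @Override
>         public void processElement(IN value, Context ctx, Collector<OUT> out) {
>             try {
>                 out.collect(logic.map(value));
>             } catch (Exception e) {
>                 // route the offending input element to a side output instead of failing the task
>                 ctx.output(deadLetterTag, value);
>             }
>         }
>     });
> }
>
> // later: result.getSideOutput(deadLetterTag) can be sunk to a dead-letter Kafka topic.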
>
>
>
> But I think it is not easy to replay all the exception elements. It is
> only easy to do the replay with the source elements.
>
>
> Best,
>
> Guowei
>
>
>
>
>
> On Fri, Apr 15, 2022 at 12:33 AM Jose Brandao 
> wrote:
>
> Hello,
>
> Searching some expertise on exception handling with checkpointing and
> streaming.  Let’s say some bad data flows into your Flink application and
> causes an exception you are not expecting. That exception will bubble up,
> ending up in killing the respective task and the app will not be able to
> progress. Eventually the topology will restart (if configured so) from the
> previous successful checkpoint/savepoint and will hit that broken message
> again, resulting in a loop.
>
>
>
> If we don’t know how to process a given message we would like our topology
> to progress and sink that message into some sort of dead-letter kafka
> topic.
>
>
>
> We have seen some recommendations on using Side Outputs for that, but it
> looks like things can easily get messy with that approach. We would need
> to extend all our operators with try-catch blocks and side output messages
> within the catch. Then we would need to aggregate all those side outputs
> and decide what to do with them. If we want to output exactly the inbound
> message that originated the exception it requires some extra logic as well
> since our operators have different output types. On top of that it looks
> like the type of operators which allow side outputs is limited.
> https://stackoverflow.com/questions/52411705/flink-whats-the-best-way-to-handle-exceptions-inside-flink-jobs
>
>
>
> Wondering if there is a better way to do it? We would like to avoid our
> topology being *stuck* because one message triggers some unpredicted
> exception, and we would also like to have the possibility to *replay* it
> once we put a fix in place, hence the dead-letter topic idea.
>
>
>
> Regards,
>
> José Brandão
>
>
>
>
>
>
>
>


Re: How to debug Metaspace exception?

2022-04-19 Thread John Smith
Ok, so I loaded the dump into Eclipse Mat and followed:
https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks

- On the Histogram, I got over 30 entries for: ChildFirstClassLoader
- Then I clicked on one of them "Merge Shortest Path..." and picked
"Exclude all phantom/weak/soft references"
- Which then gave me: SqlDriverManager > Apache Ignite JdbcThin Driver

So I'm guessing it's anything JDBC based. Should I copy the JDBC drivers into
the task managers' lib folder and make them provided (compile-only) dependencies
in my jobs?
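Another mitigation that might be worth trying (a rough, untested sketch): when a
job-scoped function closes, deregister any JDBC drivers that were registered by the
job's classloader, so that java.sql.DriverManager does not keep the
ChildFirstClassLoader reachable:

import java.sql.Driver;
import java.sql.DriverManager;
import java.util.Enumeration;

// e.g. call from a RichFunction#close(): deregisterJobDrivers(getClass().getClassLoader());
public static void deregisterJobDrivers(ClassLoader jobClassLoader) {
    Enumeration<Driver> drivers = DriverManager.getDrivers();
    while (drivers.hasMoreElements()) {
        Driver driver = drivers.nextElement();
        if (driver.getClass().getClassLoader() == jobClassLoader) {
            try {
                DriverManager.deregisterDriver(driver);
            } catch (Exception e) {
                // best effort; log and move on
            }
        }
    }
}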

On Tue, Apr 19, 2022 at 12:18 PM Yaroslav Tkachenko 
wrote:

> Also https://shopify.engineering/optimizing-apache-flink-applications-tips
> might be helpful (has a section on profiling, as well as classloading).
>
> On Tue, Apr 19, 2022 at 4:35 AM Chesnay Schepler 
> wrote:
>
>> We have a very rough "guide" in the wiki (it's just the specific steps I
>> took to debug another leak):
>>
>> https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks
>>
>> On 19/04/2022 12:01, huweihua wrote:
>>
>> Hi, John
>>
>> Sorry for the late reply. You can use MAT[1] to analyze the dump file.
>> Check whether have too many loaded classes.
>>
>> [1] https://www.eclipse.org/mat/
>>
>> On Apr 18, 2022, at 9:55 PM, John Smith wrote:
>>
>> Hi, can anyone help with this? I never looked at a dump file before.
>>
>> On Thu, Apr 14, 2022 at 11:59 AM John Smith 
>> wrote:
>>
>>> Hi, so I have a dump file. What do I look for?
>>>
>>> On Thu, Mar 31, 2022 at 3:28 PM John Smith 
>>> wrote:
>>>
 Ok so if there's a leak, if I manually stop the job and restart it from
 the UI multiple times, I won't see the issue because the classes
 are unloaded correctly?


 On Thu, Mar 31, 2022 at 9:20 AM huweihua 
 wrote:

>
> The difference is that manually canceling the job stops the JobMaster,
> but automatic failover keeps the JobMaster running. But looking on
> TaskManager, it doesn't make much difference
>
>
> On Mar 31, 2022, at 4:01 AM, John Smith wrote:
>
> Also if I manually cancel and restart the same job over and over is it
> the same as if flink was restarting a job due to failure?
>
> I.e: When I click "Cancel Job" on the UI is the job completely
> unloaded vs when the job scheduler restarts a job because if whatever
> reason?
>
> Lile this I'll stop and restart the job a few times or maybe I can
> trick my job to fail and have the scheduler restart it. Ok let me think
> about this...
>
> On Wed, Mar 30, 2022 at 10:24 AM 胡伟华  wrote:
>
>> So if I run the same jobs in my dev env will I still be able to see
>> the similar dump?
>>
>> I think running the same job in dev should be reproducible, maybe you
>> can have a try.
>>
>>  If not I would have to wait at a low volume time to do it on
>> production. Aldo if I recall the dump is as big as the JVM memory right 
>> so
>> if I have 10GB configed for the JVM the dump will be 10GB file?
>>
>> Yes, JMAP will pause the JVM, the time of pause depends on the size
>> to dump. you can use "jmap -dump:live" to dump only the reachable 
>> objects,
>> this will take a brief pause
>>
>>
>>
>> On Mar 30, 2022, at 9:47 PM, John Smith wrote:
>>
>> I have 3 task managers (see config below). There is total of 10 jobs
>> with 25 slots being used.
>> The jobs are 100% ETL I.e; They load Json, transform it and push it
>> to JDBC, only 1 job of the 10 is pushing to Apache Ignite cluster.
>>
>> FOR JMAP. I know that it will pause the task manager. So if I run the
>> same jobs in my dev env will I still be able to see the similar dump? I I
>> assume so. If not I would have to wait at a low volume time to do it on
>> production. Aldo if I recall the dump is as big as the JVM memory right 
>> so
>> if I have 10GB configed for the JVM the dump will be 10GB file?
>>
>>
>> # Operating system has 16GB total.
>> env.ssh.opts: -l flink -oStrictHostKeyChecking=no
>>
>> cluster.evenly-spread-out-slots: true
>>
>> taskmanager.memory.flink.size: 10240m
>> taskmanager.memory.jvm-metaspace.size: 2048m
>> taskmanager.numberOfTaskSlots: 16
>> parallelism.default: 1
>>
>> high-availability: zookeeper
>> high-availability.storageDir: file:///mnt/flink/ha/flink_1_14/
>> high-availability.zookeeper.quorum: ...
>> high-availability.zookeeper.path.root: /flink_1_14
>> high-availability.cluster-id: /flink_1_14_cluster_0001
>>
>> web.upload.dir: /mnt/flink/uploads/flink_1_14
>>
>> state.backend: rocksdb
>> state.backend.incremental: true
>> state.checkpoints.dir: file:///mnt/flink/checkpoints/flink_1_14
>> state.savepoints.dir: file:///mnt/flink/savepoints/flink_1_14
>>
>> On Wed, Mar 30, 2022 at 2:16 AM 胡伟华  wrote:
>>
>>> Hi, John
>>>
>>> Could you tell us you application 

Re: TumblingEventTimeWindows is not recognised by Flink-(version 1.14.4)

2022-04-19 Thread Dian Fu
Hi Harshit,

I guess you are following the reference code of the master branch instead of
Flink 1.14, which is the version you are using.

TumblingEventTimeWindows is introduced in Flink 1.16, which is still not
released. However, it could be seen as a utility class, so I think you
could just copy it into your own project if you need to use it with Flink
1.14. You can refer to the PR [1] or just the code in master [2] for the
implementation of TumblingEventTimeWindows.

Regards,
Dian

[1]
https://github.com/apache/flink/pull/18957/files#diff-b20bd7951f36c43ecab0f8b38e366c9f9fe371f3e0ca7011c5172db1cfe87061
[2]
https://github.com/apache/flink/blob/6253cb143da0adb13581ed5bd1d3edce483eb8b3/flink-python/pyflink/datastream/window.py#L1129



On Tue, Apr 19, 2022 at 9:22 PM harshit.varsh...@iktara.ai <
harshit.varsh...@iktara.ai> wrote:

> Dear Team,
>
>
>
> I am new to PyFlink and request your support with an issue I am facing with
> PyFlink. I am using PyFlink version 1.14.4 and reference code from the
> PyFlink getting started pages.
>
>
>
> I am getting following error .
>
> ImportError: cannot import name 'TumblingEventTimeWindows' from
> 'pyflink.datastream.window'
> (C:\Users\Admin\PycharmProjects\pythonProject8\venv\lib\site-packages\pyflink\datastream\window.py)
>
>
>
>
>
> Below is my code for reference..
>
>
>
> import sys
>
>
>
> import argparse
>
> from typing import Iterable
>
>
>
> from pyflink.datastream.connectors import FileSink, OutputFileConfig,
> RollingPolicy
>
>
>
> from pyflink.common import Types, WatermarkStrategy, Time, Encoder
>
> from pyflink.common.watermark_strategy import TimestampAssigner
>
> from pyflink.datastream import StreamExecutionEnvironment,
> ProcessWindowFunction
>
> from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow
>
>
>
>
>
> class MyTimestampAssigner(TimestampAssigner):
>
> def extract_timestamp(self, value, record_timestamp) -> int:
>
> return int(value[1])
>
>
>
>
>
> class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str,
> TimeWindow]):
>
> def process(self,
>
> key: str,
>
> context: ProcessWindowFunction.Context[TimeWindow],
>
> elements: Iterable[tuple]) -> Iterable[tuple]:
>
> return [(key, context.window().start, context.window().end, len([e
> for e in elements]))]
>
>
>
> def clear(self, context: ProcessWindowFunction.Context) -> None:
>
> pass
>
>
>
>
>
> if __name__ == '__main__':
>
> parser = argparse.ArgumentParser()
>
> parser.add_argument(
>
> '--output',
>
> dest='output',
>
> required=False,
>
> help='Output file to write results to.')
>
>
>
> argv = sys.argv[1:]
>
> known_args, _ = parser.parse_known_args(argv)
>
> output_path = known_args.output
>
>
>
> env = StreamExecutionEnvironment.get_execution_environment()
>
> # write all the data to one file
>
> env.set_parallelism(1)
>
>
>
> # define the source
>
> data_stream = env.from_collection([
>
> ('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 5), ('hi', 8),
> ('hi', 9), ('hi', 15)],
>
> type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
>
>
>
> # define the watermark strategy
>
> watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
>
> .with_timestamp_assigner(MyTimestampAssigner())
>
>
>
> ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
>
> .key_by(lambda x: x[0], key_type=Types.STRING()) \
>
> .window(TumblingEventTimeWindows.of(Time.milliseconds(5))) \
>
> .process(CountWindowProcessFunction(),
>
>  Types.TUPLE([Types.STRING(), Types.INT(), Types.INT(),
> Types.INT()]))
>
>
>
> # define the sink
>
> if output_path is not None:
>
> ds.sink_to(
>
> sink=FileSink.for_row_format(
>
> base_path=output_path,
>
> encoder=Encoder.simple_string_encoder())
>
> .with_output_file_config(
>
> OutputFileConfig.builder()
>
> .with_part_prefix("prefix")
>
> .with_part_suffix(".ext")
>
> .build())
>
> .with_rolling_policy(RollingPolicy.default_rolling_policy())
>
> .build()
>
> )
>
> else:
>
> print("Printing result to stdout. Use --output to specify output
> path.")
>
> ds.print()
>
>
>
> # submit for execution
>
> env.execute()
>
>
>
> Thanks and Regards,
>
> Harshit
>
>
>
>
>
>
>


Re: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-19 Thread Alexis Sarda-Espinosa
I can look into RocksDB metrics, I need to configure Prometheus at some point 
anyway. However, going back to the original question, is there no way to gain 
more insight into this with the state processor API? You've mentioned potential 
issues (too many states, missing compaction) but, with my admittedly limited 
understanding of the way RocksDB is used in Flink, I would have thought that 
such things would be visible when using the state processor. Is there no way 
for me to "parse" those MANIFEST files with some of Flink's classes and get 
some more hints?

Regards,
Alexis.


From: Roman Khachatryan 
Sent: Tuesday, April 19, 2022 5:51 PM
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org 
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

> I assume that when you say "new states", that is related to new descriptors 
> with different names? Because, in the case of windowing for example, each 
> window "instance" has its own scoped (non-global and keyed) state, but that's 
> not regarded as a separate column family, is it?
Yes, that's what I meant, and that's regarded as the same column family.

Another possible reason is that SST files aren't being compacted and
that increases the MANIFEST file size.
I'd check the total number of loaded SST files and the creation date
of the oldest one.

You can also see whether there are any compactions running via RocksDB
metrics [1] [2] (a reporter needs to be configured [3]).

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-backend-rocksdb-metrics-num-running-compactions
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-backend-rocksdb-metrics-compaction-pending
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#reporters

Regards,
Roman

On Tue, Apr 19, 2022 at 1:38 PM Alexis Sarda-Espinosa
 wrote:
>
> Hi Roman,
>
> I assume that when you say "new states", that is related to new descriptors 
> with different names? Because, in the case of windowing for example, each 
> window "instance" has its own scoped (non-global and keyed) state, but that's 
> not regarded as a separate column family, is it?
>
> For the 3 descriptors I mentioned before, they are only instantiated once and 
> used like this:
>
> - Window list state: each call to process() executes 
> context.windowState().getListState(...).get()
> - Global map state: each call to process() executes 
> context.globalState().getMapState(...)
> - Global list state: within open(), runtimeContext.getListState(...) is 
> executed once and used throughout the life of the operator.
>
> According to [1], the two ways of using global state should be equivalent.
>
> I will say that some of the operators instantiate the state descriptor in 
> their constructors, i.e. before they are serialized to the TM, but the 
> descriptors are Serializable, so I imagine that's not relevant.
>
> [1] https://stackoverflow.com/a/50510054/5793905
>
> Regards,
> Alexis.
>
> -Original Message-
> From: Roman Khachatryan 
> Sent: Dienstag, 19. April 2022 11:48
> To: Alexis Sarda-Espinosa 
> Cc: user@flink.apache.org
> Subject: Re: RocksDB's state size discrepancy with what's seen with state 
> processor API
>
> Hi Alexis,
>
> Thanks a lot for the information,
>
> MANIFEST files list RocksDB column families (among other info); ever growing 
> size of these files might indicate that some new states are constantly being 
> created.
> Could you please confirm that the number of state names is constant?
>
> > Could you confirm if Flink's own operators could be creating state in 
> > RocksDB? I assume the window operators save some information in the state 
> > as well.
> That's correct, window operators maintain a list of elements per window and a 
> set of timers (timestamps). These states' names should be fixed (something 
> like "window-contents" and "window-timers").
>
> > is that related to managed state used by my functions? Or does that 
> > indicate size growth is elsewhere?
> The same mechanism is used for both Flink internal state and operator state, 
> so it's hard to say without at least knowing the state names.
>
>
> Regards,
> Roman
>
>
> On Tue, Apr 12, 2022 at 2:06 PM Roman Khachatryan  wrote:
> >
> > /shared folder contains keyed state that is shared among different
> > checkpoints [1]. Most of state should be shared in your case since
> > you're using keyed state and incremental checkpoints.
> >
> > When a checkpoint is loaded, the state that it shares with older
> > checkpoints is loaded as well. I suggested to load different
> > checkpoints (i.e. chk-* folders) and compare the numbers of objects in
> > their states. To prevent the job from discarding the state, it can
> > either be stopped for some time and then restarted from the latest
> > checkpoint; or the number of retained checkpoints can be increased
> > [2]. Copying isn't 

Re: [EXT] Vertica jdbc sink error

2022-04-19 Thread Jasmin Redzepovic
Hi Martin,

Thanks for your answer. Regarding my contribution, I will for sure check the 
contributing guide and get familiar with Flink source code. I hope it will end 
up well and I will be able to write that functionality.
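From the error and your explanation, the missing piece seems to be a dialect whose
canHandle() accepts vertica:// URLs. As far as I understand it, the contribution would
be roughly along the lines of the sketch below (method names based on my current reading
of the JdbcDialect interface; how a dialect gets registered differs between Flink
versions, so this is only an outline, not a working implementation):

public class VerticaDialect implements JdbcDialect {

    @Override
    public String dialectName() {
        return "Vertica";
    }

    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:vertica:");
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("com.vertica.jdbc.Driver");
    }

    @Override
    public String quoteIdentifier(String identifier) {
        return "\"" + identifier + "\"";
    }

    // plus a JdbcRowConverter, limit/upsert clause handling, etc.
}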

Best regards,
Jasmin

On 19.04.2022., at 09:39, Martijn Visser 
mailto:martijnvis...@apache.org>> wrote:



Hi Jasmin,

Vertica is not implemented as a JDBC dialect, which is a requirement for Flink 
to support this. You can find an overview of the currently supported JDBC 
dialects in the documentation [1]. It would be interesting if you could 
contribute support for Vertica towards Flink.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/


On Fri, 15 Apr 2022 at 14:27, Jasmin Redzepovic 
mailto:jasmin.redzepo...@superbet.com>> wrote:
Hello Flink community,

I am getting this error when writing data to Vertica table:

Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Unable to create a sink for writing table 
'default_catalog.default_database.VerticaSink'.
...
Caused by: java.lang.IllegalStateException: Cannot handle such jdbc url: 
jdbc:vertica://:5433/sbtverticadb01


This is the code for creating sink table and inserting into it:

tEnv.executeSql("""
CREATE TABLE VerticaSink (
TICKET_ID STRING,
TICKET_CODE STRING,
BUSINESS_MARKET_CODE STRING,
BUSINESS_LINE_CODE STRING,
PRIMARY KEY (TICKET_ID) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:vertica://:5433/sbtverticadb01',
'table-name' = 'SBX_DE.FLINK_SINK2',
'username' = 'username',
'password' = 'password'
)
""")

tEnv.executeSql("""
INSERT INTO VerticaSink
SELECT TICKET_ID, TICKET_CODE, BUSINESS_MARKET_CODE, BUSINESS_LINE_CODE from 
Transformation2
"”")

I downloaded jdbc driver for Vertica and added it into ./lib folder.
Does anyone have idea what could it be? I googled the error, but didn’t find 
anything helpful.

Thanks and best regards,
Jasmin




Re: how to initialize few things at task managers

2022-04-19 Thread Great Info
I am deploying this as a Docker container on our servers; due to some restrictions
I can only pass Keystore URLs.

One option is yarn.ship-files! Can you help by pointing me to sample code
showing how the job manager can ship this file?

Another idea: download the Keystore as part of the job's main function and send it
to all task managers. Will this work?
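For context, what I do today is roughly the sketch below; the static flag is just an
idea to make the download happen only once per task manager JVM, even though open()
runs once per operator instance (the download helper is a made-up placeholder):

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public abstract class KeystoreAwareSource<T> extends RichSourceFunction<T> {
    // one flag per classloader, i.e. effectively once per job per task manager
    private static final AtomicBoolean KEYSTORE_READY = new AtomicBoolean(false);

    @Override
    public void open(Configuration configuration) throws Exception {
        if (KEYSTORE_READY.compareAndSet(false, true)) {
            downloadKeystore("/tmp/keystore.jks"); // hypothetical target path
        }
        super.open(configuration);
    }

    // hypothetical helper that fetches the Keystore from the configured URL
    protected abstract void downloadKeystore(String targetPath) throws Exception;
}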



On Mon, Apr 18, 2022 at 10:10 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> If you are using Kubernetes to deploy Flink, you could think about an
> initContainer on the TMs or a custom Docker entry point that does this
> initialization.
>
> Best,
> Austin
>
> On Mon, Apr 18, 2022 at 7:49 AM huweihua  wrote:
>
>> Hi, Init stuff when task manager comes up is not an option.
>> But if the Keystore file is not changeable and you are using yarn mode,
>> maybe you can use ‘yarn.ship-files’[1] to localize it.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/config/#yarn-ship-files
>>
>> On Apr 16, 2022, at 11:44 PM, Great Info wrote:
>>
>> I need to download the Keystore and use it while creating the source
>> connector. Currently I am overriding the open method, but this gets called
>> for each of my source connectors.
>>
>>  @Override
>> public void open(Configuration configuration) throws Exception {
>>
>>   // do few things like download Keystore to default path etc
>>  super.open(configuration)
>> }
>>
>> Is there an option to run some init steps as soon as the task manager
>> comes up?
>>
>>
>>


flink-stop command fails with ` Operation not found under key`

2022-04-19 Thread Harsh Shah
Hello Community,

We are trying to adopt flink-stop instead of orchestrating the stop
manually. While using the command, it failed with the error "Operation not
found under key:
org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@9fbfc15a".
The job eventually stopped, but I wanted to see how I could execute the
command successfully! Any help or direction to debug is appreciated.

Information:
* Flink version: 1.13.1
* Flink on kubernetes with HA enabled. Standalone cluster.

*command*: "/bin/flink stop $JOBID"

*Stack trace of the command:*

WARNING: Illegal reflective access by org.apache.hadoop.security.authentication.util.KerberosUtil (file:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar) to method sun.security.krb5.Config.getInstance()
WARNING: Please consider reporting this to the maintainers of org.apache.hadoop.security.authentication.util.KerberosUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Suspending job "" with a savepoint.

The program finished with the following exception:

org.apache.flink.util.FlinkException: Could not stop with a savepoint job "".
    at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:581)
    at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002)
    at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:569)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1069)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at java.base/java.security.AccessController.doPrivileged(Native Method)
    at java.base/javax.security.auth.Subject.doAs(Unknown Source)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.NotFoundException: Operation not found under key: org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@9fbfc15a
    at org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers$StatusHandler.handleRequest(AbstractAsynchronousOperationHandlers.java:182)
    at org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler.handleRequest(SavepointHandlers.java:219)
    at org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
    at org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)
    at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
    at java.base/java.util.Optional.ifPresent(Unknown Source)
    at org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
    at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
    at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
    at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)


*Jobs manager logs:*

{"instant":{"epochSecond":1650384228,"nanoOfSecond":866181000},"thread":"flink-akka.actor.default-dispatcher-2","level":"INFO","loggerName":"org.apache.flink.runtime.jobmaster.JobMaster","message":"Triggering stop-with-savepoint for job .","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{},"threadId":28,"threadPriority":5}
{"instant":{"epochSecond":1650384229,"nanoOfSecond":30779000},"thread":"flink-rest-server-netty-worker-thread-1","level":"ERROR","loggerName":"org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointStatusHandler","message":"Exception occurred in REST handler: Operation not found under key: org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@9fbfc15a","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{},"threadId":95,"threadPriority":5}
*... # rest of things are success*


Re: How to debug Metaspace exception?

2022-04-19 Thread Yaroslav Tkachenko
Also https://shopify.engineering/optimizing-apache-flink-applications-tips
might be helpful (has a section on profiling, as well as classloading).

On Tue, Apr 19, 2022 at 4:35 AM Chesnay Schepler  wrote:

> We have a very rough "guide" in the wiki (it's just the specific steps I
> took to debug another leak):
>
> https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks
>
> On 19/04/2022 12:01, huweihua wrote:
>
> Hi, John
>
> Sorry for the late reply. You can use MAT[1] to analyze the dump file.
> Check whether have too many loaded classes.
>
> [1] https://www.eclipse.org/mat/
>
> On Apr 18, 2022, at 9:55 PM, John Smith wrote:
>
> Hi, can anyone help with this? I never looked at a dump file before.
>
> On Thu, Apr 14, 2022 at 11:59 AM John Smith 
> wrote:
>
>> Hi, so I have a dump file. What do I look for?
>>
>> On Thu, Mar 31, 2022 at 3:28 PM John Smith 
>> wrote:
>>
>>> Ok so if there's a leak, if I manually stop the job and restart it from
>>> the UI multiple times, I won't see the issue because the classes
>>> are unloaded correctly?
>>>
>>>
>>> On Thu, Mar 31, 2022 at 9:20 AM huweihua  wrote:
>>>

 The difference is that manually canceling the job stops the JobMaster,
 but automatic failover keeps the JobMaster running. But looking on
 TaskManager, it doesn't make much difference


 On Mar 31, 2022, at 4:01 AM, John Smith wrote:

 Also if I manually cancel and restart the same job over and over is it
 the same as if flink was restarting a job due to failure?

 I.e: When I click "Cancel Job" on the UI is the job completely unloaded
 vs when the job scheduler restarts a job because if whatever reason?

 Lile this I'll stop and restart the job a few times or maybe I can
 trick my job to fail and have the scheduler restart it. Ok let me think
 about this...

 On Wed, Mar 30, 2022 at 10:24 AM 胡伟华  wrote:

> So if I run the same jobs in my dev env will I still be able to see
> the similar dump?
>
> I think running the same job in dev should be reproducible, maybe you
> can have a try.
>
>  If not I would have to wait at a low volume time to do it on
> production. Aldo if I recall the dump is as big as the JVM memory right so
> if I have 10GB configed for the JVM the dump will be 10GB file?
>
> Yes, JMAP will pause the JVM, the time of pause depends on the size to
> dump. you can use "jmap -dump:live" to dump only the reachable objects,
> this will take a brief pause
>
>
>
> On Mar 30, 2022, at 9:47 PM, John Smith wrote:
>
> I have 3 task managers (see config below). There is total of 10 jobs
> with 25 slots being used.
> The jobs are 100% ETL I.e; They load Json, transform it and push it to
> JDBC, only 1 job of the 10 is pushing to Apache Ignite cluster.
>
> FOR JMAP. I know that it will pause the task manager. So if I run the
> same jobs in my dev env will I still be able to see the similar dump? I I
> assume so. If not I would have to wait at a low volume time to do it on
> production. Aldo if I recall the dump is as big as the JVM memory right so
> if I have 10GB configed for the JVM the dump will be 10GB file?
>
>
> # Operating system has 16GB total.
> env.ssh.opts: -l flink -oStrictHostKeyChecking=no
>
> cluster.evenly-spread-out-slots: true
>
> taskmanager.memory.flink.size: 10240m
> taskmanager.memory.jvm-metaspace.size: 2048m
> taskmanager.numberOfTaskSlots: 16
> parallelism.default: 1
>
> high-availability: zookeeper
> high-availability.storageDir: file:///mnt/flink/ha/flink_1_14/
> high-availability.zookeeper.quorum: ...
> high-availability.zookeeper.path.root: /flink_1_14
> high-availability.cluster-id: /flink_1_14_cluster_0001
>
> web.upload.dir: /mnt/flink/uploads/flink_1_14
>
> state.backend: rocksdb
> state.backend.incremental: true
> state.checkpoints.dir: file:///mnt/flink/checkpoints/flink_1_14
> state.savepoints.dir: file:///mnt/flink/savepoints/flink_1_14
>
> On Wed, Mar 30, 2022 at 2:16 AM 胡伟华  wrote:
>
>> Hi, John
>>
>> Could you tell us your application scenario? Is it a flink session
>> cluster with a lot of jobs?
>>
>> Maybe you can try to dump the memory with jmap and use tools such as
>> MAT to analyze whether there are abnormal classes and classloaders
>>
>>
>> > On Mar 30, 2022, at 6:09 AM, John Smith wrote:
>> >
>> > Hi running 1.14.4
>> >
>> > My tasks manager still fails with java.lang.OutOfMemoryError:
>> Metaspace. The metaspace out-of-memory error has occurred. This can mean
>> two things: either the job requires a larger size of JVM metaspace to 
>> load
>> classes or there is a class loading leak.
>> >
>> > I have 2GB of metaspace configed
>> taskmanager.memory.jvm-metaspace.size: 2048m
>> >
>> > But the 

Re: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-19 Thread Roman Khachatryan
> I assume that when you say "new states", that is related to new descriptors 
> with different names? Because, in the case of windowing for example, each 
> window "instance" has its own scoped (non-global and keyed) state, but that's 
> not regarded as a separate column family, is it?
Yes, that's what I meant, and that's regarded as the same column family.

Another possible reason is that SST files aren't being compacted and
that increases the MANIFEST file size.
I'd check the total number of loaded SST files and the creation date
of the oldest one.

You can also see whether there are any compactions running via RocksDB
metrics [1] [2] (a reporter needs to be configured [3]).

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-backend-rocksdb-metrics-num-running-compactions
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-backend-rocksdb-metrics-compaction-pending
[3]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#reporters
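For example, in flink-conf.yaml, something along these lines (a sketch; the option
keys are the ones from [1]-[3], the reporter port is just an example):

state.backend.rocksdb.metrics.num-running-compactions: true
state.backend.rocksdb.metrics.compaction-pending: true

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.port: 9249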

Regards,
Roman

On Tue, Apr 19, 2022 at 1:38 PM Alexis Sarda-Espinosa
 wrote:
>
> Hi Roman,
>
> I assume that when you say "new states", that is related to new descriptors 
> with different names? Because, in the case of windowing for example, each 
> window "instance" has its own scoped (non-global and keyed) state, but that's 
> not regarded as a separate column family, is it?
>
> For the 3 descriptors I mentioned before, they are only instantiated once and 
> used like this:
>
> - Window list state: each call to process() executes 
> context.windowState().getListState(...).get()
> - Global map state: each call to process() executes 
> context.globalState().getMapState(...)
> - Global list state: within open(), runtimeContext.getListState(...) is 
> executed once and used throughout the life of the operator.
>
> According to [1], the two ways of using global state should be equivalent.
>
> I will say that some of the operators instantiate the state descriptor in 
> their constructors, i.e. before they are serialized to the TM, but the 
> descriptors are Serializable, so I imagine that's not relevant.
>
> [1] https://stackoverflow.com/a/50510054/5793905
>
> Regards,
> Alexis.
>
> -Original Message-
> From: Roman Khachatryan 
> Sent: Dienstag, 19. April 2022 11:48
> To: Alexis Sarda-Espinosa 
> Cc: user@flink.apache.org
> Subject: Re: RocksDB's state size discrepancy with what's seen with state 
> processor API
>
> Hi Alexis,
>
> Thanks a lot for the information,
>
> MANIFEST files list RocksDB column families (among other info); ever growing 
> size of these files might indicate that some new states are constantly being 
> created.
> Could you please confirm that the number of state names is constant?
>
> > Could you confirm if Flink's own operators could be creating state in 
> > RocksDB? I assume the window operators save some information in the state 
> > as well.
> That's correct, window operators maintain a list of elements per window and a 
> set of timers (timestamps). These states' names should be fixed (something 
> like "window-contents" and "window-timers").
>
> > is that related to managed state used by my functions? Or does that 
> > indicate size growth is elsewhere?
> The same mechanism is used for both Flink internal state and operator state, 
> so it's hard to say without at least knowing the state names.
>
>
> Regards,
> Roman
>
>
> On Tue, Apr 12, 2022 at 2:06 PM Roman Khachatryan  wrote:
> >
> > /shared folder contains keyed state that is shared among different
> > checkpoints [1]. Most of state should be shared in your case since
> > you're using keyed state and incremental checkpoints.
> >
> > When a checkpoint is loaded, the state that it shares with older
> > checkpoints is loaded as well. I suggested to load different
> > checkpoints (i.e. chk-* folders) and compare the numbers of objects in
> > their states. To prevent the job from discarding the state, it can
> > either be stopped for some time and then restarted from the latest
> > checkpoint; or the number of retained checkpoints can be increased
> > [2]. Copying isn't necessary.
> >
> > Besides that, you can also check state sizes of operator in Flink Web
> > UI (but not the sizes of individual states). If the operators are
> > chained then their combined state size will be shown. To prevent this,
> > you can disable chaining [3] (although this will have performance
> > impact).
> >
> > Individual checkpoint folders should be eventually removed (when the
> > checkpoint is subsumed). However, this is not guaranteed: if there is
> > any problem during deletion, it will be logged, but the job will not
> > fail.
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/ch
> > eckpoints/#directory-structure
> > [2]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/c
> > onfig/#state-checkpoints-num-retained
> > [3]
> > 

TumblingEventTimeWindows is not recognised by Flink-(version 1.14.4)

2022-04-19 Thread harshit.varsh...@iktara.ai
Dear Team,

 

I am new to PyFlink and request your support with an issue I am facing with
PyFlink. I am using PyFlink version 1.14.4 and reference code from the
PyFlink getting started pages.

 

I am getting the following error:

ImportError: cannot import name 'TumblingEventTimeWindows' from
'pyflink.datastream.window'
(C:\Users\Admin\PycharmProjects\pythonProject8\venv\lib\site-packages\pyflink\datastream\window.py)

 

 

Below is my code for reference..

 

import sys

import argparse
from typing import Iterable

from pyflink.datastream.connectors import FileSink, OutputFileConfig, RollingPolicy

from pyflink.common import Types, WatermarkStrategy, Time, Encoder
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment, ProcessWindowFunction
from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow


class MyTimestampAssigner(TimestampAssigner):

    def extract_timestamp(self, value, record_timestamp) -> int:
        return int(value[1])


class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, TimeWindow]):

    def process(self,
                key: str,
                context: ProcessWindowFunction.Context[TimeWindow],
                elements: Iterable[tuple]) -> Iterable[tuple]:
        return [(key, context.window().start, context.window().end,
                 len([e for e in elements]))]

    def clear(self, context: ProcessWindowFunction.Context) -> None:
        pass


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)
    output_path = known_args.output

    env = StreamExecutionEnvironment.get_execution_environment()
    # write all the data to one file
    env.set_parallelism(1)

    # define the source
    data_stream = env.from_collection([
        ('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 5), ('hi', 8),
        ('hi', 9), ('hi', 15)],
        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))

    # define the watermark strategy
    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
        .with_timestamp_assigner(MyTimestampAssigner())

    ds = data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
        .key_by(lambda x: x[0], key_type=Types.STRING()) \
        .window(TumblingEventTimeWindows.of(Time.milliseconds(5))) \
        .process(CountWindowProcessFunction(),
                 Types.TUPLE([Types.STRING(), Types.INT(), Types.INT(), Types.INT()]))

    # define the sink
    if output_path is not None:
        ds.sink_to(
            sink=FileSink.for_row_format(
                base_path=output_path,
                encoder=Encoder.simple_string_encoder())
            .with_output_file_config(
                OutputFileConfig.builder()
                .with_part_prefix("prefix")
                .with_part_suffix(".ext")
                .build())
            .with_rolling_policy(RollingPolicy.default_rolling_policy())
            .build()
        )
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        ds.print()

    # submit for execution
    env.execute()

 

Thanks and Regards,

Harshit

 

 

 



/status endpoint of flink jobmanager not working

2022-04-19 Thread Peter Schrott
Hi Flink Users,

Does anyone know what happened to the /status endpoint of a job?

Calling /jobs/0c39e6ce662379449e7f7f965ff1eee0/status gives me a 404.

Thanks & best, Peter


Flink checkpointing configuration

2022-04-19 Thread Nikola Hrusov
Hi,

I have a question regarding flink checkpointing configuration.

I am obtaining my knowledge from the official docs here:
https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/config/
and running Flink 1.14.4

I would like to be able to do a checkpoint every 10 minutes, with at least
a 10 minute pause between checkpoints. Thus I have set the following
properties:

execution.checkpointing.interval: 10 min
execution.checkpointing.min-pause: 10 min
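(For reference, my understanding is that the programmatic equivalent of these
settings would be roughly the following sketch:)

// org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10 * 60 * 1000L);                                  // interval
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10 * 60 * 1000L); // min-pause
env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);          // timeout (default)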


And that works for the positive scenarios where my job runs fine. However,
when we have a checkpoint timeout it seems that the min-pause is not
applied, e.g.:

t - checkpoint #1 starts
t+10min - checkpoint #1 fails due to timeout (execution.checkpointing.timeout
defaults to 10min)
t+10min - checkpoint #2 starts


I would expect (and want to achieve):

t - checkpoint #1 starts
t+10min - checkpoint #1 fails due to timeout (execution.checkpointing.timeout
defaults to 10min)
t+20min (t+10min(timeout)+10min(min-pause) - checkpoint #2 starts



I expect that because:
- checkpoint #1 did not succeed, but it finished at t+10min; I expect the
min-pause to start counting from there.
- if checkpoint #1 failed with a timeout, it is very unlikely that checkpoint #2,
which starts immediately after the failed checkpoint #1, will succeed.


At this point I am not sure whether I do not understand the docs and how I
should configure my job.

When I set the configuration like so:

execution.checkpointing.interval: 10 min
execution.checkpointing.min-pause: 15 min


Then I get checkpoints every 15 min instead.

Can someone help me understand the docs better and configure my job? Thanks

Regards,
Nikola


Re: HDFS streaming source concerns

2022-04-19 Thread Adrian Bednarz
Hello,

We are actually working on a similar problem against S3. The checkpointing
thing got me thinking if the checkpoint would indeed succeed with a large
backlog of files. I always imagined that SplitEnumerator lists all
available files and SourceReader is responsible for reading those files
afterwards. In this model I thought checkpoint barriers would be blocked
until the initial files backlog is fully consumed.

In Flink UI though I can see that checkpoints are happening timely when
performing a small experiment against local FS. Can somebody comment on how
this works under the hood?
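(For reference, the local-FS experiment is essentially the sketch below; the class
names are the ones I believe exist in the 1.14 file connector, and the path and
intervals are just examples:)

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileSourceExperiment {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // checkpoint every 10s while the backlog is consumed

        FileSource<String> source = FileSource
                .forRecordStreamFormat(new TextLineFormat(), new Path("file:///tmp/input"))
                .monitorContinuously(Duration.ofSeconds(10)) // keep discovering new files
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source").print();
        env.execute("file-source-experiment");
    }
}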

Adrian

On Fri, Apr 8, 2022 at 8:20 PM Roman Khachatryan  wrote:

> Hi Carlos,
>
> AFAIK, Flink FileSource is capable of checkpointing while reading the
> files (at least in Streaming Mode).
> As for the watermarks, I think FLIP-182 [1] could solve the problem;
> however, it's currently under development.
>
> I'm also pulling in Arvid and Fabian who are more familiar with the
> subject.
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
>
> Regards,
> Roman
>
> On Wed, Apr 6, 2022 at 4:17 PM Carlos Downey 
> wrote:
> >
> > Hi,
> >
> > We have an in-house platform that we want to integrate with external
> clients via HDFS. They have lots of existing files and they continuously
> put more data to HDFS. Ideally, we would like to have a Flink job that
> takes care of ingesting data as one of the requirements is to execute SQL
> on top of these files. We looked at existing FileSource implementation but
> we believe this will not be well suited for this use case.
> > - firstly, we'd have to ingest all files initially present on HDFS
> before completing first checkpoint - this is unacceptable for us as we
> would have to reprocess all the files again in case of early job failure.
> Not to mention the state blowing up for aggregations.
> > - secondly, we see now way to establish valid watermark strategy. This
> is a major pain point that we can't find the right answer for. We don't
> want to assume too much about the data itself. In general, the only
> solutions we see require some sort of synchronization across subtasks. On
> the other hand, the simplest strategy is to delay the watermark. In that
> case though we are afraid of accidentally dropping events.
> >
> > Given this, we think about implementing our own file source, have
> someone in the community already tried solving similar problem? If not, any
> suggestions about the concerns we raised would be valuable.
>


Re: New JM pod tries to connect to failed JM pod

2022-04-19 Thread huweihua
Hi,
After the previous JobManager fails, Kubernetes starts the new JobManager, but the
leader address saved in HA is still that of the old JobManager. When the Dispatcher
gets the old JobManager leader, it will try to connect to it.

This error can be ignored, and it will return to normal after waiting for a 
period of time for the new JobManager to become the leader.

> On Apr 19, 2022, at 9:25 AM, Alexey Trenikhun wrote:
> 
> Hello,
> We are running Flink 1.13.6 in Kubernetes with k8s HA, the setup includes 1 
> JM and TM.  Recently In jobmanager log I started to see:
> 
> 2022-04-19T00:11:33.102Z Association with remote system 
> [akka.tcp://flink@10.204.0.126:6123 ] has 
> failed, address is now gated for [50] ms. Reason: [Association failed with 
> [akka.tcp://flink@10.204.0.126:6123 ]] 
> Caused by: [No response from remote for outbound association. Associate timed 
> out after [2 ms].]
> 
> I suspect that root cause are some network issues. But what is very strange 
> that this log from pod gsp-jm-424--1-8v5qj (10.204.2.138) and 10.204.0.126 is 
> IP address of failed JM pod - gsp-jm-424--1-kdhqp, so looks like newer 
> instance of JM (10.204.2.138) is trying to connect to older failed instance 
> of JM (10.204.0.126). 
> 
> Thanks,
> Alexey



RE: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-19 Thread Alexis Sarda-Espinosa
Hi Roman,

I assume that when you say "new states", that is related to new descriptors 
with different names? Because, in the case of windowing for example, each 
window "instance" has its own scoped (non-global and keyed) state, but that's 
not regarded as a separate column family, is it?

For the 3 descriptors I mentioned before, they are only instantiated once and 
used like this:

- Window list state: each call to process() executes 
context.windowState().getListState(...).get()
- Global map state: each call to process() executes 
context.globalState().getMapState(...)
- Global list state: within open(), runtimeContext.getListState(...) is 
executed once and used throughout the life of the operator.
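In code, the three accesses look roughly like this (a sketch only; the descriptor
names and the Event/Result types are made up):

public class MyWindowFunction
        extends ProcessWindowFunction<Event, Result, String, TimeWindow> {

    private final ListStateDescriptor<Long> windowListDesc =
            new ListStateDescriptor<>("window-list", Long.class);
    private final MapStateDescriptor<String, Long> globalMapDesc =
            new MapStateDescriptor<>("global-map", String.class, Long.class);
    private final ListStateDescriptor<Long> globalListDesc =
            new ListStateDescriptor<>("global-list", Long.class);

    private transient ListState<Long> globalList;

    @Override
    public void open(Configuration parameters) {
        globalList = getRuntimeContext().getListState(globalListDesc);
    }

    @Override
    public void process(String key, Context context, Iterable<Event> elements,
                        Collector<Result> out) throws Exception {
        Iterable<Long> perWindow = context.windowState().getListState(windowListDesc).get();
        MapState<String, Long> globalMap = context.globalState().getMapState(globalMapDesc);
        // ... use perWindow / globalMap / globalList ...
    }
}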

According to [1], the two ways of using global state should be equivalent.

I will say that some of the operators instantiate the state descriptor in their 
constructors, i.e. before they are serialized to the TM, but the descriptors 
are Serializable, so I imagine that's not relevant.

[1] https://stackoverflow.com/a/50510054/5793905

Regards,
Alexis.

-Original Message-
From: Roman Khachatryan  
Sent: Dienstag, 19. April 2022 11:48
To: Alexis Sarda-Espinosa 
Cc: user@flink.apache.org
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

Hi Alexis,

Thanks a lot for the information,

MANIFEST files list RocksDB column families (among other info); ever growing 
size of these files might indicate that some new states are constantly being 
created.
Could you please confirm that the number of state names is constant?

> Could you confirm if Flink's own operators could be creating state in 
> RocksDB? I assume the window operators save some information in the state as 
> well.
That's correct, window operators maintain a list of elements per window and a 
set of timers (timestamps). These states' names should be fixed (something like 
"window-contents" and "window-timers").

> is that related to managed state used by my functions? Or does that indicate 
> size growth is elsewhere?
The same mechanism is used for both Flink internal state and operator state, so 
it's hard to say without at least knowing the state names.


Regards,
Roman


On Tue, Apr 12, 2022 at 2:06 PM Roman Khachatryan  wrote:
>
> /shared folder contains keyed state that is shared among different 
> checkpoints [1]. Most of state should be shared in your case since 
> you're using keyed state and incremental checkpoints.
>
> When a checkpoint is loaded, the state that it shares with older 
> checkpoints is loaded as well. I suggested to load different 
> checkpoints (i.e. chk-* folders) and compare the numbers of objects in 
> their states. To prevent the job from discarding the state, it can 
> either be stopped for some time and then restarted from the latest 
> checkpoint; or the number of retained checkpoints can be increased 
> [2]. Copying isn't necessary.
>
> Besides that, you can also check state sizes of operator in Flink Web 
> UI (but not the sizes of individual states). If the operators are 
> chained then their combined state size will be shown. To prevent this, 
> you can disable chaining [3] (although this will have performance 
> impact).
>
> Individual checkpoint folders should be eventually removed (when the 
> checkpoint is subsumed). However, this is not guaranteed: if there is 
> any problem during deletion, it will be logged, but the job will not 
> fail.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/ch
> eckpoints/#directory-structure
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/c
> onfig/#state-checkpoints-num-retained
> [3]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastre
> am/operators/overview/#disable-chaining
>
> Regards,
> Roman
>
> On Tue, Apr 12, 2022 at 12:58 PM Alexis Sarda-Espinosa 
>  wrote:
> >
> > Hi Roman,
> >
> > Maybe I'm misunderstanding the structure of the data within the checkpoint. 
> > You suggest comparing counts of objects in different checkpoints, I assume 
> > you mean copying my "checkpoints" folder at different times and comparing, 
> > not comparing different "chk-*" folders in the same snapshot, right?
> >
> > I haven't executed the processor program with a newer checkpoint, but I did 
> > look at the folder in the running system, and I noticed that most of the 
> > chk-* folders have remained unchanged, there's only 1 or 2 new folders 
> > corresponding to newer checkpoints. I would think this makes sense since 
> > the configuration specifies that only 1 completed checkpoint should be 
> > retained, but then why are the older chk-* folders still there? I did 
> > trigger a manual restart of the Flink cluster in the past (before starting 
> > the long-running test), but if my policy is to CLAIM the checkpoint, 
> > Flink's documentation states that it would be cleaned eventually.
> >
> > Moreover, just by looking at folder sizes with "du", I can see that most 

Re: How to debug Metaspace exception?

2022-04-19 Thread Chesnay Schepler
We have a very rough "guide" in the wiki (it's just the specific steps I 
took to debug another leak):

https://cwiki.apache.org/confluence/display/FLINK/Debugging+ClassLoader+leaks

On 19/04/2022 12:01, huweihua wrote:

Hi, John

Sorry for the late reply. You can use MAT[1] to analyze the dump file. 
Check whether have too many loaded classes.


[1] https://www.eclipse.org/mat/


On Apr 18, 2022, at 9:55 PM, John Smith wrote:

Hi, can anyone help with this? I never looked at a dump file before.

On Thu, Apr 14, 2022 at 11:59 AM John Smith  
wrote:


Hi, so I have a dump file. What do I look for?

On Thu, Mar 31, 2022 at 3:28 PM John Smith
 wrote:

Ok so if there's a leak, if I manually stop the job and
restart it from the UI multiple times, I won't see the issue
because the classes are unloaded correctly?


On Thu, Mar 31, 2022 at 9:20 AM huweihua
 wrote:


The difference is that manually canceling the job stops
the JobMaster, but automatic failover keeps the JobMaster
running. But looking on TaskManager, it doesn't make much
difference



On Mar 31, 2022, at 4:01 AM, John Smith wrote:

Also if I manually cancel and restart the same job over
and over is it the same as if flink was restarting a job
due to failure?

I.e: When I click "Cancel Job" on the UI is the job
completely unloaded vs when the job scheduler restarts a
job because if whatever reason?

Lile this I'll stop and restart the job a few times or
maybe I can trick my job to fail and have the scheduler
restart it. Ok let me think about this...

On Wed, Mar 30, 2022 at 10:24 AM 胡伟华
 wrote:


So if I run the same jobs in my dev env will I
still be able to see the similar dump?

I think running the same job in dev should be
reproducible, maybe you can have a try.


 If not I would have to wait at a low volume time
to do it on production. Aldo if I recall the dump
is as big as the JVM memory right so if I have 10GB
configed for the JVM the dump will be 10GB file?

Yes, JMAP will pause the JVM, the time of pause
depends on the size to dump. you can use "jmap
-dump:live" to dump only the reachable objects, this
will take a brief pause




On Mar 30, 2022, at 9:47 PM, John Smith wrote:

I have 3 task managers (see config below). There is
total of 10 jobs with 25 slots being used.
The jobs are 100% ETL I.e; They load Json,
transform it and push it to JDBC, only 1 job of the
10 is pushing to Apache Ignite cluster.

FOR JMAP. I know that it will pause the task
manager. So if I run the same jobs in my dev env
will I still be able to see the similar dump? I I
assume so. If not I would have to wait at a low
volume time to do it on production. Aldo if I
recall the dump is as big as the JVM memory right
so if I have 10GB configed for the JVM the dump
will be 10GB file?


# Operating system has 16GB total.
env.ssh.opts: -l flink -oStrictHostKeyChecking=no

cluster.evenly-spread-out-slots: true

taskmanager.memory.flink.size: 10240m
taskmanager.memory.jvm-metaspace.size: 2048m
taskmanager.numberOfTaskSlots: 16
parallelism.default: 1

high-availability: zookeeper
high-availability.storageDir:
file:///mnt/flink/ha/flink_1_14/
high-availability.zookeeper.quorum: ...
high-availability.zookeeper.path.root: /flink_1_14
high-availability.cluster-id: /flink_1_14_cluster_0001

web.upload.dir: /mnt/flink/uploads/flink_1_14

state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir:
file:///mnt/flink/checkpoints/flink_1_14
state.savepoints.dir:
file:///mnt/flink/savepoints/flink_1_14

On Wed, Mar 30, 2022 at 2:16 AM 胡伟华
 wrote:

Hi, John

Could you tell us you application scenario? Is
it a flink session cluster with a lot of jobs?

Maybe you can try to dump the memory with jmap
and use tools such as MAT to analyze whether
there are abnormal classes and classloaders


> On Mar 30, 2022, at 6:09 AM, John Smith wrote:
 

Re: How to debug Metaspace exception?

2022-04-19 Thread huweihua
Hi, John

Sorry for the late reply. You can use MAT[1] to analyze the dump file. Check 
whether have too many loaded classes.

[1] https://www.eclipse.org/mat/

> On Apr 18, 2022, at 9:55 PM, John Smith wrote:
> 
> Hi, can anyone help with this? I never looked at a dump file before.
> 
> On Thu, Apr 14, 2022 at 11:59 AM John Smith  > wrote:
> Hi, so I have a dump file. What do I look for?
> 
> On Thu, Mar 31, 2022 at 3:28 PM John Smith  > wrote:
> Ok so if there's a leak, if I manually stop the job and restart it from the 
> UI multiple times, I won't see the issue because the classes are
> unloaded correctly?
> 
> 
> On Thu, Mar 31, 2022 at 9:20 AM huweihua  > wrote:
> 
> The difference is that manually canceling the job stops the JobMaster, but 
> automatic failover keeps the JobMaster running. But looking on TaskManager, 
> it doesn't make much difference
> 
> 
>> On Mar 31, 2022, at 4:01 AM, John Smith wrote:
>> 
>> Also if I manually cancel and restart the same job over and over is it the 
>> same as if flink was restarting a job due to failure?
>> 
>> I.e: When I click "Cancel Job" on the UI is the job completely unloaded vs 
>> when the job scheduler restarts a job because of whatever reason?
>> 
>> Like this I'll stop and restart the job a few times or maybe I can trick my 
>> job to fail and have the scheduler restart it. Ok let me think about this...
>> 
>> On Wed, Mar 30, 2022 at 10:24 AM 胡伟华 > > wrote:
>>> So if I run the same jobs in my dev env will I still be able to see a 
>>> similar dump? 
>> I think running the same job in dev should be reproducible, maybe you can 
>> have a try.
>> 
>>>  If not, I would have to wait for a low-volume time to do it in production. 
>>> Also, if I recall, the dump is as big as the JVM memory, right? So if I have 
>>> 10GB configured for the JVM, the dump will be a 10GB file?
>> 
>> Yes, jmap will pause the JVM; the length of the pause depends on the size of the dump. 
>> You can use "jmap -dump:live" to dump only the reachable objects, which will 
>> keep the pause brief.
>> 
>> 
>> 
>>> On Mar 30, 2022 at 9:47 PM, John Smith wrote:
>>> 
>>> I have 3 task managers (see config below). There is a total of 10 jobs with 
>>> 25 slots being used.
>>> The jobs are 100% ETL, i.e. they load JSON, transform it and push it to 
>>> JDBC; only 1 job of the 10 is pushing to an Apache Ignite cluster.
>>> 
>>> FOR JMAP: I know that it will pause the task manager. So if I run the same 
>>> jobs in my dev env, will I still be able to see a similar dump? I assume 
>>> so. If not, I would have to wait for a low-volume time to do it in 
>>> production. Also, if I recall, the dump is as big as the JVM memory, right? 
>>> So if I have 10GB configured for the JVM, the dump will be a 10GB file?
>>> 
>>> 
>>> # Operating system has 16GB total.
>>> env.ssh.opts: -l flink -oStrictHostKeyChecking=no
>>> 
>>> cluster.evenly-spread-out-slots: true
>>> 
>>> taskmanager.memory.flink.size: 10240m
>>> taskmanager.memory.jvm-metaspace.size: 2048m
>>> taskmanager.numberOfTaskSlots: 16
>>> parallelism.default: 1
>>> 
>>> high-availability: zookeeper
>>> high-availability.storageDir: file:///mnt/flink/ha/flink_1_14/ <>
>>> high-availability.zookeeper.quorum: ...
>>> high-availability.zookeeper.path.root: /flink_1_14
>>> high-availability.cluster-id: /flink_1_14_cluster_0001
>>> 
>>> web.upload.dir: /mnt/flink/uploads/flink_1_14
>>> 
>>> state.backend: rocksdb
>>> state.backend.incremental: true
>>> state.checkpoints.dir: file:///mnt/flink/checkpoints/flink_1_14 <>
>>> state.savepoints.dir: file:///mnt/flink/savepoints/flink_1_14 <>
>>> 
>>> On Wed, Mar 30, 2022 at 2:16 AM 胡伟华 >> > wrote:
>>> Hi, John
>>> 
>>> Could you tell us your application scenario? Is it a Flink session cluster 
>>> with a lot of jobs?
>>> 
>>> Maybe you can try to dump the memory with jmap and use tools such as MAT to 
>>> analyze whether there are abnormal classes and classloaders
>>> 
>>> 
>>> > On Mar 30, 2022 at 6:09 AM, John Smith wrote:
>>> > 
>>> > Hi running 1.14.4
>>> > 
>>> > My task managers still fail with java.lang.OutOfMemoryError: Metaspace. 
>>> > The metaspace out-of-memory error has occurred. This can mean two things: 
>>> > either the job requires a larger size of JVM metaspace to load classes or 
>>> > there is a class loading leak.
>>> > 
>>> > I have 2GB of metaspace configured: taskmanager.memory.jvm-metaspace.size: 
>>> > 2048m
>>> > 
>>> > But the task nodes still fail.
>>> > 
>>> > When looking at the UI metrics, the metaspace starts low. Now I see 85% 
>>> > usage. It seems to be a class loading leak at this point, how can we 
>>> > debug this issue?
>>> 
>> 
> 



Re: RocksDB's state size discrepancy with what's seen with state processor API

2022-04-19 Thread Roman Khachatryan
Hi Alexis,

Thanks a lot for the information,

MANIFEST files list RocksDB column families (among other info); an ever-
growing size of these files might indicate that some new states are
constantly being created.
Could you please confirm that the number of state names is constant?

> Could you confirm if Flink's own operators could be creating state in 
> RocksDB? I assume the window operators save some information in the state as 
> well.
That's correct, window operators maintain a list of elements per
window and a set of timers (timestamps). These states' names should be
fixed (something like "window-contents" and "window-timers").

> is that related to managed state used by my functions? Or does that indicate 
> size growth is elsewhere?
The same mechanism is used for both Flink internal state and operator
state, so it's hard to say without at least knowing the state names.
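
If it helps, here is a minimal sketch of counting the entries of one known state
per checkpoint with the State Processor API (assuming Flink 1.14's DataSet-based
API; the checkpoint path, the operator uid "my-uid", the state name "my-state",
and the key type are placeholders that have to match your job):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class StateCountSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Point this at one retained chk-* directory; the path is a placeholder.
        ExistingSavepoint checkpoint = Savepoint.load(
                env, "file:///mnt/flink/checkpoints/<job-id>/chk-1234",
                new EmbeddedRocksDBStateBackend());

        // Emits one record per key that has a value in "my-state" under "my-uid".
        long keysWithState = checkpoint
                .readKeyedState("my-uid", new CountingReader())
                .count();
        System.out.println("keys with state: " + keysWithState);
    }

    static class CountingReader extends KeyedStateReaderFunction<String, Long> {
        private ValueState<Long> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("my-state", Long.class));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Long> out) throws Exception {
            if (state.value() != null) {
                out.collect(1L);
            }
        }
    }
}

Running the same program against two retained chk-* directories and diffing the
counts per state name should show which state is actually growing.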


Regards,
Roman


On Tue, Apr 12, 2022 at 2:06 PM Roman Khachatryan  wrote:
>
> /shared folder contains keyed state that is shared among different
> checkpoints [1]. Most of the state should be shared in your case since
> you're using keyed state and incremental checkpoints.
>
> When a checkpoint is loaded, the state that it shares with older
> checkpoints is loaded as well. I suggested to load different
> checkpoints (i.e. chk-* folders) and compare the numbers of objects in
> their states. To prevent the job from discarding the state, it can
> either be stopped for some time and then restarted from the latest
> checkpoint; or the number of retained checkpoints can be increased
> [2]. Copying isn't necessary.
>
> Besides that, you can also check the state sizes of operators in the Flink Web
> UI (but not the sizes of individual states). If the operators are
> chained then their combined state size will be shown. To prevent this,
> you can disable chaining [3] (although this will have performance
> impact).
>
> Individual checkpoint folders should be eventually removed (when the
> checkpoint is subsumed). However, this is not guaranteed: if there is
> any problem during deletion, it will be logged, but the job will not
> fail.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#directory-structure
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-checkpoints-num-retained
> [3]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#disable-chaining
>
> Regards,
> Roman
>
> On Tue, Apr 12, 2022 at 12:58 PM Alexis Sarda-Espinosa
>  wrote:
> >
> > Hi Roman,
> >
> > Maybe I'm misunderstanding the structure of the data within the checkpoint. 
> > You suggest comparing counts of objects in different checkpoints, I assume 
> > you mean copying my "checkpoints" folder at different times and comparing, 
> > not comparing different "chk-*" folders in the same snapshot, right?
> >
> > I haven't executed the processor program with a newer checkpoint, but I did 
> > look at the folder in the running system, and I noticed that most of the 
> > chk-* folders have remained unchanged, there's only 1 or 2 new folders 
> > corresponding to newer checkpoints. I would think this makes sense since 
> > the configuration specifies that only 1 completed checkpoint should be 
> > retained, but then why are the older chk-* folders still there? I did 
> > trigger a manual restart of the Flink cluster in the past (before starting 
> > the long-running test), but if my policy is to CLAIM the checkpoint, 
> > Flink's documentation states that it would be cleaned eventually.
> >
> > Moreover, just by looking at folder sizes with "du", I can see that most of 
> > the state is held in the "shared" folder, and that has grown for sure; I'm 
> > not sure what "shared" usually holds, but if that's what's growing, maybe I 
> > can rule out expired state staying around? My pipeline doesn't use timers, 
> > although I guess Flink itself may use them. Is there any way I could get 
> > some insight into which operator holds larger states?
> >
> > Regards,
> > Alexis.
> >
> > -Original Message-
> > From: Roman Khachatryan 
> > Sent: Dienstag, 12. April 2022 12:37
> > To: Alexis Sarda-Espinosa 
> > Cc: user@flink.apache.org
> > Subject: Re: RocksDB's state size discrepancy with what's seen with state 
> > processor API
> >
> > Hi Alexis,
> >
> > Thanks a lot for sharing this. I think the program is correct.
> > Although it doesn't take timers into account; and to estimate the state 
> > size more accurately, you could also use the same serializers used by the 
> > job.
> > But maybe it makes more sense to compare the counts of objects in different 
> > checkpoints and see which state is growing.
> >
> > If the number of keys is small, compaction should eventually clean up the 
> > old values, given that the windows eventually expire. I think it makes 
> > sense to check that watermarks in all windows are making progress.
> >
> > Setting 

Re: Flink OLAP 与 Trino TPC-DS 对比

2022-04-19 Thread LuNing Wang
https://www.yuque.com/docs/share/8625d14b-d465-48a3-8dc1-0be32b138f34?#lUX6
"TPC-DS time taken per engine"
The link is valid until 2022-04-22 10:31:05

LuNing Wong  wrote on Mon, Apr 18, 2022 at 09:44:

> To add: the data source was Hive 3.1.2 with Hadoop 3.1.0.
>
> LuNing Wong  wrote on Mon, Apr 18, 2022 at 09:42:
>
> > The Flink version is 1.14.4 and Trino is version 359. I aligned tm.memory.process.size and the CPU resources with Trino:
> > 32G of memory, 16 cores, 16 threads, and 2 compute nodes.
> >
> > Zhilong Hong  wrote on Fri, Apr 15, 2022 at 18:21:
> >
> >> Hello, Luning!
> >>
> >>
> >>
> We are also currently looking at Flink's performance in OLAP scenarios. Which versions of Flink and Trino did you test? Also, I noticed that the cluster configuration used in flink-sql-benchmark is not quite the same as yours; you may need to adjust resource settings such as taskmanager.memory.process.size in flink-conf.yaml according to your cluster resources.
> >>
> >> Best,
> >> Zhilong
> >>
> >> On Fri, Apr 15, 2022 at 2:38 PM LuNing Wang 
> >> wrote:
> >>
> >> > I ran 100 TPC-DS SQL queries
> >> > on 10 GB of data, with 2 workers (TMs), each with 32G of memory and 16 cores.
> >> > Flink took 18 seconds on average
> >> > Trino took 7 seconds on average
> >> >
> >> > In tests by engineers from ByteDance and Alibaba that I have seen, Flink and Presto
> >> are close in OLAP performance, but the gap in my tests is large. I'd like to discuss further whether my Flink settings are the problem.
> >> > I basically configured the Flink-related parameters following the template in this project:
> >> > https://github.com/ververica/flink-sql-benchmark
> >> >
> >> >
> >> > LuNing Wang  wrote on Fri, Apr 15, 2022 at 14:34:
> >> >
> >> > > Ran 100 SQL queries
> >> > >
> >> >
> >>
> >
>


Re: Vertica jdbc sink error

2022-04-19 Thread Martijn Visser
Hi Jasmin,

Vertica is not implemented as a JDBC dialect, which is a requirement for
Flink to support this. You can find an overview of the currently supported
JDBC dialects in the documentation [1]. It would be interesting if you
could contribute support for Vertica towards Flink.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/
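
For what it's worth, the DataStream JdbcSink takes an explicit driver class and
URL and does not go through the dialect mechanism, so a sketch along these lines
may work as an interim workaround (the Vertica driver class name, the host
placeholder, and the two-column layout are assumptions, not something tested here):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;

public class VerticaSinkSketch {
    // Attaches a plain JDBC sink to a stream of (TICKET_ID, TICKET_CODE) pairs.
    public static void addSink(DataStream<Tuple2<String, String>> tickets) {
        tickets.addSink(JdbcSink.sink(
                "INSERT INTO SBX_DE.FLINK_SINK2 (TICKET_ID, TICKET_CODE) VALUES (?, ?)",
                (statement, t) -> {
                    statement.setString(1, t.f0);
                    statement.setString(2, t.f1);
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(500)
                        .withBatchIntervalMs(200)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:vertica://<host>:5433/sbtverticadb01")
                        .withDriverName("com.vertica.jdbc.Driver") // assumed driver class
                        .withUsername("username")
                        .withPassword("password")
                        .build()));
    }
}

The Table API JDBC connector, by contrast, picks the dialect based on the URL,
which is why it fails with "Cannot handle such jdbc url" for Vertica.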


On Fri, 15 Apr 2022 at 14:27, Jasmin Redzepovic <
jasmin.redzepo...@superbet.com> wrote:

> Hello Flink community,
>
> I am getting this error when writing data to Vertica table:
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Unable to create a sink for writing table
> 'default_catalog.default_database.VerticaSink'.
> ...
> Caused by: java.lang.IllegalStateException: Cannot handle such jdbc url:
> jdbc:vertica://:5433/sbtverticadb01
>
>
> This is the code for creating sink table and inserting into it:
>
> tEnv.executeSql("""
> CREATE TABLE VerticaSink (
> TICKET_ID STRING,
> TICKET_CODE STRING,
> BUSINESS_MARKET_CODE STRING,
> BUSINESS_LINE_CODE STRING,
> PRIMARY KEY (TICKET_ID) NOT ENFORCED
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:vertica://:5433/sbtverticadb01',
> 'table-name' = 'SBX_DE.FLINK_SINK2',
> 'username' = 'username',
> 'password' = 'password'
> )
> """)
>
> tEnv.executeSql("""
> INSERT INTO VerticaSink
> SELECT TICKET_ID, TICKET_CODE, BUSINESS_MARKET_CODE, BUSINESS_LINE_CODE
> from Transformation2
> "”")
>
> I downloaded the JDBC driver for Vertica and added it into the ./lib folder.
> Does anyone have an idea what it could be? I googled the error, but didn't
> find anything helpful.
>
> Thanks and best regards,
> Jasmin
>
> This email is confidential and intended solely for the use of the
> individual or entity to whom it is addressed. If you received this e-mail
> by mistake, please notify the sender immediately by e-mail and delete this
> e-mail from your system. Please be informed that if you are not the
> intended recipient, you should not disseminate, distribute, disclose, copy
> or use this e-mail in any way, the act of dissemination, distribution,
> disclosure, copying or taking any action in reliance on the contents of
> this information being strictly prohibited. This e-mail is sent by a
> Superbet Group company. Any views expressed by the sender of this email are
> not necessarily those of Superbet Group. Please note that computer viruses
> can be transmitted by email. You are advised to check this email and any
> attachments for the presence of viruses. Superbet Group cannot accept any
> responsibility for any viruses transmitted by this email and/or any
> attachments.
>


[State Processor API] unable to load the state of a trigger attached to a session window

2022-04-19 Thread Dongwon Kim
Hi,

I'm using Flink 1.14.4 and failed to load, in a WindowReaderFunction, the state
of a stateful trigger attached to a session window.
I found that the following data are available in WindowReaderFunction:
- the state defined in the ProcessWindowFunction
- the registered timers of the stateful trigger attached to the session
window
- all the elements of the window
, but the state of the stateful trigger attached to the session window is
not available when using State Processor API.
In other words, the following code always returns null when used with
session windows:

> ReducingState<Long> state =
> context.triggerState(triggerCountDesc);
> Long val = state.get();
>
On the other hand, the above code snippet returns expected data when used
with sliding and tumbling windows.
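
For context, here is a sketch of the kind of stateful trigger involved, modeled
loosely on Flink's CountTrigger (the state name "count" and the threshold are
made up and are not taken from the attached test):

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class CountingTrigger<T> extends Trigger<T, TimeWindow> {

    // The trigger state that the WindowReaderFunction later tries to read back
    // via context.triggerState(...).
    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    private final long threshold;

    public CountingTrigger(long threshold) {
        this.threshold = threshold;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx)
            throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);
        return count.get() >= threshold ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.getPartitionedState(countDesc).clear();
    }

    // Session windows merge, so the trigger must merge its state as well.
    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        ctx.mergePartitionedState(countDesc);
    }

    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}

The session-window case is the only one where canMerge/onMerge actually come into
play, which is the main difference from the sliding and tumbling variants.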

To explain the problem, I made up an example in a similar spirit to
o.a.f.state.api.SavepointWindowReaderITCase.
Here you can find three test cases each with different types of event-time
windows: Session, Sliding, and Tumbling.
With sliding and tumbling windows, I can read the state of the trigger
attached to the windows in WindowReaderFunction.
However, with a session window, I cannot read the state of the trigger in
WindowReaderFunction.

Is it a bug, or did I miss something?

Best,

Dongwon


SavepointWindowReaderTestForDifferentWindows.java
Description: Binary data


Re: Re: Python callback server start failed

2022-04-19 Thread Dian Fu
This file: myReplace.py

On Tue, Apr 19, 2022 at 2:38 PM 799590...@qq.com <799590...@qq.com> wrote:

>
> Is it the class below? It does not import
> org.apache.flink.table.functions.ScalarFunction
>
> Creating the Function with Flink SQL did not require importing ScalarFunction
>
> Below is the logic code for uploading the py file
>
> String originalFilename = file.getOriginalFilename();
> destFile = new File(System.getProperty("user.dir") + uploadTempPath,
> originalFilename);
> FileUtils.copyInputStreamToFile(file.getInputStream(), destFile);
> String localPath = destFile.getPath();
> copyToHdfsFromLocalFile(localPath,dst);
> hadoopPath = dst+originalFilename;
> String serviceName = configuration.get("fs.defaultFS");
> StreamTableEnvironment tableEnv = getTableEnv();
> String pyStr =
> tableEnv.getConfig().getConfiguration().getString("python.files", "");
> log.info(pyStr);
> String[] split = pyStr.split(",");
> List<String> list = new ArrayList<>(Arrays.asList(split));
> list.add(serviceName+hadoopPath);
>
> tableEnv.getConfig().getConfiguration().setString("python.files",StrUtil.join(",",list));
> tableEnv.executeSql("DROP FUNCTION IF EXISTS
> "+catalogName+"."+defaultDatabase+"."+functionName).print();
> String createSql = "CREATE FUNCTION IF NOT EXISTS
> "+catalogName+"."+defaultDatabase+"."+functionName+" AS '" +className+ "'
> LANGUAGE PYTHON";
> tableEnv.executeSql(createSql).print();
>
>
> --
> 799590...@qq.com
>
>
> *From:* Dian Fu 
> *Date:* 2022-04-19 14:20
> *To:* user-zh ; 799590989 <799590...@qq.com>
> *Subject:* Re: Re: Python callback server start failed
> NameError: name 'ScalarFunction' is not defined
>
Did you import ScalarFunction?
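
(For reference, a PyFlink UDF module normally needs something like "from
pyflink.table.udf import udf, ScalarFunction" at the top before a class can
extend ScalarFunction; without that import, instantiating the function fails
with exactly this NameError.)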
>
> On Tue, Apr 19, 2022 at 2:08 PM 799590...@qq.com.INVALID
> <799590...@qq.com.invalid> wrote:
>
>>
>> Below is the error log from just now; it no longer contains "Python callback server start
>> failed". The reason was that the highlighted part picked up the Python environment variable path C:\Python310 (Python was installed there earlier; python36 was installed later and the PATH environment variable was set to python36). After restarting the machine it was fine.
>>
>> 2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |HiveCatalog.java:257
>> |org.apache.flink.table.catalog.hive.HiveCatalog |Setting hive conf dir as
>> /home/tetris/conf
>> 2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |HiveCatalog.java:219
>> |org.apache.flink.table.catalog.hive.HiveCatalog |Created HiveCatalog
>> 'myhive'
>> 2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |HiveCatalog.java:299
>> |org.apache.flink.table.catalog.hive.HiveCatalog |Connected to Hive
>> metastore
>> 2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |CatalogManager.java:262
>> |org.apache.flink.table.catalog.CatalogManager |Set the current default
>> catalog as [myhive] and the current default database as [tetris].
>> 2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3
>> |SqlTaskRunService.java:599
>> |com.chinaoly.tetris.flink.service.SqlTaskRunService |开始加载hdfs上的udf
>> 2022-04-19 13:58:12 |INFO  |http-nio-9532-exec-3 |EnvUtil.java:31
>> |com.chinaoly.tetris.flink.util.EnvUtil |registerFactory :
>> org.apache.hadoop.fs.FsUrlStreamHandlerFactory
>> 2022-04-19 13:58:12 |INFO  |http-nio-9532-exec-3 |EnvUtil.java:55
>> |com.chinaoly.tetris.flink.util.EnvUtil
>> |hdfs://chinaoly/tetris/file/function/flink/myReplace.py
>> 2022-04-19 13:58:12 |INFO  |http-nio-9532-exec-3
>> |SqlTaskRunService.java:621
>> |com.chinaoly.tetris.flink.service.SqlTaskRunService |完成加载hdfs上的udf
>> 13:58:20,956 |-INFO in ReconfigureOnChangeTask(born:1650344954461) -
>> Empty watch file list. Disabling
>> 2022-04-19 13:58:21 |INFO  |http-nio-9532-exec-3 |PythonEnvUtils.java:284
>> |org.apache.flink.client.python.PythonEnvUtils |Starting Python process
>> with environment variables: {USERDOMAIN_ROAMINGPROFILE=DESKTOP-LBP3EGP,
>> HADOOP_CONF_DIR=/home/tetris/conf, PROCESSOR_LEVEL=6, SESSIONNAME=Console,
>> ALLUSERSPROFILE=C:\ProgramData, PROCESSOR_ARCHITECTURE=AMD64,
>> PSModulePath=C:\Program
>> Files\WindowsPowerShell\Modules;C:\WINDOWS\system32\WindowsPowerShell\v1.0\Modules,
>> SystemDrive=C:, MAVEN_HOME=E:\Program Files\apache-maven-3.8.2,
>> USERNAME=Administrator, ProgramFiles(x86)=C:\Program Files (x86),
>> FPS_BROWSER_USER_PROFILE_STRING=Default,
>> PYTHONPATH=C:/Users/ADMINI~1/AppData/Local/Temp/pyflink/df5887da-8c4d-4807-b891-c313338f1c14/63150f6e-5ea0-4186-be56-ce25e69c4265,
>> PATHEXT=.COM;.EXE;.BAT;.CMD;.VBS;.VBE;.JS;.JSE;.WSF;.WSH;.MSC,
>> DriverData=C:\Windows\System32\Drivers\DriverData, PYTHON_HOME=E:\Program
>> Files\python36, ProgramData=C:\ProgramData, ProgramW6432=C:\Program Files,
>> HOMEPATH=\Users\Administrator, PROCESSOR_IDENTIFIER=Intel64 Family 6 Model
>> 94 Stepping 3, GenuineIntel, ProgramFiles=C:\Program Files,
>> PUBLIC=C:\Users\Public, windir=C:\WINDOWS, =::=::\, ZES_ENABLE_SYSMAN=1,
>> LOCALAPPDATA=C:\Users\Administrator\AppData\Local,
>> ChocolateyLastPathUpdate=132792636909481380, IntelliJ IDEA=E:\Program
>> Files\JetBrains\IntelliJ IDEA 2019.3.3\bin;, USERDOMAIN=DESKTOP-LBP3EGP,
>> FPS_BROWSER_APP_PROFILE_STRING=Internet Explorer,
>> LOGONSERVER=\\DESKTOP-LBP3EGP, JAVA_HOME=C:\Program
>> Files\Java\jdk1.8.0_261, OneDrive=C:\Users\Administrator\OneDrive,
>> 

Re: Re: Python callback server start failed

2022-04-19 Thread 799590...@qq.com.INVALID

Is it the class below? It does not import
org.apache.flink.table.functions.ScalarFunction

Creating the Function with Flink SQL did not require importing ScalarFunction

Below is the logic code for uploading the py file

String originalFilename = file.getOriginalFilename();
destFile = new File(System.getProperty("user.dir") + uploadTempPath, 
originalFilename);
FileUtils.copyInputStreamToFile(file.getInputStream(), destFile);
String localPath = destFile.getPath();
copyToHdfsFromLocalFile(localPath,dst);
hadoopPath = dst+originalFilename;
String serviceName = configuration.get("fs.defaultFS");
StreamTableEnvironment tableEnv = getTableEnv();
String pyStr = 
tableEnv.getConfig().getConfiguration().getString("python.files", "");
log.info(pyStr);
String[] split = pyStr.split(",");
List<String> list = new ArrayList<>(Arrays.asList(split));
list.add(serviceName+hadoopPath);
tableEnv.getConfig().getConfiguration().setString("python.files",StrUtil.join(",",list));
tableEnv.executeSql("DROP FUNCTION IF EXISTS 
"+catalogName+"."+defaultDatabase+"."+functionName).print();
String createSql = "CREATE FUNCTION IF NOT EXISTS 
"+catalogName+"."+defaultDatabase+"."+functionName+" AS '" +className+ "' 
LANGUAGE PYTHON";
tableEnv.executeSql(createSql).print();




799590...@qq.com
 
From: Dian Fu
Date: 2022-04-19 14:20
To: user-zh; 799590989
Subject: Re: Re: Python callback server start failed
NameError: name 'ScalarFunction' is not defined

Did you import ScalarFunction?

On Tue, Apr 19, 2022 at 2:08 PM 799590...@qq.com.INVALID 
<799590...@qq.com.invalid> wrote:

Below is the error log from just now; it no longer contains "Python callback server start 
failed". The reason was that the highlighted part picked up the Python environment variable path C:\Python310 (Python was installed there earlier; python36 was installed later and the PATH environment variable was set to python36). After restarting the machine it was fine.

2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |HiveCatalog.java:257 
|org.apache.flink.table.catalog.hive.HiveCatalog |Setting hive conf dir as 
/home/tetris/conf
2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |HiveCatalog.java:219 
|org.apache.flink.table.catalog.hive.HiveCatalog |Created HiveCatalog 'myhive'
2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |HiveCatalog.java:299 
|org.apache.flink.table.catalog.hive.HiveCatalog |Connected to Hive metastore
2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |CatalogManager.java:262 
|org.apache.flink.table.catalog.CatalogManager |Set the current default catalog 
as [myhive] and the current default database as [tetris].
2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |SqlTaskRunService.java:599 
|com.chinaoly.tetris.flink.service.SqlTaskRunService |开始加载hdfs上的udf
2022-04-19 13:58:12 |INFO  |http-nio-9532-exec-3 |EnvUtil.java:31 
|com.chinaoly.tetris.flink.util.EnvUtil |registerFactory : 
org.apache.hadoop.fs.FsUrlStreamHandlerFactory
2022-04-19 13:58:12 |INFO  |http-nio-9532-exec-3 |EnvUtil.java:55 
|com.chinaoly.tetris.flink.util.EnvUtil 
|hdfs://chinaoly/tetris/file/function/flink/myReplace.py
2022-04-19 13:58:12 |INFO  |http-nio-9532-exec-3 |SqlTaskRunService.java:621 
|com.chinaoly.tetris.flink.service.SqlTaskRunService |完成加载hdfs上的udf
13:58:20,956 |-INFO in ReconfigureOnChangeTask(born:1650344954461) - Empty 
watch file list. Disabling 
2022-04-19 13:58:21 |INFO  |http-nio-9532-exec-3 |PythonEnvUtils.java:284 
|org.apache.flink.client.python.PythonEnvUtils |Starting Python process with 
environment variables: {USERDOMAIN_ROAMINGPROFILE=DESKTOP-LBP3EGP, 
HADOOP_CONF_DIR=/home/tetris/conf, PROCESSOR_LEVEL=6, SESSIONNAME=Console, 
ALLUSERSPROFILE=C:\ProgramData, PROCESSOR_ARCHITECTURE=AMD64, 
PSModulePath=C:\Program 
Files\WindowsPowerShell\Modules;C:\WINDOWS\system32\WindowsPowerShell\v1.0\Modules,
 SystemDrive=C:, MAVEN_HOME=E:\Program Files\apache-maven-3.8.2, 
USERNAME=Administrator, ProgramFiles(x86)=C:\Program Files (x86), 
FPS_BROWSER_USER_PROFILE_STRING=Default, 
PYTHONPATH=C:/Users/ADMINI~1/AppData/Local/Temp/pyflink/df5887da-8c4d-4807-b891-c313338f1c14/63150f6e-5ea0-4186-be56-ce25e69c4265,
 PATHEXT=.COM;.EXE;.BAT;.CMD;.VBS;.VBE;.JS;.JSE;.WSF;.WSH;.MSC, 
DriverData=C:\Windows\System32\Drivers\DriverData, PYTHON_HOME=E:\Program 
Files\python36, ProgramData=C:\ProgramData, ProgramW6432=C:\Program Files, 
HOMEPATH=\Users\Administrator, PROCESSOR_IDENTIFIER=Intel64 Family 6 Model 94 
Stepping 3, GenuineIntel, ProgramFiles=C:\Program Files, 
PUBLIC=C:\Users\Public, windir=C:\WINDOWS, =::=::\, ZES_ENABLE_SYSMAN=1, 
LOCALAPPDATA=C:\Users\Administrator\AppData\Local, 
ChocolateyLastPathUpdate=132792636909481380, IntelliJ IDEA=E:\Program 
Files\JetBrains\IntelliJ IDEA 2019.3.3\bin;, USERDOMAIN=DESKTOP-LBP3EGP, 
FPS_BROWSER_APP_PROFILE_STRING=Internet Explorer, 
LOGONSERVER=\\DESKTOP-LBP3EGP, JAVA_HOME=C:\Program Files\Java\jdk1.8.0_261, 
OneDrive=C:\Users\Administrator\OneDrive, 
APPDATA=C:\Users\Administrator\AppData\Roaming, 
ChocolateyInstall=C:\ProgramData\chocolatey, SCALA_HOME=C:\Program Files 
(x86)\scala, CommonProgramFiles=C:\Program Files\Common Files, Path=C:\Program 
Files (x86)\Common 

Re: Re: Python callback server start failed

2022-04-19 Thread Dian Fu
NameError: name 'ScalarFunction' is not defined

Did you import ScalarFunction?

On Tue, Apr 19, 2022 at 2:08 PM 799590...@qq.com.INVALID
<799590...@qq.com.invalid> wrote:

>
> Below is the error log from just now; it no longer contains "Python callback server start
> failed". The reason was that the highlighted part picked up the Python environment variable path C:\Python310 (Python was installed there earlier; python36 was installed later and the PATH environment variable was set to python36). After restarting the machine it was fine.
>
> 2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |HiveCatalog.java:257
> |org.apache.flink.table.catalog.hive.HiveCatalog |Setting hive conf dir as
> /home/tetris/conf
> 2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |HiveCatalog.java:219
> |org.apache.flink.table.catalog.hive.HiveCatalog |Created HiveCatalog
> 'myhive'
> 2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |HiveCatalog.java:299
> |org.apache.flink.table.catalog.hive.HiveCatalog |Connected to Hive
> metastore
> 2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |CatalogManager.java:262
> |org.apache.flink.table.catalog.CatalogManager |Set the current default
> catalog as [myhive] and the current default database as [tetris].
> 2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3
> |SqlTaskRunService.java:599
> |com.chinaoly.tetris.flink.service.SqlTaskRunService |开始加载hdfs上的udf
> 2022-04-19 13:58:12 |INFO  |http-nio-9532-exec-3 |EnvUtil.java:31
> |com.chinaoly.tetris.flink.util.EnvUtil |registerFactory :
> org.apache.hadoop.fs.FsUrlStreamHandlerFactory
> 2022-04-19 13:58:12 |INFO  |http-nio-9532-exec-3 |EnvUtil.java:55
> |com.chinaoly.tetris.flink.util.EnvUtil
> |hdfs://chinaoly/tetris/file/function/flink/myReplace.py
> 2022-04-19 13:58:12 |INFO  |http-nio-9532-exec-3
> |SqlTaskRunService.java:621
> |com.chinaoly.tetris.flink.service.SqlTaskRunService |完成加载hdfs上的udf
> 13:58:20,956 |-INFO in ReconfigureOnChangeTask(born:1650344954461) - Empty
> watch file list. Disabling
> 2022-04-19 13:58:21 |INFO  |http-nio-9532-exec-3 |PythonEnvUtils.java:284
> |org.apache.flink.client.python.PythonEnvUtils |Starting Python process
> with environment variables: {USERDOMAIN_ROAMINGPROFILE=DESKTOP-LBP3EGP,
> HADOOP_CONF_DIR=/home/tetris/conf, PROCESSOR_LEVEL=6, SESSIONNAME=Console,
> ALLUSERSPROFILE=C:\ProgramData, PROCESSOR_ARCHITECTURE=AMD64,
> PSModulePath=C:\Program
> Files\WindowsPowerShell\Modules;C:\WINDOWS\system32\WindowsPowerShell\v1.0\Modules,
> SystemDrive=C:, MAVEN_HOME=E:\Program Files\apache-maven-3.8.2,
> USERNAME=Administrator, ProgramFiles(x86)=C:\Program Files (x86),
> FPS_BROWSER_USER_PROFILE_STRING=Default,
> PYTHONPATH=C:/Users/ADMINI~1/AppData/Local/Temp/pyflink/df5887da-8c4d-4807-b891-c313338f1c14/63150f6e-5ea0-4186-be56-ce25e69c4265,
> PATHEXT=.COM;.EXE;.BAT;.CMD;.VBS;.VBE;.JS;.JSE;.WSF;.WSH;.MSC,
> DriverData=C:\Windows\System32\Drivers\DriverData, PYTHON_HOME=E:\Program
> Files\python36, ProgramData=C:\ProgramData, ProgramW6432=C:\Program Files,
> HOMEPATH=\Users\Administrator, PROCESSOR_IDENTIFIER=Intel64 Family 6 Model
> 94 Stepping 3, GenuineIntel, ProgramFiles=C:\Program Files,
> PUBLIC=C:\Users\Public, windir=C:\WINDOWS, =::=::\, ZES_ENABLE_SYSMAN=1,
> LOCALAPPDATA=C:\Users\Administrator\AppData\Local,
> ChocolateyLastPathUpdate=132792636909481380, IntelliJ IDEA=E:\Program
> Files\JetBrains\IntelliJ IDEA 2019.3.3\bin;, USERDOMAIN=DESKTOP-LBP3EGP,
> FPS_BROWSER_APP_PROFILE_STRING=Internet Explorer,
> LOGONSERVER=\\DESKTOP-LBP3EGP, JAVA_HOME=C:\Program
> Files\Java\jdk1.8.0_261, OneDrive=C:\Users\Administrator\OneDrive,
> APPDATA=C:\Users\Administrator\AppData\Roaming,
> ChocolateyInstall=C:\ProgramData\chocolatey, SCALA_HOME=C:\Program Files
> (x86)\scala, CommonProgramFiles=C:\Program Files\Common Files,
> Path=C:\Program Files (x86)\Common
> Files\Oracle\Java\javapath;C:\WINDOWS\system32;C:\WINDOWS;C:\WINDOWS\System32\Wbem;C:\WINDOWS\System32\WindowsPowerShell\v1.0\;C:\WINDOWS\System32\OpenSSH\;C:\Program
> Files\Git\cmd;C:\Program Files\Java\jdk1.8.0_261\bin;E:\Program
> Files\apache-maven-3.8.2\bin;E:\Program
> Files\TortoiseSVN\bin;%HADOOP_HOME%\bin;E:\Program
> Files\nodejs\;C:\ProgramData\chocolatey\bin;C:\Program Files
> (x86)\scala\bin;C:\Program Files (x86)\scala\bin;E:\Program
> Files\python36;E:\Program Files\python36\Scripts\;E:\Program
> Files\python36\;C:\Users\Administrator\AppData\Local\Microsoft\WindowsApps;;E:\Program
> Files\JetBrains\IntelliJ IDEA
> 2019.3.3\bin;;C:\Users\Administrator\AppData\Roaming\npm,
> JETBRAINS_LICENSE_SERVER=http://fls.jetbrains-agent.com, OS=Windows_NT,
> COMPUTERNAME=DESKTOP-LBP3EGP, PROCESSOR_REVISION=5e03,
> CommonProgramW6432=C:\Program Files\Common Files,
> PYFLINK_GATEWAY_PORT=53944, ComSpec=C:\WINDOWS\system32\cmd.exe,
> SystemRoot=C:\WINDOWS, TEMP=C:\Users\ADMINI~1\AppData\Local\Temp,
> HOMEDRIVE=C:, USERPROFILE=C:\Users\Administrator,
> TMP=C:\Users\ADMINI~1\AppData\Local\Temp,
> CommonProgramFiles(x86)=C:\Program Files (x86)\Common Files,
> NUMBER_OF_PROCESSORS=4,
> IDEA_INITIAL_DIRECTORY=C:\Users\Administrator\Desktop}, command: python.exe
> -m pyflink.pyflink_callback_server
> 2022-04-19 13:58:22 |ERROR |http-nio-9532-exec-3 |ExceptionHandle.java:31
> 

Re: Re: Python callback server start failed

2022-04-19 Thread 799590...@qq.com.INVALID

Below is the error log from just now; it no longer contains "Python callback server start 
failed". The reason was that the highlighted part picked up the Python environment variable path C:\Python310 (Python was installed there earlier; python36 was installed later and the PATH environment variable was set to python36). After restarting the machine it was fine.

2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |HiveCatalog.java:257 
|org.apache.flink.table.catalog.hive.HiveCatalog |Setting hive conf dir as 
/home/tetris/conf
2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |HiveCatalog.java:219 
|org.apache.flink.table.catalog.hive.HiveCatalog |Created HiveCatalog 'myhive'
2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |HiveCatalog.java:299 
|org.apache.flink.table.catalog.hive.HiveCatalog |Connected to Hive metastore
2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |CatalogManager.java:262 
|org.apache.flink.table.catalog.CatalogManager |Set the current default catalog 
as [myhive] and the current default database as [tetris].
2022-04-19 13:58:11 |INFO  |http-nio-9532-exec-3 |SqlTaskRunService.java:599 
|com.chinaoly.tetris.flink.service.SqlTaskRunService |开始加载hdfs上的udf
2022-04-19 13:58:12 |INFO  |http-nio-9532-exec-3 |EnvUtil.java:31 
|com.chinaoly.tetris.flink.util.EnvUtil |registerFactory : 
org.apache.hadoop.fs.FsUrlStreamHandlerFactory
2022-04-19 13:58:12 |INFO  |http-nio-9532-exec-3 |EnvUtil.java:55 
|com.chinaoly.tetris.flink.util.EnvUtil 
|hdfs://chinaoly/tetris/file/function/flink/myReplace.py
2022-04-19 13:58:12 |INFO  |http-nio-9532-exec-3 |SqlTaskRunService.java:621 
|com.chinaoly.tetris.flink.service.SqlTaskRunService |完成加载hdfs上的udf
13:58:20,956 |-INFO in ReconfigureOnChangeTask(born:1650344954461) - Empty 
watch file list. Disabling 
2022-04-19 13:58:21 |INFO  |http-nio-9532-exec-3 |PythonEnvUtils.java:284 
|org.apache.flink.client.python.PythonEnvUtils |Starting Python process with 
environment variables: {USERDOMAIN_ROAMINGPROFILE=DESKTOP-LBP3EGP, 
HADOOP_CONF_DIR=/home/tetris/conf, PROCESSOR_LEVEL=6, SESSIONNAME=Console, 
ALLUSERSPROFILE=C:\ProgramData, PROCESSOR_ARCHITECTURE=AMD64, 
PSModulePath=C:\Program 
Files\WindowsPowerShell\Modules;C:\WINDOWS\system32\WindowsPowerShell\v1.0\Modules,
 SystemDrive=C:, MAVEN_HOME=E:\Program Files\apache-maven-3.8.2, 
USERNAME=Administrator, ProgramFiles(x86)=C:\Program Files (x86), 
FPS_BROWSER_USER_PROFILE_STRING=Default, 
PYTHONPATH=C:/Users/ADMINI~1/AppData/Local/Temp/pyflink/df5887da-8c4d-4807-b891-c313338f1c14/63150f6e-5ea0-4186-be56-ce25e69c4265,
 PATHEXT=.COM;.EXE;.BAT;.CMD;.VBS;.VBE;.JS;.JSE;.WSF;.WSH;.MSC, 
DriverData=C:\Windows\System32\Drivers\DriverData, PYTHON_HOME=E:\Program 
Files\python36, ProgramData=C:\ProgramData, ProgramW6432=C:\Program Files, 
HOMEPATH=\Users\Administrator, PROCESSOR_IDENTIFIER=Intel64 Family 6 Model 94 
Stepping 3, GenuineIntel, ProgramFiles=C:\Program Files, 
PUBLIC=C:\Users\Public, windir=C:\WINDOWS, =::=::\, ZES_ENABLE_SYSMAN=1, 
LOCALAPPDATA=C:\Users\Administrator\AppData\Local, 
ChocolateyLastPathUpdate=132792636909481380, IntelliJ IDEA=E:\Program 
Files\JetBrains\IntelliJ IDEA 2019.3.3\bin;, USERDOMAIN=DESKTOP-LBP3EGP, 
FPS_BROWSER_APP_PROFILE_STRING=Internet Explorer, 
LOGONSERVER=\\DESKTOP-LBP3EGP, JAVA_HOME=C:\Program Files\Java\jdk1.8.0_261, 
OneDrive=C:\Users\Administrator\OneDrive, 
APPDATA=C:\Users\Administrator\AppData\Roaming, 
ChocolateyInstall=C:\ProgramData\chocolatey, SCALA_HOME=C:\Program Files 
(x86)\scala, CommonProgramFiles=C:\Program Files\Common Files, Path=C:\Program 
Files (x86)\Common 
Files\Oracle\Java\javapath;C:\WINDOWS\system32;C:\WINDOWS;C:\WINDOWS\System32\Wbem;C:\WINDOWS\System32\WindowsPowerShell\v1.0\;C:\WINDOWS\System32\OpenSSH\;C:\Program
 Files\Git\cmd;C:\Program Files\Java\jdk1.8.0_261\bin;E:\Program 
Files\apache-maven-3.8.2\bin;E:\Program 
Files\TortoiseSVN\bin;%HADOOP_HOME%\bin;E:\Program 
Files\nodejs\;C:\ProgramData\chocolatey\bin;C:\Program Files 
(x86)\scala\bin;C:\Program Files (x86)\scala\bin;E:\Program 
Files\python36;E:\Program Files\python36\Scripts\;E:\Program 
Files\python36\;C:\Users\Administrator\AppData\Local\Microsoft\WindowsApps;;E:\Program
 Files\JetBrains\IntelliJ IDEA 
2019.3.3\bin;;C:\Users\Administrator\AppData\Roaming\npm, 
JETBRAINS_LICENSE_SERVER=http://fls.jetbrains-agent.com, OS=Windows_NT, 
COMPUTERNAME=DESKTOP-LBP3EGP, PROCESSOR_REVISION=5e03, 
CommonProgramW6432=C:\Program Files\Common Files, PYFLINK_GATEWAY_PORT=53944, 
ComSpec=C:\WINDOWS\system32\cmd.exe, SystemRoot=C:\WINDOWS, 
TEMP=C:\Users\ADMINI~1\AppData\Local\Temp, HOMEDRIVE=C:, 
USERPROFILE=C:\Users\Administrator, TMP=C:\Users\ADMINI~1\AppData\Local\Temp, 
CommonProgramFiles(x86)=C:\Program Files (x86)\Common Files, 
NUMBER_OF_PROCESSORS=4, IDEA_INITIAL_DIRECTORY=C:\Users\Administrator\Desktop}, 
command: python.exe -m pyflink.pyflink_callback_server
2022-04-19 13:58:22 |ERROR |http-nio-9532-exec-3 |ExceptionHandle.java:31 
|com.chinaoly.frm.core.aop.ExceptionHandle 
|org.apache.flink.table.api.ValidationException: SQL validation failed. Cannot 
instantiate user-defined function 'myhive.tetris.myReplace'.
at