Insufficient number of network buffers for simple last_value aggregate

2020-10-29 Thread Schneider, Thilo
Dear list, when trying to compute a simple last_value aggregate, flink fails with an IOException. The query is defined as follows: from pyflink.table import EnvironmentSettings, StreamTableEnvironment env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().buil

RE: Resuming Savepoint issue with upgraded Flink version 1.11.2

2020-10-29 Thread Partha Mishra
Hi Cong, No, I never set the uid for the operators. Regards, Partha Mishra From: Congxian Qiu Sent: Friday, October 30, 2020 10:51 AM To: Partha Mishra Cc: Sivaprasanna ; user@flink.apache.org Subject: Re: Resuming Savepoint issue with upgraded Flink version 1.11.2 Hi Partha The exception h

Re: Native memory allocation (mmap) failed to map 1006567424 bytes

2020-10-29 Thread Ori Popowski
- I will increase the jvm-overhead - I don't have any failovers or restarts until it starts happening - If it happens again even with the changes, I'll post the NMT output On Fri, Oct 30, 2020 at 3:54 AM Xintong Song wrote: > Hi Ori, > > I'm not sure about where the problem comes from. There are

Re: Resuming Savepoint issue with upgraded Flink version 1.11.2

2020-10-29 Thread Congxian Qiu
Hi Partha, The exception says that some operator exists in the checkpoint/savepoint but not in the new program. Since, as you said, both runs use the same user program binary, this seems a little strange to me. Did you ever set the uid for all the operators? Best, Congxian Parth

Re: Re: Re: How to deploy dynamically generated flink jobs?

2020-10-29 Thread Yun Gao
Hi Alexander, Sorry, I might not fully understand the issue. Do you mean the "flink" jar is the same jar as the spring app fat jar, or are they not the same jar? As a whole, I think the parameter value we need for jarFiles is the absolute path of the jar file. We might need some logic

Re: Native kubernetes setup failed to start job

2020-10-29 Thread Yun Gao
Hi Liangde, I'm pulling in Yang Wang, who is the expert on Flink on K8s. Best, Yun --Original Mail -- Sender:Chen Liangde Send Date:Fri Oct 30 05:30:40 2020 Recipients:Flink ML Subject:Native kubernetes setup failed to start job I created a flink cluster in k

Re: Native memory allocation (mmap) failed to map 1006567424 bytes

2020-10-29 Thread Xintong Song
Hi Ori, I'm not sure where the problem comes from. There are several things that might be worth a try. - Further increasing the `jvm-overhead`. Your `ps` result suggests that the Flink process uses 120+GB, while `process.size` is configured 112GB. So I think 2GB `jvm-overhead` might not be enou

Re: how to enable metrics in Flink 1.11

2020-10-29 Thread Diwakar Jha
Hello Everyone, I'm able to get my Flink UI up and running (it was related to the session manager plugin on my local laptop) but I'm not seeing any taskmanager/jobmanager logs in my Flink application. I have attached some yarn application logs while it's running but am not able to figure out how t

Native kubernetes setup failed to start job

2020-10-29 Thread Chen Liangde
I created a flink cluster in kubernetes following this guide: https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/native_kubernetes.html The job manager was running. When a job was submitted to the job manager, it spawned a task manager pod, but the task manager failed to c

Re: Re: How to deploy dynamically generated flink jobs?

2020-10-29 Thread Alexander Bagerman
Thanks, Yun. Makes sense. How would you reference a jar file from inside of another jar for such invocation? In my case I would have an interactive application - spring boot web app - where the job would be configured and StreamExecutionEnvironment.execute(jobName) is called. Spring app is a runnab

Re: ValidationException using DataTypeHint in Scalar Function

2020-10-29 Thread Steve Whelan
For some background, I am upgrading from Flink v1.9 to v1.11. So what I am about to describe is our implementation on v1.9, which worked. I am trying to achieve the same functionality on v1.11. I have a DataStream whose type is an avro generated POJO, which contains a field *UrlParameters* that is

Re: Table Print SQL Connector

2020-10-29 Thread Ruben Laguna
Hi, Using `mytable.execute().print()` is exactly what I wanted, thanks. But I'm still curious. I'm just running this locally, in a junit test case (not using a flink cluster) just like in [flink-playgrounds SpendReportTest][1] so in this scenario where does the task manager (if there is taskmanag

Re: Table Print SQL Connector

2020-10-29 Thread Dawid Wysakowicz
You should be able to use the "print" sink. Remember though that the "print" sink prints into the stdout/stderr of TaskManagers, not the Client, where you submit the query. This is different from the TableResult, which collects results in the client. BTW, for printing you can use TableResult#print,

Table Print SQL Connector

2020-10-29 Thread Ruben Laguna
How can I use the Table [Print SQL connector][1]? I tried the following (batch mode) but it does not give any output: EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); final LocalD

Re: Feature request: Removing state from operators

2020-10-29 Thread Peter Westermann
Does that actually allow removing a state completely (vs. just modifying the values stored in state)? Ideally, we would want to just interact with state via KeyedStateStore. Maybe it would be possible to add a couple methods there, e.g. like this: // List all pre-existing states List> listState

Re: Feature request: Removing state from operators

2020-10-29 Thread Congxian Qiu
Hi Peter, Can applyToAllKeys[1] in KeyedStateBackend help you here? Currently, though, it is not exposed to users. [1] https://github.com/apache/flink/blob/fada6fb6ac9fd7f6510f1f2d77b6baa06563e222/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java#L65 Best, Co

Re: What does Kafka Error sending fetch request mean for the Kafka source?

2020-10-29 Thread Becket Qin
Hi John, The log message you saw from Kafka consumer simply means the consumer was disconnected from the broker that FetchRequest was supposed to be sent to. The disconnection can happen in many cases, such as broker down, network glitches, etc. The KafkaConsumer will just reconnect and retry send

Flink kafka - Message Prioritization

2020-10-29 Thread Vignesh Ramesh
Hi, I have a Flink pipeline which reads from a Kafka topic, does a map operation (builds an Elasticsearch model), and sinks it to Elasticsearch. *Pipeline-1:* Flink-Kafka-Connector-Consumer(topic1) (parallelism 8) -> Map (parallelism 8) -> Flink-Es-connector-Sink(es1) (parallelism 8) Now I want som

Re: Native memory allocation (mmap) failed to map 1006567424 bytes

2020-10-29 Thread Ori Popowski
Hi Xintong, Unfortunately I cannot upgrade to 1.10.2, because EMR has either 1.10.0 or 1.11.0. About the overhead - turns out I already configured taskmanager.memory.jvm-overhead.max to 2 gb instead of the default 1 gb. Should I increase it further? state.backend.rocksdb.memory.managed is alread

Re: quoted identifiers in Table SQL give SQL parse failed.

2020-10-29 Thread Danny Chan
Yes, Flink SQL uses the backquote ` as the quote character. For your SQL, it should be: CREATE TABLE table1(`ts` TIMESTAMP) WITH(...) Ruben Laguna wrote on Thu, Oct 29, 2020 at 6:32 PM: > I asked this question on [Stackoverflow][1] but I'm cross-posting here. > > > Are double quoted identifiers allowed in Fl
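The quoting rule from this reply can be sketched as a small helper that builds DDL strings with backquoted identifiers. This is only an illustration: the function names `quote_identifier` and `create_table_ddl` are hypothetical, the `...` in `WITH(...)` is kept as the same placeholder used in the message, and escaping an embedded backquote by doubling it is an assumption rather than something stated in the thread.

```python
def quote_identifier(name: str) -> str:
    """Wrap an identifier in backquotes, the quote character Flink SQL expects.

    Assumption (not from the thread): an embedded backquote is escaped
    by doubling it.
    """
    return "`" + name.replace("`", "``") + "`"


def create_table_ddl(table: str, columns: dict, options: str = "...") -> str:
    """Build a CREATE TABLE statement with every column name backquoted.

    `options` stays a literal placeholder; real connector options are
    not given in the message and are not invented here.
    """
    cols = ", ".join(
        f"{quote_identifier(col)} {sql_type}" for col, sql_type in columns.items()
    )
    return f"CREATE TABLE {table}({cols}) WITH({options})"


if __name__ == "__main__":
    # Reproduces the statement shape suggested in the reply.
    print(create_table_ddl("table1", {"ts": "TIMESTAMP"}))
```

Quoting every column name unconditionally avoids having to track which words (such as `ts` or `timestamp`) the parser treats as reserved.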

Re: Native memory allocation (mmap) failed to map 1006567424 bytes

2020-10-29 Thread Xintong Song
Hi Ori, RocksDB also uses managed memory. If the memory overuse indeed comes from RocksDB, then increasing managed memory fraction will not help. RocksDB will try to use as much memory as the configured managed memory size. Therefore increasing managed memory fraction also makes RocksDB try to use

Re: ValidationException using DataTypeHint in Scalar Function

2020-10-29 Thread Dawid Wysakowicz
Sorry for the late reply. Could you share a complete, reproducible example? I am mostly interested in where you get the input RAW('java.util.Map', '...') type that you are passing into your UDF. Raw types are equal/equivalent only if both the class and the serializer are equal. A side note: Hav

quoted identifiers in Table SQL give SQL parse failed.

2020-10-29 Thread Ruben Laguna
I asked this question on [Stackoverflow][1] but I'm cross-posting here. Are double quoted identifiers allowed in Flink SQL? [Calcite documentation says to use double quoted identifiers](https://calcite.apache.org/docs/reference.html#identifiers) but they don't seem to work (see below). On the othe

Re: Running flink in a Local Execution Environment for Production Workloads

2020-10-29 Thread Matthias Pohl
Hi Joseph, thanks for reaching out to us. There shouldn't be any downsides other than the one you already mentioned as far as I know. Best, Matthias On Fri, Oct 23, 2020 at 1:27 PM Joseph Lorenzini wrote: > Hi all, > > > > I plan to run flink jobs as docker containers in a AWS Elastic Container

Re: Flink checkpointing state

2020-10-29 Thread Yun Tang
Hi, I've added Yang Wang, who mainly develops this feature; I think he can provide more information. Best Yun Tang From: Boris Lublinsky Sent: Tuesday, October 27, 2020 22:57 To: Yun Tang Cc: user Subject: Re: Flink checkpointing state Thanks Yun, This refers to F

Re: Unify error handler and late window record output for SQL api

2020-10-29 Thread Yi Tang
Hi Yun, Thanks for your quick reply. To be clear, it's not essential to implement these features in the SQL statement itself. It is precisely because of the limitations of SQL that we want these features. 1. Yeah, I think the stream API has no similar API either. We want it because sometimes we want to

Re: Rich Function Thread Safety

2020-10-29 Thread Igal Shilman
Hi Lian, Good to hear that you are learning about StateFun, and I'd be happy to answer any of your questions while doing so :-) Perhaps in the future it would be best if you start a new email thread, so that it would be easier to spot your question. The following is completely thread safe: final

Re: Native memory allocation (mmap) failed to map 1006567424 bytes

2020-10-29 Thread Ori Popowski
Hi, PID 20331 is indeed the Flink process, specifically the TaskManager process. - Workload is a streaming workload reading from Kafka and writing to S3 using a custom Sink - RocksDB state backend is used with default settings - My external dependencies are: -- logback -- jackson -- flatbuffers --

Re: Unify error handler and late window record output for SQL api

2020-10-29 Thread Yun Gao
Hi Yi, Sorry, I might not be an expert on SQL. As a whole, since SQL is meant to be a high-level API, users might have less control over their jobs: 1. Unfortunately, we do not have an API to catch all the errors. I think even with DataStream, we also do not provide an API to catch the run

Re: Native memory allocation (mmap) failed to map 1006567424 bytes

2020-10-29 Thread Xintong Song
Hi Ori, It looks like Flink indeed uses more memory than expected. I assume the first item with PID 20331 is the Flink process, right? It would be helpful if you could briefly describe your workload. - What kind of workload are you running? Streaming or batch? - Do you use RocksDB state backend? -

Unify error handler and late window record output for SQL api

2020-10-29 Thread Yi Tang
Hi, I'm looking for a way to handle potential errors in jobs submitted with the SQL API, but unfortunately I found nothing. Handling errors manually in the SQL API is hard, I think. Is there a way to handle all errors and send them to a SideOutput to avoid task failure? Also one can put late records into a Sid