Fwd: LISTAGG error in Flink 1.12.0

2020-12-16 Thread huang huang
-- Forwarded message - From: huang huang Date: Thu, Dec 17, 2020, 12:47 PM Subject: LISTAGG error in Flink 1.12.0 To: Hi all: When executing SQL with pyflink, LISTAGG raised an error. Does the current version of LISTAGG not support ordering? Has anyone run into this? *Flink version:* 1.12.0 *SQL code:* SELECT session_id, LISTAGG(page_id, ',') WITHIN GROUP(ORDER BY action_time

Re: Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

2020-12-16 Thread Xintong Song
Hi Lu, I assume you are using ZooKeeper as the HA service? A common cause of unexpected leadership loss is instability of the HA service. E.g., if ZK does not receive a heartbeat from the Flink RM for a certain period of time, it will revoke the leadership and notify other components. You can look into

Re: Challenges Deploying Flink With Savepoints On Kubernetes

2020-12-16 Thread vishalovercome
I'm not sure if this addresses the original concern. For instance consider this sequence: 1. Job starts from savepoint 2. Job creates a few checkpoints 3. Job manager (just one in kubernetes) crashes and restarts with the commands specified in the kubernetes manifest which has the savepoint path

Re: state inside functions

2020-12-16 Thread vishalovercome
When running in HA mode or taking savepoints, if we pass configuration as constructor arguments, then it seems as though changing configuration at a later time doesn't work as it uses state to restore older configuration. How can we pass configuration while having the flexibility to change the valu

Changing application configuration when restoring from checkpoint/savepoint

2020-12-16 Thread vishalovercome
My flink job loads several configuration files that contain job, operator and business configuration. One of the operators is an AsyncOperator with function like so: class AsyncFun(config: T) extends RichAsyncFunction[X, Y] { @transient private lazy val client = f(config, metricGroup, etc.)

Re: Changing application configuration when restoring from checkpoint/savepoint

2020-12-16 Thread vishalovercome
Will this work - In main method, serialize config into a string and store it using ParameterTool with key as taskName and value as config (serialized as string). Then in the open method, lookup the relevant configuration using getTaskName(). A follow up to this would be configuring custom windowin
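
The idea proposed above (keep configuration out of the constructor and look it up by task name when the function is opened) can be modeled in plain Python. This is only a sketch of the lookup pattern, not Flink code: `build_job_parameters`, `AsyncFun.open(task_name, job_parameters)` and the `"enrich"` task name are hypothetical stand-ins for the ParameterTool entries and `getTaskName()` mentioned in the message, and whether the approach behaves as hoped on restore is the poster's open question.

```python
import json


def build_job_parameters(configs_by_task):
    """Serialize each task's config to a JSON string, keyed by task name."""
    return {task: json.dumps(cfg, sort_keys=True)
            for task, cfg in configs_by_task.items()}


class AsyncFun:
    """Hypothetical stand-in for a rich function; no config in the constructor."""

    def __init__(self):
        self.config = None

    def open(self, task_name, job_parameters):
        # Resolve the config at open time instead of capturing it at construction,
        # with the intent that a restarted job reads the currently supplied values.
        self.config = json.loads(job_parameters[task_name])


# Usage: the "main method" builds the parameters, the function reads them in open().
params = build_job_parameters({"enrich": {"timeout_ms": 500}})
fun = AsyncFun()
fun.open("enrich", params)
```

The design choice being illustrated is that only a lookup key travels with the function object, while the values come from whatever parameters the current run supplies.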

Will job manager restarts lead to repeated savepoint restoration?

2020-12-16 Thread vishalovercome
My flink job runs in kubernetes. This is the setup: 1. One job running as a job cluster with one job manager 2. HA powered by zookeeper (works fine) 3. Job/Deployment manifests stored in Github and deployed to kubernetes by Argo 4. State persisted to S3 If I were to stop (drain and take a savepoi

Flink 1.11 job hit error "Job leader lost leadership" or "ResourceManager leader changed to new address null"

2020-12-16 Thread Lu Niu
Hi, Flink users Recently we migrated to flink 1.11 and see exceptions like: ``` 2020-12-15 12:41:01,199 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: USER_MATERIALIZED_EVENT_SIGNAL-user_context-event -> USER_MATERIALIZED_EVENT_SIGNAL-user_context-event-as_nrtgtupl

Re: Direct Memory full

2020-12-16 Thread Rex Fenley
Ok, thanks. I've said this in another thread but everything seems to go completely idle during checkpoints while waiting on 1 operator, there's no CPU usage, hardly any disk usage. I'll assume it's something else then. On Wed, Dec 16, 2020 at 10:42 AM Robert Metzger wrote: > I don't think the di

Re: Direct Memory full

2020-12-16 Thread Robert Metzger
I don't think the direct memory is causing any performance bottlenecks. The backpressure is probably caused by something else (high CPU load, slow external system, data skew) On Wed, Dec 16, 2020 at 7:23 PM Steven Wu wrote: > if you are running out of direct buffer, you will see > "java.lang.Ou

Re: Direct Memory full

2020-12-16 Thread Steven Wu
if you are running out of direct buffer, you will see "java.lang.OutOfMemoryError: Direct buffer memory" On Wed, Dec 16, 2020 at 9:47 AM Rex Fenley wrote: > Thanks for the reply. If what I'm understanding is correct there's no > chance of an OOM, but since direct memory is for I/O, it being comp

Re: Direct Memory full

2020-12-16 Thread Rex Fenley
Thanks for the reply. If what I'm understanding is correct there's no chance of an OOM, but since direct memory is for I/O, it being completely filled may be a sign of backpressure? Currently one of our operators takes a tremendous amount of time to align during a checkpoint. Could increasing direc

Set TimeZone of Flink Streaming job

2020-12-16 Thread narasimha
Hi, How can I configure a Flink job to follow a certain time zone instead of the default/UTC? Is it possible in the first place? The solutions I have found are for the Table/SQL API. -- A.Narasimha Swamy

Re: Flink 1.12

2020-12-16 Thread Boris Lublinsky
Thanks guys, The reason I am interested in rolling update is to avoid complete restarts in the case of parameter (for example parallelism) changes. > On Dec 15, 2020, at 8:40 PM, Yang Wang wrote: > > Hi Boris, > > What is -p 10? > It is same to --parallelism 10. Set the default parallelism to

Re: Flink 1.12 and Stateful Functions

2020-12-16 Thread Igal Shilman
Hello Jan, The next release candidate for stateful functions is expected in mid February, and the release itself would follow shortly thereafter. This release will be based on Flink 1.12. Cheers, Igal. On Tue, Dec 15, 2020 at 4:19 PM Jan Brusch wrote: > Hi, > > just a quick question: Is there

Re: upsert-kafka to do temporal table joins

2020-12-16 Thread Leonard Xu
Hi, guoliubin Please ignore my previous answer; I mixed your question up with another one. I post the right temporal join SQL syntax here. SELECT [column_list] FROM table1 [AS ] [LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS ] ON table1.column-name1 = table2.column-na

Re: Urgent help on S3 CSV file reader DataStream Job

2020-12-16 Thread Wei Zhong
Hi Deep, You can try changing `FileProcessingMode.PROCESS_ONCE` to `FileProcessingMode.PROCESS_CONTINUOUSLY`. Best, Wei > On Dec 15, 2020, at 20:18, DEEP NARAYAN Singh wrote: > > Hi Wei, > Could you please suggest how to fix the issues below? > > Thanks & Regards, > Deep > > On Mon, 14 Dec, 2

Re: Connecting to kinesis with mfa

2020-12-16 Thread Avi Levi
Awesome, thanks! looks good On Wed, Dec 16, 2020 at 12:55 PM Cranmer, Danny wrote: > Hey Avi, > > > > I have reproduced and found a solution. The issue is not MFA, it is the > BASIC credential provider is not using the token: > > > https://github.com/apache/flink/blob/master/flink-connectors/fl

Re: Getting an exception while stopping Flink with savepoints on Kubernetes+Minio

2020-12-16 Thread Robert Metzger
I guess you are seeing a different error now, because you are submitting the job, and stopping it right away. Can you produce new logs, where you wait until at least one Checkpoint successfully completed before you stop? From the exception it seems that the job has not successfully been initializ

Re: failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

2020-12-16 Thread Yang Wang
I am not sure about the root cause, but it seems that you could force the default NIO-based transport to work around[1]. Add -Denv.java.opts="-Dcom.datastax.driver.FORCE_NIO=true" to your submission commands. [1]. https://stackoverflow.com/questions/48762857/java-lang-classcastexception-netty-fail

Re: Connecting to kinesis with mfa

2020-12-16 Thread Cranmer, Danny
Hey Avi, I have reproduced and found a solution. The issue is not MFA; it is that the BASIC credential provider is not using the token: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/AWSUtil.java#L18

Re: Getting an exception while stopping Flink with savepoints on Kubernetes+Minio

2020-12-16 Thread Folani
Hi, I attached the log files for Jobmanager and Taskmanager: jobmanager_log.asc 6122-f8b99d_log.6122-f8b99d_log

Re: upsert-kafka to do temporal table joins

2020-12-16 Thread Leonard Xu
Hi, guoliubin Sorry for the late reply. I think the example in the release note has a minor typo: it is missing the ‘AS’ keyword. SELECT o.order_id, o.order_time, o.amount * r.currency_rate AS amount, r.currency FROM orders AS o, latest_rates FOR SYSTEM_TIME AS OF o.order_time
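
What `FOR SYSTEM_TIME AS OF o.order_time` asks for can be pictured as a per-row lookup of the rate version that was valid at the order's time. The following is a rough plain-Python sketch of that semantics, not how Flink executes the join; the function names, the tuple layouts, and the choice of `currency` as the join key are assumptions (the join condition is cut off in the snippet above).

```python
from bisect import bisect_right


def rate_as_of(versions, ts):
    """Return the rate valid at time ts, or None if no version exists yet.

    versions: list of (valid_from, rate) tuples sorted by valid_from.
    """
    times = [valid_from for valid_from, _ in versions]
    i = bisect_right(times, ts)  # number of versions with valid_from <= ts
    return versions[i - 1][1] if i > 0 else None


def temporal_join(orders, rates_by_currency):
    """Join each order with the rate version valid at its order_time."""
    out = []
    for order_id, order_time, amount, currency in orders:
        rate = rate_as_of(rates_by_currency.get(currency, []), order_time)
        if rate is not None:  # inner-join behavior: drop orders without a rate
            out.append((order_id, order_time, amount * rate, currency))
    return out


# Integer rates keep the arithmetic exact in this illustration.
rates = {"EUR": [(0, 2), (10, 3)]}
orders = [(1, 5, 100, "EUR"), (2, 10, 100, "EUR"), (3, 3, 50, "USD")]
joined = temporal_join(orders, rates)
```

Order 1 picks up the version valid from time 0, order 2 the version that becomes valid exactly at time 10, and order 3 is dropped because no rate exists for its currency.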

Re: failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

2020-12-16 Thread Dongwon Kim
Hi Yang, Thanks for the detailed explanation! Could you add "-Dyarn.per-job-cluster.include-user-jar=DISABLED" to your > command and have a try? After that, we > will disable the user jars including in the system classpath. I tried the following as you suggested: #!/bin/env bash export FLINK

Re: failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

2020-12-16 Thread Yang Wang
Hi Dongwon, For application mode, the job submission happens in the JobManager side. We are using an embedded client to submit the job. So the user jar will be added to distributed cache. When deploying a task to TaskManager, it will be downloaded again and run in user classloader even though we a

Re: Is working with states supported in pyflink1.12?

2020-12-16 Thread Nadia Mostafa
Thanks for your quick response On Wed, Dec 16, 2020, 4:01 AM Xingbo Huang wrote: > Hi, > > As Chesnay said, PyFlink has already supported Python DataStream stateless > APIs so that users are able to perform some basic data transformations, but > doesn't provide state access support yet in releas

Re: Strange time format output by flink

2020-12-16 Thread Robert Metzger
Hey, maybe your event-time timestamps are wrong, leading to an obscure year (1705471 instead of 2020). Flink sends Long.MAX_VALUE as the final watermark. On Sat, Dec 12, 2020 at 2:29 PM Appleyuchi wrote: > > > I'm trying flatAggregate, the whole code is bug free and as follows: > > https://pa
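
To see why an odd year can show up when a raw long value is formatted as a date, here is a small sketch that estimates the calendar year of an epoch-millisecond value. `approx_year` and `is_final_watermark` are hypothetical helper names, and the year is only an estimate based on the average Gregorian year length, so it can be off by one close to a year boundary.

```python
LONG_MAX = 2**63 - 1           # value of Java's Long.MAX_VALUE
MS_PER_YEAR = 31_556_952_000   # average Gregorian year (365.2425 days) in ms


def approx_year(epoch_ms):
    """Estimate the calendar year for a timestamp in epoch milliseconds."""
    return 1970 + epoch_ms // MS_PER_YEAR


def is_final_watermark(epoch_ms):
    """True if the value equals Long.MAX_VALUE, the final watermark value."""
    return epoch_ms == LONG_MAX


normal_year = approx_year(1_607_783_340_000)  # a timestamp from December 2020
final_year = approx_year(LONG_MAX)            # far-future year, not a real event time
```

A check like `is_final_watermark` before formatting a watermark for logging would keep the end-of-stream marker from being mistaken for a corrupted event time.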

Re: Disk usage during savepoints

2020-12-16 Thread Robert Metzger
Hey Rex, If I'm reading the Flink code correctly, then RocksDB will allocate its storage across all configured tmp directories. Flink is respecting the io.tmp.dirs configuration property for that. It seems that you are using Flink on YARN, where Flink is respecting the tmp directory configs from

Re: failed w/ Application Mode but succeeded w/ Per-Job Cluster Mode

2020-12-16 Thread Dongwon Kim
Robert, But if Kafka is really only available in the user jar, then this error > still should not occur. I think so too; it should not occur. I scan through all the jar files in the classpath using `jar tf` but no jar contains org.apache.kafka.common.serialization.Deserializer with a different ve

Re: Getting an exception while stopping Flink with savepoints on Kubernetes+Minio

2020-12-16 Thread Robert Metzger
Hi, the logs from the client are not helpful for debugging this particular issue. With kubectl get pods, you can get the TaskManager pod names, with kubectl logs you can get the logs. The JobManager log would also be nice to have. On Mon, Dec 14, 2020 at 3:29 PM Folani wrote: > Hi Piotrek, > >

Re: Fine-grained task recovery

2020-12-16 Thread Robert Metzger
If a TaskManager fails, the data stored on it will be lost and needs to be recomputed. So even with the batch mode configured, more tasks might need a restart. To mitigate that, the Flink developers need to implement support for external shuffle services. On Wed, Dec 16, 2020 at 9:10 AM Robert Met

Re: Fine-grained task recovery

2020-12-16 Thread Robert Metzger
With region failover strategy, all connected subtasks will fail. If you are using the DataSet API with env.getConfig().setExecutionMode( ExecutionMode.BATCH);, you should get the desired behavior. On Mon, Dec 14, 2020 at 5:24 PM Stanislav Borissov wrote: > Hi, > > I'm running a simple, "embaras
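
The statement that "all connected subtasks will fail" can be illustrated by grouping subtasks into regions along their pipelined connections. This is a simplified model with hypothetical names (`failover_regions`, `tasks_to_restart`, subtask labels such as `"src-0"`), assuming only pipelined edges tie subtasks together; Flink's actual region computation is more involved.

```python
def failover_regions(subtasks, pipelined_edges):
    """Group subtasks into regions: connected components over pipelined edges."""
    parent = {s: s for s in subtasks}

    def find(x):
        # Union-find lookup with path halving.
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    for a, b in pipelined_edges:
        parent[find(a)] = find(b)

    regions = {}
    for s in subtasks:
        regions.setdefault(find(s), set()).add(s)
    return list(regions.values())


def tasks_to_restart(failed, subtasks, pipelined_edges):
    """Return every subtask in the same region as the failed one."""
    for region in failover_regions(subtasks, pipelined_edges):
        if failed in region:
            return region
    return set()


subtasks = ["src-0", "map-0", "src-1", "map-1"]
forward = [("src-0", "map-0"), ("src-1", "map-1")]  # independent pipelines
all_to_all = forward + [("src-0", "map-1"), ("src-1", "map-0")]  # shuffle-like
```

With the `forward` edges a failure of `map-0` restarts only its own pipeline, while the `all_to_all` edges put every subtask into one region. In this model, an exchange that is not pipelined contributes no edge, which is one way to read the suggestion to use `ExecutionMode.BATCH`.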

Re: Connecting to kinesis with mfa

2020-12-16 Thread Avi Levi
Thanks Robert, I actually tried all of the above but got to the same unfortunate result On Wed, Dec 16, 2020 at 8:24 AM Robert Metzger wrote: > Hey Avi, > > Maybe providing secret/access key + session token doesn't work, and you > need to provide either one of them? > > https://docs.aws.amazon.c