[jira] [Created] (FLINK-33765) Flink SQL to support COLLECTLIST

2023-12-06 Thread Zhenzhong Xu (Jira)
Zhenzhong Xu created FLINK-33765:


 Summary: Flink SQL to support COLLECTLIST
 Key: FLINK-33765
 URL: https://issues.apache.org/jira/browse/FLINK-33765
 Project: Flink
  Issue Type: Improvement
  Components: API / DataSet
Reporter: Zhenzhong Xu


Flink SQL currently supports COLLECT, which returns a multiset. However, support for casting from multiset to other types (especially array/list) is *very* limited (see [here|https://github.com/apache/flink/blob/master/docs/content/docs/dev/table/types.md#casting]), which causes a lot of ease-of-use headaches.

Can we support COLLECT_LIST as a built-in system function?
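The following plain-Java sketch (no Flink dependency) shows the difference between the two result shapes. It models a MULTISET the way Flink exposes it on the Java side, as a `Map<T, Integer>` from element to multiplicity. It models a COLLECT_LIST-style result as a `java.util.List`. The class and method names are hypothetical and are not Flink APIs. `LinkedHashMap` is used only to make the printed output deterministic, because a multiset carries no order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of MULTISET vs. list-shaped aggregation results (not a Flink API). */
public class CollectSemantics {

    /** COLLECT-style result: each distinct element mapped to how many times it occurred. */
    public static Map<String, Integer> collectMultiset(List<String> rows) {
        Map<String, Integer> multiset = new LinkedHashMap<>(); // insertion order only for stable printing
        for (String row : rows) {
            multiset.merge(row, 1, Integer::sum);
        }
        return multiset;
    }

    /** COLLECT_LIST-style result: every element kept, duplicates included, in arrival order. */
    public static List<String> collectList(List<String> rows) {
        return new ArrayList<>(rows);
    }

    /** Expands a multiset back into a list: multiplicities survive, arrival order does not. */
    public static List<String> expand(Map<String, Integer> multiset) {
        List<String> out = new ArrayList<>();
        multiset.forEach((element, count) -> {
            for (int i = 0; i < count; i++) {
                out.add(element);
            }
        });
        return out;
    }

    public static void main(String[] args) {
        List<String> rows = List.of("a", "b", "a");
        Map<String, Integer> multiset = collectMultiset(rows);
        System.out.println(multiset);          // {a=2, b=1}
        System.out.println(collectList(rows)); // [a, b, a]
        System.out.println(expand(multiset));  // [a, a, b]
    }
}
```

Expanding the multiset recovers the elements but not their original order. That is one reason a MULTISET-to-ARRAY cast, even where available, is not a full substitute for COLLECT_LIST.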



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-7894) Improve metrics around fine-grained recovery and associated checkpointing behaviors

2017-10-20 Thread Zhenzhong Xu (JIRA)
Zhenzhong Xu created FLINK-7894:
---

 Summary: Improve metrics around fine-grained recovery and associated checkpointing behaviors
 Key: FLINK-7894
 URL: https://issues.apache.org/jira/browse/FLINK-7894
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.3.2, 1.4.0
Reporter: Zhenzhong Xu


Currently, the only metric around fine-grained recovery is "task_failures". It's a very high-level metric, so it would be nice to have the following improvements:

* The ability to slice and dice by which tasks were restarted.
* Recovery duration.
* Checkpoint behaviors associated with recovery: cancellations, failures, etc.
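This sketch illustrates the first two bullets. It keeps a restart count per task and the duration of each task's most recent recovery. It is a hypothetical, self-contained class and not part of Flink's metrics API. In Flink, values like these would presumably be registered as counters or gauges on a metric group. The checkpoint-related bullet is not modeled. Timestamps are passed in explicitly so the example is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-task recovery bookkeeping; not part of Flink's metrics API. */
public class RecoveryMetrics {
    private final Map<String, Long> failedAtMillis = new HashMap<>();
    private final Map<String, Integer> restarts = new HashMap<>();
    private final Map<String, Long> lastRecoveryMillis = new HashMap<>();

    /** Records the moment a task failed and fine-grained recovery began. */
    public void taskFailed(String task, long nowMillis) {
        failedAtMillis.put(task, nowMillis);
    }

    /** Records that the restarted task is running again and derives the recovery duration. */
    public void taskRecovered(String task, long nowMillis) {
        Long start = failedAtMillis.remove(task);
        if (start == null) {
            return; // no matching failure recorded, nothing to measure
        }
        restarts.merge(task, 1, Integer::sum);
        lastRecoveryMillis.put(task, nowMillis - start);
    }

    /** Number of completed recoveries for the task, 0 if it never restarted. */
    public int restartCount(String task) {
        return restarts.getOrDefault(task, 0);
    }

    /** Duration of the most recent recovery in milliseconds, -1 if unknown. */
    public long lastRecoveryMillis(String task) {
        return lastRecoveryMillis.getOrDefault(task, -1L);
    }

    public static void main(String[] args) {
        RecoveryMetrics metrics = new RecoveryMetrics();
        metrics.taskFailed("router (3/180)", 1_000);
        metrics.taskRecovered("router (3/180)", 4_500);
        System.out.println(metrics.restartCount("router (3/180)"));       // 1
        System.out.println(metrics.lastRecoveryMillis("router (3/180)")); // 3500
    }
}
```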





[jira] [Created] (FLINK-7844) Fine Grained Recovery triggers checkpoint timeout failure

2017-10-15 Thread Zhenzhong Xu (JIRA)
Zhenzhong Xu created FLINK-7844:
---

 Summary: Fine Grained Recovery triggers checkpoint timeout failure
 Key: FLINK-7844
 URL: https://issues.apache.org/jira/browse/FLINK-7844
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.3.2
Reporter: Zhenzhong Xu


Context: 
We are using the "individual" (fine-grained) failover recovery strategy for our embarrassingly parallel router use case. The topic has over 2000 partitions, and parallelism is set to ~180, dispatched to over 20 task managers with around 180 slots in total.

We've noticed that after one task manager is terminated, the individual recovery happens correctly and the workload is re-dispatched to a newly available task manager instance. However, the checkpoint then takes 10 minutes to eventually time out, which prevents all other task managers from committing checkpoints. In the worst case, if the job is restarted for other reasons (e.g. job manager termination), more messages would be re-processed (duplicated) than for the same job without fine-grained recovery enabled.

I suspect that the uber checkpoint was waiting for a previous checkpoint initiated by the old task manager, and was therefore taking a long time to time out.
Two questions:
1. Is there a configuration that controls this checkpoint timeout?
2. Is there any reason that, when the Job Manager realizes that a Task Manager is gone and the workload has been re-dispatched, it still needs to wait for the checkpoint initiated by the old task manager?
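The suspected behavior can be modeled as a pending checkpoint that completes only when every task has acknowledged it, and otherwise expires at a fixed deadline. This is a simplified, hypothetical model and not Flink's `CheckpointCoordinator`. A task that ran on the lost task manager never acknowledges, so the checkpoint can only expire. On question 1, the DataStream API exposes `CheckpointConfig#setCheckpointTimeout(long)`. Its default of 10 minutes matches the delay observed here.

```java
import java.util.HashSet;
import java.util.Set;

/** Simplified, hypothetical model of a pending checkpoint; not Flink's CheckpointCoordinator. */
public class PendingCheckpointModel {
    public enum Status { PENDING, COMPLETED, EXPIRED }

    private final Set<String> awaitingAck;
    private final long deadlineMillis;

    public PendingCheckpointModel(Set<String> tasks, long triggeredAtMillis, long timeoutMillis) {
        this.awaitingAck = new HashSet<>(tasks);
        this.deadlineMillis = triggeredAtMillis + timeoutMillis;
    }

    /** A task confirms it has persisted its state for this checkpoint. */
    public void acknowledge(String task) {
        awaitingAck.remove(task);
    }

    /** Completed once all tasks acked; otherwise pending until the deadline, then expired. */
    public Status status(long nowMillis) {
        if (awaitingAck.isEmpty()) {
            return Status.COMPLETED;
        }
        return nowMillis >= deadlineMillis ? Status.EXPIRED : Status.PENDING;
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        PendingCheckpointModel cp =
                new PendingCheckpointModel(Set.of("task-1", "task-2", "task-3"), 0, tenMinutes);
        cp.acknowledge("task-1");
        cp.acknowledge("task-2");
        // task-3 ran on the terminated task manager and never acknowledges.
        System.out.println(cp.status(60_000));     // PENDING
        System.out.println(cp.status(tenMinutes)); // EXPIRED
    }
}
```

In these terms, question 2 asks whether the coordinator could abort such a checkpoint as soon as the task loss is detected, instead of waiting for the deadline.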






[jira] [Created] (FLINK-7278) Flink job can stuck while ZK leader reelected during ZK cluster migration

2017-07-26 Thread Zhenzhong Xu (JIRA)
Zhenzhong Xu created FLINK-7278:
---

 Summary: Flink job can stuck while ZK leader reelected during ZK cluster migration
 Key: FLINK-7278
 URL: https://issues.apache.org/jira/browse/FLINK-7278
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Reporter: Zhenzhong Xu
Priority: Minor


We have observed a potential failure case while a Flink job was running during a ZK migration. The scenario is described below.

1. The Flink cluster runs in standalone mode on the Netflix Titus container runtime.
2. We performed a ZK migration by updating the nodes to a new OS image, one node at a time.
3. During ZK leader reelection, the Flink cluster started to exhibit failures and eventually ended up in a non-recoverable failure mode.
4. This behavior does not reproduce every time; it may be caused by an edge-case race condition.

Below is a list of error messages ordered by event time:
2017-07-22 02:47:44,535 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Source -> Sink: Sink (67/176) (0442d63c89809ad86f38874c845ba83f) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager was lost/killed: ResourceID{resourceId='f519795dfabcecfd7863ed587efdb398'} @ titus-123072-worker-3-39 (dataPort=46879)
at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
at org.apache.flink.runtime.instance.InstanceManager.unregisterAllTaskManagers(InstanceManager.java:234)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:330)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


2017-07-22 02:47:44,621 WARN com.netflix.spaas.runtime.FlinkJobManager - Discard message LeaderSessionMessage(7a247ad9-531b-4f27-877b-df41f9019431,Disconnect(0b300c04592b19750678259cd09fea95,java.lang.Exception: TaskManager akka://flink/user/taskmanager is disassociating)) because the expected leader session ID None did not equal the received leader session ID Some(7a247ad9-531b-4f27-877b-df41f9019431).
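The WARN entry above reads like a leader session check. After the ZK reelection, the job manager apparently no longer expects any leader session (`None`). As a result, a `Disconnect` message still tagged with the previous session ID is discarded. The sketch below is a simplified, hypothetical model of that check, written only for illustration. It is not Flink's `LeaderSessionMessageFilter`, and it does not attempt to reproduce the race condition itself.

```java
import java.util.Optional;
import java.util.UUID;

/** Hypothetical model of leader-session-based message filtering; not Flink's implementation. */
public class LeaderSessionFilterModel {
    // Empty while this job manager holds no leadership, mirroring "expected leader session ID None".
    private Optional<UUID> expectedLeaderSessionId = Optional.empty();

    public void grantLeadership(UUID sessionId) {
        expectedLeaderSessionId = Optional.of(sessionId);
    }

    public void revokeLeadership() {
        expectedLeaderSessionId = Optional.empty();
    }

    /** Accepts a message only if it carries the currently expected leader session ID. */
    public boolean accept(UUID messageSessionId) {
        return expectedLeaderSessionId.isPresent()
                && expectedLeaderSessionId.get().equals(messageSessionId);
    }

    public static void main(String[] args) {
        UUID session = UUID.fromString("7a247ad9-531b-4f27-877b-df41f9019431");
        LeaderSessionFilterModel filter = new LeaderSessionFilterModel();
        filter.grantLeadership(session);
        System.out.println("before reelection: " + filter.accept(session)); // true
        filter.revokeLeadership(); // ZK leader reelection: leadership lost, no session expected yet
        System.out.println("after reelection: " + filter.accept(session));  // false, message discarded
    }
}
```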
Zhenzhong Xu added a comment - 07/26/2017 09:24 PM
2017-07-22 02:47:45,015 WARN netflix.spaas.shaded.org.apache.zookeeper.ClientCnxn - Session 0x2579bebfd265054 for server 100.83.64.121/100.83.64.121:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at netflix.spaas.shaded.org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
at netflix.spaas.shaded.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
at netflix.spaas.shaded.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
Zhenzhong Xu added a comment - 07/26/2017 09:25 PM
2017-07-22 02:47:44,557 ERROR org.apache.kafka.clients.producer.KafkaProducer - Interrupted while joining ioThread
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1260)
at 

[jira] [Created] (FLINK-6998) Kafka connector needs to expose metrics for failed/successful offset commits in the Kafka Consumer callback

2017-06-23 Thread Zhenzhong Xu (JIRA)
Zhenzhong Xu created FLINK-6998:
---

 Summary: Kafka connector needs to expose metrics for failed/successful offset commits in the Kafka Consumer callback
 Key: FLINK-6998
 URL: https://issues.apache.org/jira/browse/FLINK-6998
 Project: Flink
  Issue Type: Improvement
  Components: Kafka Connector
Reporter: Zhenzhong Xu








[jira] [Created] (FLINK-6923) Kafka connector needs to expose information about in-flight record in AbstractFetcher base class

2017-06-14 Thread Zhenzhong Xu (JIRA)
Zhenzhong Xu created FLINK-6923:
---

 Summary: Kafka connector needs to expose information about in-flight record in AbstractFetcher base class
 Key: FLINK-6923
 URL: https://issues.apache.org/jira/browse/FLINK-6923
 Project: Flink
  Issue Type: Improvement
  Components: Kafka Connector
Reporter: Zhenzhong Xu
Priority: Minor


We have a use case with a custom Fetcher implementation built on the AbstractFetcher base class. We need to periodically get the partition and offset information of the records currently in flight (being processed).

This could easily be exposed in the AbstractFetcher class.
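One way to expose this information is a thread-safe map from each partition to the offset currently being processed, which a reporter thread can snapshot periodically. The sketch assumes at most one in-flight record per partition. `InFlightOffsets` and its methods are hypothetical, and `AbstractFetcher` does not provide them. The `topic-partition` string key is only a stand-in for a real partition type such as the connector's `KafkaTopicPartition`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-flight tracker (AbstractFetcher has no such API); one in-flight record per partition. */
public class InFlightOffsets {
    // Key "topic-partition" stands in for a real partition type; value is the offset being processed.
    private final Map<String, Long> inFlight = new ConcurrentHashMap<>();

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    /** Called by the fetcher thread just before a record is handed downstream. */
    public void startProcessing(String topic, int partition, long offset) {
        inFlight.put(key(topic, partition), offset);
    }

    /** Clears the entry only if it still refers to this offset. */
    public void finishProcessing(String topic, int partition, long offset) {
        inFlight.remove(key(topic, partition), offset);
    }

    /** Immutable copy that another thread (e.g. a periodic reporter) can read safely. */
    public Map<String, Long> snapshot() {
        return Map.copyOf(inFlight);
    }

    public static void main(String[] args) {
        InFlightOffsets tracker = new InFlightOffsets();
        tracker.startProcessing("events", 3, 42L);
        tracker.startProcessing("events", 7, 1001L);
        tracker.finishProcessing("events", 7, 1001L);
        System.out.println(tracker.snapshot()); // {events-3=42}
    }
}
```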





[jira] [Created] (FLINK-4820) Slf4j / log4j version upgrade to support dynamic change of log levels.

2016-10-12 Thread Zhenzhong Xu (JIRA)
Zhenzhong Xu created FLINK-4820:
---

 Summary: Slf4j / log4j version upgrade to support dynamic change of log levels.
 Key: FLINK-4820
 URL: https://issues.apache.org/jira/browse/FLINK-4820
 Project: Flink
  Issue Type: Task
Reporter: Zhenzhong Xu








[jira] [Created] (FLINK-4819) Checkpoint metadata+data inspection tool (view / update)

2016-10-12 Thread Zhenzhong Xu (JIRA)
Zhenzhong Xu created FLINK-4819:
---

 Summary: Checkpoint metadata+data inspection tool (view / update)
 Key: FLINK-4819
 URL: https://issues.apache.org/jira/browse/FLINK-4819
 Project: Flink
  Issue Type: Task
  Components: State Backends, Checkpointing
Reporter: Zhenzhong Xu


A checkpoint inspection tool for operations, troubleshooting, diagnostics, etc., or for performing brain surgery on checkpointed state.





[jira] [Created] (FLINK-4760) Kafka 09 Consumer failed to initialize state causing job to restart

2016-10-06 Thread Zhenzhong Xu (JIRA)
Zhenzhong Xu created FLINK-4760:
---

 Summary: Kafka 09 Consumer failed to initialize state causing job to restart
 Key: FLINK-4760
 URL: https://issues.apache.org/jira/browse/FLINK-4760
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector, State Backends, Checkpointing
Reporter: Zhenzhong Xu
Priority: Critical


java.io.StreamCorruptedException: invalid stream header: 0278
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:79)
at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:31)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getPartitionableState(DefaultOperatorStateBackend.java:107)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:323)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:396)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
at java.lang.Thread.run(Thread.java:745)
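The exception means `JavaSerializer` was handed bytes that Java serialization did not produce. Every `ObjectOutputStream` stream starts with the magic number `0xACED` followed by version `0x0005`, and the `ObjectInputStream` constructor rejects any other header. The standalone sketch below reproduces the same exception type with arbitrary non-Java bytes. It does not identify what actually wrote the state bytes starting with `0278` in this issue.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.StreamCorruptedException;

public class StreamHeaderDemo {
    /** Serializes a value with standard Java serialization; the stream starts with AC ED 00 05. */
    public static byte[] javaSerialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Deserializes; the ObjectInputStream constructor validates the header before any object is read. */
    public static Object javaDeserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] valid = javaSerialize("offsets");
        System.out.printf("header: %02X%02X%n", valid[0], valid[1]); // header: ACED
        System.out.println(javaDeserialize(valid));                  // offsets

        byte[] notJava = {0x02, 0x78, 0x00, 0x00, 0x00, 0x01}; // arbitrary bytes from some other format
        try {
            javaDeserialize(notJava);
        } catch (StreamCorruptedException e) {
            System.out.println(e.getMessage()); // starts with "invalid stream header"
        }
    }
}
```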





[jira] [Created] (FLINK-4660) HadoopFileSystem (with S3A) may leak connections, which cause job to stuck in a restarting loop

2016-09-21 Thread Zhenzhong Xu (JIRA)
Zhenzhong Xu created FLINK-4660:
---

 Summary: HadoopFileSystem (with S3A) may leak connections, which cause job to stuck in a restarting loop
 Key: FLINK-4660
 URL: https://issues.apache.org/jira/browse/FLINK-4660
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Reporter: Zhenzhong Xu
Priority: Critical








[jira] [Created] (FLINK-4545) Flink automatically manages TM network buffer

2016-08-31 Thread Zhenzhong Xu (JIRA)
Zhenzhong Xu created FLINK-4545:
---

 Summary: Flink automatically manages TM network buffer
 Key: FLINK-4545
 URL: https://issues.apache.org/jira/browse/FLINK-4545
 Project: Flink
  Issue Type: Wish
Reporter: Zhenzhong Xu



Currently, the number of network buffers per task manager is preconfigured, and the memory is pre-allocated through the taskmanager.network.numberOfBuffers config. In a job DAG with a shuffle phase, this number can get very high depending on the TM cluster size. The formula for calculating the buffer count is documented here (https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers):

#slots-per-TM^2 * #TMs * 4
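Plugging numbers into the formula shows why a static setting becomes awkward when the cluster is resized. The per-TM buffer count grows linearly with the number of TMs and quadratically with the number of slots per TM. The sketch assumes 32 KiB buffers, which is Flink's default memory segment size. The slot and TM counts are arbitrary examples.

```java
public class NetworkBufferEstimate {
    /** Per-TM buffer count from the documented formula: #slots-per-TM^2 * #TMs * 4. */
    public static long buffers(int slotsPerTm, int taskManagers) {
        return (long) slotsPerTm * slotsPerTm * taskManagers * 4;
    }

    public static void main(String[] args) {
        long segmentBytes = 32 * 1024; // assumed buffer (memory segment) size
        int slotsPerTm = 8;
        for (int tms : new int[] {10, 20, 40}) {
            long n = buffers(slotsPerTm, tms);
            long mib = n * segmentBytes / (1024 * 1024);
            System.out.printf("%d TMs -> %d buffers (%d MiB per TM)%n", tms, n, mib);
        }
    }
}
```

With 8 slots per TM, this prints 2560, 5120, and 10240 buffers for 10, 20, and 40 TMs, i.e. 80, 160, and 320 MiB of network memory per TM. Doubling the cluster therefore doubles a value that cannot be changed without restarting the task managers.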

In a standalone deployment, we may need to control the task manager cluster size dynamically and then leverage the upcoming Flink feature that supports scaling job parallelism (rescaling) at runtime.
If the buffer count config is static at runtime and cannot be changed without restarting the task manager process, this may add latency and complexity to the scaling process. I am wondering whether there is already any discussion about whether network buffers should be automatically managed by Flink, or whether Flink should at least expose some API that allows them to be reconfigured. Let me know if there is an existing JIRA that I should follow.





[jira] [Created] (FLINK-4372) Add ability to take savepoints from job manager web UI

2016-08-10 Thread Zhenzhong Xu (JIRA)
Zhenzhong Xu created FLINK-4372:
---

 Summary: Add ability to take savepoints from job manager web UI
 Key: FLINK-4372
 URL: https://issues.apache.org/jira/browse/FLINK-4372
 Project: Flink
  Issue Type: Sub-task
Reporter: Zhenzhong Xu


subtask of FLINK-4336





[jira] [Created] (FLINK-4371) Add ability to take savepoints from job manager RESTful API

2016-08-10 Thread Zhenzhong Xu (JIRA)
Zhenzhong Xu created FLINK-4371:
---

 Summary: Add ability to take savepoints from job manager RESTful API
 Key: FLINK-4371
 URL: https://issues.apache.org/jira/browse/FLINK-4371
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing, Webfrontend
Reporter: Zhenzhong Xu


subtask of FLINK-4336





[jira] [Created] (FLINK-4336) Expose ability to take a savepoint from job manager rest api

2016-08-08 Thread Zhenzhong Xu (JIRA)
Zhenzhong Xu created FLINK-4336:
---

 Summary: Expose ability to take a savepoint from job manager rest api
 Key: FLINK-4336
 URL: https://issues.apache.org/jira/browse/FLINK-4336
 Project: Flink
  Issue Type: Improvement
  Components: Webfrontend
Reporter: Zhenzhong Xu
Priority: Minor


There is a need to interact with the job manager REST API to manage savepoint snapshots.
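Later Flink releases expose this through the REST endpoint `POST /jobs/:jobid/savepoints`, which takes a JSON body containing `target-directory` and `cancel-job`. The sketch below only builds such a request with `java.net.http` (Java 11+) and does not send it. The base URL, job ID, and target directory are placeholders. For brevity, the body is assembled without JSON escaping.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class TriggerSavepointRequest {
    /** Builds (but does not send) a request asking the job manager to trigger a savepoint. */
    public static HttpRequest build(String restBase, String jobId, String targetDirectory) {
        // No JSON escaping: targetDirectory is assumed to contain no quotes or backslashes.
        String body = "{\"target-directory\": \"" + targetDirectory + "\", \"cancel-job\": false}";
        return HttpRequest.newBuilder(URI.create(restBase + "/jobs/" + jobId + "/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        // Placeholder job ID and directory, for illustration only.
        HttpRequest request = build("http://localhost:8081",
                "00000000000000000000000000000000", "s3://bucket/savepoints");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending the request, for example with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`, starts an asynchronous operation. The trigger ID in the response can then be polled via `GET /jobs/:jobid/savepoints/:triggerid`.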





[jira] [Created] (FLINK-4335) Add jar id, and job parameters information to job status rest call

2016-08-08 Thread Zhenzhong Xu (JIRA)
Zhenzhong Xu created FLINK-4335:
---

 Summary: Add jar id, and job parameters information to job status rest call
 Key: FLINK-4335
 URL: https://issues.apache.org/jira/browse/FLINK-4335
 Project: Flink
  Issue Type: Improvement
  Components: Webfrontend
Reporter: Zhenzhong Xu
Priority: Minor


From a declarative, reconciliation-based job management perspective, there is a need to identify the job jar ID and all job parameters of a running job, in order to determine whether the current job is up to date.

I think this information needs to be available through the job manager REST call (/jobs/$id).
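A reconciliation loop compares the desired job definition with what is actually running. The sketch below shows that comparison step, assuming a job is identified by its jar ID plus its parameter map. `JobReconciler` and `JobSpec` are hypothetical names. The `running` side is exactly what the requested `/jobs/$id` fields would have to supply.

```java
import java.util.Map;

/** Hypothetical reconciliation check for declarative job management. */
public class JobReconciler {
    /** Identity of a job: the jar it was started from plus its parameters. */
    public static final class JobSpec {
        final String jarId;
        final Map<String, String> parameters;

        public JobSpec(String jarId, Map<String, String> parameters) {
            this.jarId = jarId;
            this.parameters = Map.copyOf(parameters); // defensive, immutable copy
        }
    }

    /** The running job is up to date only if both the jar and every parameter match. */
    public static boolean isUpToDate(JobSpec desired, JobSpec running) {
        return desired.jarId.equals(running.jarId) && desired.parameters.equals(running.parameters);
    }

    public static void main(String[] args) {
        JobSpec desired = new JobSpec("router-1.2.jar", Map.of("parallelism", "180"));
        JobSpec running = new JobSpec("router-1.1.jar", Map.of("parallelism", "180"));
        System.out.println(isUpToDate(desired, running) ? "up to date" : "redeploy needed"); // redeploy needed
    }
}
```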







[jira] [Created] (FLINK-4308) Allow uploaded jar directory to be configurable

2016-08-02 Thread Zhenzhong Xu (JIRA)
Zhenzhong Xu created FLINK-4308:
---

 Summary: Allow uploaded jar directory to be configurable 
 Key: FLINK-4308
 URL: https://issues.apache.org/jira/browse/FLINK-4308
 Project: Flink
  Issue Type: Improvement
  Components: Webfrontend
Reporter: Zhenzhong Xu
Priority: Minor


I've noticed that it is sometimes preferable to have uploaded jars placed in a configurable directory, instead of only in a directory generated at runtime.

WebRuntimeMonitor.java

String uploadDirName = "flink-web-upload-" + UUID.randomUUID();
this.uploadDir = new File(getBaseDir(config), uploadDirName);
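Here is a minimal sketch of the requested behavior. It uses an explicitly configured directory if one is set, and otherwise falls back to the current random `flink-web-upload-<UUID>` name under the base directory. The configuration is modeled as a plain `Map`. The key `web.upload.dir` is an assumption for illustration (later Flink releases use a key of that name). `resolveUploadDir` is hypothetical and is not the `WebRuntimeMonitor` code.

```java
import java.io.File;
import java.util.Map;
import java.util.UUID;

public class UploadDirResolver {
    // Assumed configuration key, used here for illustration only.
    static final String UPLOAD_DIR_KEY = "web.upload.dir";

    /** Prefers the configured directory; otherwise keeps the existing random-name behavior. */
    public static File resolveUploadDir(Map<String, String> config, File baseDir) {
        String configured = config.get(UPLOAD_DIR_KEY);
        if (configured != null && !configured.isEmpty()) {
            return new File(configured);
        }
        return new File(baseDir, "flink-web-upload-" + UUID.randomUUID());
    }

    public static void main(String[] args) {
        File base = new File(System.getProperty("java.io.tmpdir"));
        System.out.println(resolveUploadDir(Map.of(UPLOAD_DIR_KEY, "/data/flink-jars"), base));
        System.out.println(resolveUploadDir(Map.of(), base)); // random directory under the base dir
    }
}
```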



