[jira] [Created] (FLINK-33765) Flink SQL to support COLLECT_LIST
Zhenzhong Xu created FLINK-33765:

Summary: Flink SQL to support COLLECT_LIST
Key: FLINK-33765
URL: https://issues.apache.org/jira/browse/FLINK-33765
Project: Flink
Issue Type: Improvement
Components: API / DataSet
Reporter: Zhenzhong Xu

Flink SQL currently supports COLLECT, which returns a multiset. However, support for casting from multiset to other types (especially array/list) is *very* limited, see [here|https://github.com/apache/flink/blob/master/docs/content/docs/dev/table/types.md#casting], which creates a lot of usability headaches. Can we support COLLECT_LIST as a built-in system function?
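For illustration, a minimal sketch of the difference. The COLLECT_LIST call is the proposed, not-yet-existing function; the inline VALUES table is just sample data:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CollectListSketch {
    public static void main(String[] args) {
        // Batch mode keeps the printed result simple; sample data is inline.
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Today: COLLECT returns MULTISET<STRING>, which is awkward to cast to ARRAY.
        tEnv.executeSql(
                "SELECT user_id, COLLECT(product) AS products "
                        + "FROM (VALUES (1, 'a'), (1, 'b'), (2, 'c')) AS t(user_id, product) "
                        + "GROUP BY user_id")
            .print();

        // Proposed (hypothetical, not a built-in yet): COLLECT_LIST would return
        // ARRAY<STRING> directly:
        //   SELECT user_id, COLLECT_LIST(product) FROM t GROUP BY user_id
    }
}
{code}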
[jira] [Created] (FLINK-7894) Improve metrics around fine-grained recovery and associated checkpointing behaviors
Zhenzhong Xu created FLINK-7894:
---

Summary: Improve metrics around fine-grained recovery and associated checkpointing behaviors
Key: FLINK-7894
URL: https://issues.apache.org/jira/browse/FLINK-7894
Project: Flink
Issue Type: Improvement
Affects Versions: 1.3.2, 1.4.0
Reporter: Zhenzhong Xu

Currently, the only metric around fine-grained recovery is "task_failures". It's a very high-level metric; it would be nice to have the following improvements (a sketch of the idea follows this list):
* Allow slicing and dicing into which tasks were restarted.
* Recovery duration.
* Recovery-associated checkpoint behaviors: cancellations, failures, etc.
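As a stopgap, a per-task counter along these lines can be registered from user code today. This is only an approximation of the first bullet; "taskRestarts" is an illustrative metric name, not an existing Flink metric:

{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class RestartCountingMap extends RichMapFunction<String, String> {
    private transient Counter restarts;

    @Override
    public void open(Configuration parameters) {
        // open() runs on every (re)deployment of this subtask, so incrementing
        // here approximates a per-task restart count (initial deployment included).
        restarts = getRuntimeContext().getMetricGroup().counter("taskRestarts");
        restarts.inc();
    }

    @Override
    public String map(String value) {
        return value;
    }
}
{code}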
[jira] [Created] (FLINK-7844) Fine Grained Recovery triggers checkpoint timeout failure
Zhenzhong Xu created FLINK-7844:
---

Summary: Fine Grained Recovery triggers checkpoint timeout failure
Key: FLINK-7844
URL: https://issues.apache.org/jira/browse/FLINK-7844
Project: Flink
Issue Type: Bug
Components: Core
Affects Versions: 1.3.2
Reporter: Zhenzhong Xu

Context: We are using the "individual" (fine-grained) failover recovery strategy for our embarrassingly parallel router use case. The topic has over 2000 partitions, and parallelism is set to ~180, dispatched to over 20 task managers with around 180 slots.

We've noticed that after one task manager terminates, individual recovery happens correctly and the workload is re-dispatched to a newly available task manager instance. However, the in-flight checkpoint takes 10 minutes to eventually time out, during which no other task manager can commit a checkpoint. In the worst case, if the job gets restarted for other reasons (e.g. job manager termination), more messages are re-processed/duplicated compared to a job without fine-grained recovery enabled.

I suspect the overall checkpoint is waiting on an acknowledgment initiated by the old task manager and therefore takes a long time to time out. Two questions:
1. Is there a configuration that controls this checkpoint timeout?
2. When the Job Manager realizes the Task Manager is gone and the workload has been re-dispatched, why does it still need to wait for the checkpoint initiated by the old task manager?
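On question 1: the checkpoint timeout is configurable per job via CheckpointConfig, and its default of 10 minutes matches the observed delay. A minimal sketch; the interval and timeout values are illustrative:

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTimeoutExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // trigger a checkpoint every 60s
        // Fail a checkpoint that hasn't completed after 2 minutes,
        // instead of the 10-minute default.
        env.getCheckpointConfig().setCheckpointTimeout(120_000);
    }
}
{code}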
[jira] [Created] (FLINK-7278) Flink job can get stuck while ZK leader is re-elected during ZK cluster migration
Zhenzhong Xu created FLINK-7278:
---

Summary: Flink job can get stuck while ZK leader is re-elected during ZK cluster migration
Key: FLINK-7278
URL: https://issues.apache.org/jira/browse/FLINK-7278
Project: Flink
Issue Type: Bug
Components: Distributed Coordination
Reporter: Zhenzhong Xu
Priority: Minor

We have observed a potential failure case while a Flink job was running during a ZK migration. The scenario is described below.
1. Flink cluster running in standalone mode on the Netflix Titus container runtime.
2. We performed a ZK migration by updating to a new OS image one node at a time.
3. During ZK leader re-election, the Flink cluster starts to exhibit failures and eventually ends up in a non-recoverable failure mode.
4. This behavior does not reproduce every time; it may be caused by an edge-case race condition.

Below is a list of error messages ordered by event time:

2017-07-22 02:47:44,535 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Source -> Sink: Sink (67/176) (0442d63c89809ad86f38874c845ba83f) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager was lost/killed: ResourceID{resourceId='f519795dfabcecfd7863ed587efdb398'} @ titus-123072-worker-3-39 (dataPort=46879)
	at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
	at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
	at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
	at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
	at org.apache.flink.runtime.instance.InstanceManager.unregisterAllTaskManagers(InstanceManager.java:234)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:330)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

2017-07-22 02:47:44,621 WARN com.netflix.spaas.runtime.FlinkJobManager - Discard message LeaderSessionMessage(7a247ad9-531b-4f27-877b-df41f9019431,Disconnect(0b300c04592b19750678259cd09fea95,java.lang.Exception: TaskManager akka://flink/user/taskmanager is disassociating)) because the expected leader session ID None did not equal the received leader session ID Some(7a247ad9-531b-4f27-877b-df41f9019431).
Zhenzhong Xu added a comment - 07/26/2017 09:24 PM

2017-07-22 02:47:45,015 WARN netflix.spaas.shaded.org.apache.zookeeper.ClientCnxn - Session 0x2579bebfd265054 for server 100.83.64.121/100.83.64.121:2181, unexpected error, closing socket connection and attempting reconnect
java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
	at netflix.spaas.shaded.org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:68)
	at netflix.spaas.shaded.org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:366)
	at netflix.spaas.shaded.org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)

Zhenzhong Xu added a comment - 07/26/2017 09:25 PM

2017-07-22 02:47:44,557 ERROR org.apache.kafka.clients.producer.KafkaProducer - Interrupted while joining ioThread
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1260)
	at
[jira] [Created] (FLINK-6998) Kafka connector needs to expose metrics for failed/successful offset commits in the Kafka Consumer callback
Zhenzhong Xu created FLINK-6998:
---

Summary: Kafka connector needs to expose metrics for failed/successful offset commits in the Kafka Consumer callback
Key: FLINK-6998
URL: https://issues.apache.org/jira/browse/FLINK-6998
Project: Flink
Issue Type: Improvement
Components: Kafka Connector
Reporter: Zhenzhong Xu
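For illustration: Kafka's async commit API already reports the outcome to a callback, so the connector could count successes and failures there. A minimal standalone sketch; the plain AtomicLong counters stand in for what would be Flink MetricGroup counters in the real connector:

{code:java}
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

public class CommitMetricsCallback implements OffsetCommitCallback {
    private final AtomicLong succeeded = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();

    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                           Exception exception) {
        // Kafka invokes this after each async commit attempt;
        // a null exception means the commit succeeded.
        if (exception == null) {
            succeeded.incrementAndGet();
        } else {
            failed.incrementAndGet();
        }
    }
}
{code}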
[jira] [Created] (FLINK-6923) Kafka connector needs to expose information about in-flight record in AbstractFetcher base class
Zhenzhong Xu created FLINK-6923:
---

Summary: Kafka connector needs to expose information about in-flight record in AbstractFetcher base class
Key: FLINK-6923
URL: https://issues.apache.org/jira/browse/FLINK-6923
Project: Flink
Issue Type: Improvement
Components: Kafka Connector
Reporter: Zhenzhong Xu
Priority: Minor

We have a use case where our custom Fetcher implementation extends the AbstractFetcher base class. We need to periodically get the partition and offset information of the records currently in flight (being processed). This could easily be exposed by the AbstractFetcher class.
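A standalone sketch of the bookkeeping we'd like the base class to expose. This illustrates the idea only; it is not Flink's actual internal API, and the class and method names are hypothetical:

{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class InFlightTracker {
    // partition id -> offset of the record currently being processed
    private final Map<Integer, Long> inFlight = new ConcurrentHashMap<>();

    // Would be called just before a record is handed to the processing pipeline.
    public void recordEmitted(int partition, long offset) {
        inFlight.put(partition, offset);
    }

    // The read-only view we would like the fetcher base class to provide.
    public Map<Integer, Long> currentInFlightOffsets() {
        return Collections.unmodifiableMap(inFlight);
    }
}
{code}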
[jira] [Created] (FLINK-4820) Slf4j / log4j version upgrade to support dynamic change of log levels.
Zhenzhong Xu created FLINK-4820:
---

Summary: Slf4j / log4j version upgrade to support dynamic change of log levels.
Key: FLINK-4820
URL: https://issues.apache.org/jira/browse/FLINK-4820
Project: Flink
Issue Type: Task
Reporter: Zhenzhong Xu
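For context, this is what the upgrade would enable: log4j 2.x can change log levels at runtime without a restart. A minimal sketch; the logger name is illustrative:

{code:java}
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;

public class DynamicLogLevel {
    public static void main(String[] args) {
        // Raise verbosity for one logger hierarchy at runtime...
        Configurator.setLevel("org.apache.flink", Level.DEBUG);
        // ...while keeping the root logger at INFO.
        Configurator.setRootLevel(Level.INFO);
    }
}
{code}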
[jira] [Created] (FLINK-4819) Checkpoint metadata+data inspection tool (view / update)
Zhenzhong Xu created FLINK-4819:
---

Summary: Checkpoint metadata+data inspection tool (view / update)
Key: FLINK-4819
URL: https://issues.apache.org/jira/browse/FLINK-4819
Project: Flink
Issue Type: Task
Components: State Backends, Checkpointing
Reporter: Zhenzhong Xu

A checkpoint inspection tool for operationalization, troubleshooting, diagnostics, etc., or performing brain surgery.
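For reference, the State Processor API added later (FLIP-43, Flink 1.9+) covers much of this ground. A rough sketch under that assumption; the savepoint path, operator uid "my-uid", and state name "buffered" are all illustrative:

{code:java}
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

public class InspectCheckpoint {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Load an existing savepoint/checkpoint for offline inspection.
        ExistingSavepoint savepoint = Savepoint.load(
                env, "hdfs:///checkpoints/savepoint-xx", new MemoryStateBackend());
        // Read the declared list state of one operator and dump it.
        DataSet<Long> state = savepoint.readListState("my-uid", "buffered", Types.LONG);
        state.print();
    }
}
{code}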
[jira] [Created] (FLINK-4760) Kafka 09 Consumer failed to initialize state causing job to restart
Zhenzhong Xu created FLINK-4760:
---

Summary: Kafka 09 Consumer failed to initialize state causing job to restart
Key: FLINK-4760
URL: https://issues.apache.org/jira/browse/FLINK-4760
Project: Flink
Issue Type: Bug
Components: Kafka Connector, State Backends, Checkpointing
Reporter: Zhenzhong Xu
Priority: Critical

java.io.StreamCorruptedException: invalid stream header: 0278
	at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:806)
	at java.io.ObjectInputStream.<init>(ObjectInputStream.java:299)
	at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:79)
	at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:31)
	at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getPartitionableState(DefaultOperatorStateBackend.java:107)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:323)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:105)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:396)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
	at java.lang.Thread.run(Thread.java:745)
[jira] [Created] (FLINK-4660) HadoopFileSystem (with S3A) may leak connections, causing jobs to get stuck in a restart loop
Zhenzhong Xu created FLINK-4660:
---

Summary: HadoopFileSystem (with S3A) may leak connections, causing jobs to get stuck in a restart loop
Key: FLINK-4660
URL: https://issues.apache.org/jira/browse/FLINK-4660
Project: Flink
Issue Type: Bug
Components: State Backends, Checkpointing
Reporter: Zhenzhong Xu
Priority: Critical
[jira] [Created] (FLINK-4545) Flink automatically manages TM network buffers
Zhenzhong Xu created FLINK-4545:
---

Summary: Flink automatically manages TM network buffers
Key: FLINK-4545
URL: https://issues.apache.org/jira/browse/FLINK-4545
Project: Flink
Issue Type: Wish
Reporter: Zhenzhong Xu

Currently, the number of network buffers per task manager is preconfigured and the memory is pre-allocated through the taskmanager.network.numberOfBuffers config. In a job DAG with a shuffle phase, this number can grow very high depending on the TM cluster size. The formula for calculating the buffer count is documented here (https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers):

#slots-per-TM^2 * #TMs * 4

In a standalone deployment, we may need to control the task manager cluster size dynamically and then leverage the upcoming Flink feature for rescaling job parallelism at runtime. If the buffer count config is static and cannot be changed without restarting the task manager process, this adds latency and complexity to the scaling process. I am wondering whether there is already any discussion around having Flink manage network buffers automatically, or at least exposing an API to reconfigure them. Let me know if there is an existing JIRA that I should follow.
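To make the formula concrete, a quick worked example with illustrative cluster sizes (32 KiB is the default network buffer segment size):

{code:java}
public class NetworkBufferEstimate {
    public static void main(String[] args) {
        int slotsPerTM = 8;
        int numTMs = 20;
        // Documented formula: #slots-per-TM^2 * #TMs * 4
        int buffers = slotsPerTM * slotsPerTM * numTMs * 4; // 8^2 * 20 * 4 = 5120
        // Memory grows quadratically with slots per TM at 32 KiB per buffer.
        long bytes = (long) buffers * 32 * 1024;            // 5120 * 32 KiB = 160 MiB
        System.out.println(buffers + " buffers, ~" + (bytes >> 20) + " MiB pre-allocated");
    }
}
{code}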
[jira] [Created] (FLINK-4372) Add ability to take savepoints from job manager web UI
Zhenzhong Xu created FLINK-4372:
---

Summary: Add ability to take savepoints from job manager web UI
Key: FLINK-4372
URL: https://issues.apache.org/jira/browse/FLINK-4372
Project: Flink
Issue Type: Sub-task
Reporter: Zhenzhong Xu

Subtask of FLINK-4336.
[jira] [Created] (FLINK-4371) Add ability to take savepoints from job manager RESTful API
Zhenzhong Xu created FLINK-4371:
---

Summary: Add ability to take savepoints from job manager RESTful API
Key: FLINK-4371
URL: https://issues.apache.org/jira/browse/FLINK-4371
Project: Flink
Issue Type: Sub-task
Components: State Backends, Checkpointing, Webfrontend
Reporter: Zhenzhong Xu

Subtask of FLINK-4336.
[jira] [Created] (FLINK-4336) Expose ability to take a savepoint from the job manager REST API
Zhenzhong Xu created FLINK-4336:
---

Summary: Expose ability to take a savepoint from the job manager REST API
Key: FLINK-4336
URL: https://issues.apache.org/jira/browse/FLINK-4336
Project: Flink
Issue Type: Improvement
Components: Webfrontend
Reporter: Zhenzhong Xu
Priority: Minor

There is a need to interact with the job manager REST API to manage savepoint snapshots.
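For reference, later Flink releases (1.5+) added exactly this capability: savepoints can be triggered with POST /jobs/<jobid>/savepoints. A minimal sketch against an assumed local REST endpoint; the job id is a placeholder and there is no error handling:

{code:java}
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class TriggerSavepoint {
    public static void main(String[] args) throws Exception {
        String jobId = "00000000000000000000000000000000"; // placeholder job id
        URL url = new URL("http://localhost:8081/jobs/" + jobId + "/savepoints");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        byte[] body = "{\"cancel-job\": false}".getBytes(StandardCharsets.UTF_8);
        try (OutputStream os = conn.getOutputStream()) {
            os.write(body);
        }
        // Flink answers 202 Accepted with a trigger id to poll for completion.
        System.out.println("HTTP " + conn.getResponseCode());
    }
}
{code}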
[jira] [Created] (FLINK-4335) Add jar id and job parameters information to job status REST call
Zhenzhong Xu created FLINK-4335:
---

Summary: Add jar id and job parameters information to job status REST call
Key: FLINK-4335
URL: https://issues.apache.org/jira/browse/FLINK-4335
Project: Flink
Issue Type: Improvement
Components: Webfrontend
Reporter: Zhenzhong Xu
Priority: Minor

From a declarative, reconciliation-based job management perspective, there is a need to identify the job jar id and all job parameters of a running job, to determine whether the current job is up to date. I think this information needs to be available through the job manager REST call (/jobs/$id).
[jira] [Created] (FLINK-4308) Allow uploaded jar directory to be configurable
Zhenzhong Xu created FLINK-4308:
---

Summary: Allow uploaded jar directory to be configurable
Key: FLINK-4308
URL: https://issues.apache.org/jira/browse/FLINK-4308
Project: Flink
Issue Type: Improvement
Components: Webfrontend
Reporter: Zhenzhong Xu
Priority: Minor

I've noticed it's sometimes preferable to put uploaded jars into a configurable directory location, rather than only into a directory generated at runtime. From WebRuntimeMonitor.java:

String uploadDirName = "flink-web-upload-" + UUID.randomUUID();
this.uploadDir = new File(getBaseDir(config), uploadDirName);
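A minimal sketch of the requested behavior: honor a configured directory and fall back to the current random per-run directory. The key "web.upload.dir" matches the option Flink eventually added; the base directory that WebRuntimeMonitor derives via getBaseDir(config) is assumed here as a plain File parameter:

{code:java}
import java.io.File;
import java.util.UUID;
import org.apache.flink.configuration.Configuration;

public class UploadDirResolver {
    static File resolveUploadDir(Configuration config, File baseDir) {
        // If the user configured a fixed upload directory, use it...
        String configured = config.getString("web.upload.dir", null);
        if (configured != null) {
            return new File(configured);
        }
        // ...otherwise keep today's behavior: a random per-run directory.
        return new File(baseDir, "flink-web-upload-" + UUID.randomUUID());
    }
}
{code}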