[jira] [Created] (FLINK-11171) Unexpected timestamp deserialization failure in RocksDB state backend
Sayat Satybaldiyev created FLINK-11171:
--
Summary: Unexpected timestamp deserialization failure in RocksDB state backend
Key: FLINK-11171
URL: https://issues.apache.org/jira/browse/FLINK-11171
Project: Flink
Issue Type: Bug
Components: State Backends, Checkpointing
Affects Versions: 1.7.0
Reporter: Sayat Satybaldiyev

We have a job that joins two data streams via a ProcessFunction and uses ValueState with state TTL on the RocksDB backend. The job constantly fails to checkpoint due to a timestamp deserialization error.

TTL state config:
{code:java}
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.hours(recommendationRetentionHr))
        .neverReturnExpired()
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .cleanupFullSnapshot()
        .build();
{code}
Error:
{code:java}
2018-12-16 06:02:12,609 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 31 @ 1544940132568 for job 7825029dc256542aa312c0b68ecf0631.
2018-12-16 06:22:12,609 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 31 of job 7825029dc256542aa312c0b68ecf0631 expired before completing.
2018-12-16 06:22:12,637 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32 @ 1544941332609 for job 7825029dc256542aa312c0b68ecf0631.
2018-12-16 06:22:12,899 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline checkpoint 32 by task 176c8b3c3ff190d183415ab77b89344c of job 7825029dc256542aa312c0b68ecf0631.
2018-12-16 06:22:12,900 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding checkpoint 32 of job 7825029dc256542aa312c0b68ecf0631.
java.lang.Exception: Could not materialize checkpoint 32 for operator joined-stream (1/6).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.util.FlinkRuntimeException: Unexpected timestamp deserialization failure
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
    ... 5 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Unexpected timestamp deserialization failure
    at org.apache.flink.runtime.state.ttl.TtlStateSnapshotTransformer$TtlSerializedValueStateSnapshotTransformer.filterOrTransform(TtlStateSnapshotTransformer.java:94)
    at org.apache.flink.runtime.state.ttl.TtlStateSnapshotTransformer$TtlSerializedValueStateSnapshotTransformer.filterOrTransform(TtlStateSnapshotTransformer.java:79)
    at org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper.filterOrTransform(RocksTransformingIteratorWrapper.java:70)
    at org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper.seekToFirst(RocksTransformingIteratorWrapper.java:48)
    at org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.buildIteratorHeap(RocksStatesPerKeyGroupMergeIterator.java:128)
    at org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.<init>(RocksStatesPerKeyGroupMergeIterator.java:68)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:312)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:258)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:223)
    at org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:176)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
{code}
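For context on the error: with state TTL enabled, each stored value is wrapped together with a last-access timestamp, and the full-snapshot cleanup path has to read that timestamp back from the raw RocksDB bytes to decide whether an entry expired. A minimal sketch of such a layout follows; this is a hypothetical illustration of the idea, not Flink's actual serializer code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class TtlLayoutSketch {
    // Hypothetical encoding: an 8-byte last-access timestamp followed by the user value bytes.
    static byte[] wrap(long lastAccessTs, byte[] userValue) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeLong(lastAccessTs);
        out.write(userValue);
        return bos.toByteArray();
    }

    // The snapshot transformer must read the timestamp back; if the stored bytes
    // do not start with a readable 8-byte timestamp, deserialization fails --
    // roughly the failure mode reported above.
    static long readTimestamp(byte[] stored) throws IOException {
        if (stored.length < Long.BYTES) {
            throw new IOException("Unexpected timestamp deserialization failure");
        }
        return new DataInputStream(new ByteArrayInputStream(stored)).readLong();
    }

    public static void main(String[] args) throws IOException {
        byte[] stored = wrap(1544940132568L, "value".getBytes());
        System.out.println(readTimestamp(stored)); // prints 1544940132568
    }
}
```

The implication is that any state written without the TTL wrapper (or with a mismatched serializer) cannot be snapshot-transformed, which is consistent with the checkpoint declining rather than the job itself failing.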
[jira] [Created] (FLINK-10380) Check if key is not null before assigning to group in KeyedStream
Sayat Satybaldiyev created FLINK-10380:
--
Summary: Check if key is not null before assigning to group in KeyedStream
Key: FLINK-10380
URL: https://issues.apache.org/jira/browse/FLINK-10380
Project: Flink
Issue Type: Task
Affects Versions: 1.6.0
Reporter: Sayat Satybaldiyev

If a user creates a KeyedStream partitioned by a key that might be null, the Flink job throws a NullPointerException at runtime. However, the NPE that Flink throws is hard to debug and understand, as it doesn't point to the place in the Flink job that produced the null key.

*Suggestion:* Add a precondition that checks that the key is not null, and throw a descriptive error if it is null.

*Job Example:*
{code:java}
DataStream<String> stream = env.fromCollection(Arrays.asList("aaa", "bbb"))
        .map(x -> (String) null)
        .keyBy(x -> x);
{code}
The error that is thrown:
{code:java}
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
    at org.myorg.quickstart.BuggyKeyedStream.main(BuggyKeyedStream.java:61)
Caused by: java.lang.RuntimeException
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
16:26:43,110 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopped Akka RPC service.
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:59)
    at org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:48)
    at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:63)
    at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
    at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
    ... 10 more
{code}
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
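A sketch of what the suggested precondition could look like (a hypothetical helper for illustration, not existing Flink code): fail fast with a descriptive message instead of the bare NPE deep inside KeyGroupRangeAssignment.

```java
public class KeyNullCheck {
    // Hypothetical precondition: reject a null key at the point where the
    // KeySelector's result is first used, with a message that tells the user
    // what to fix, rather than letting an unexplained NPE surface later.
    static <K> K checkKeyNotNull(K key) {
        if (key == null) {
            throw new NullPointerException(
                    "Key extracted by the KeySelector must not be null. "
                    + "Check that the KeySelector passed to keyBy() never returns null.");
        }
        return key;
    }

    public static void main(String[] args) {
        System.out.println(checkKeyNotNull("aaa")); // non-null keys pass through unchanged
        try {
            checkKeyNotNull(null);
        } catch (NullPointerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With such a check in place, the stack trace above would instead terminate at a single descriptive exception naming the KeySelector as the culprit.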
[jira] [Created] (FLINK-10287) Flink HA Persists Cancelled Job in Zookeeper
Sayat Satybaldiyev created FLINK-10287:
--
Summary: Flink HA Persists Cancelled Job in Zookeeper
Key: FLINK-10287
URL: https://issues.apache.org/jira/browse/FLINK-10287
Project: Flink
Issue Type: Bug
Components: Core
Affects Versions: 1.6.0
Reporter: Sayat Satybaldiyev
Attachments: Screenshot from 2018-09-05 16-48-34.png

Flink HA persists a canceled job in Zookeeper, which makes HA mode quite fragile. If the JM gets restarted, it tries to recover the canceled job and, after some time, fails completely because it is not able to recover it.

How to reproduce:
# Have a Flink HA 1.6 cluster
# Cancel a running Flink job
# Observe that Flink didn't remove the ZK metadata

!Screenshot from 2018-09-05 16-48-34.png!
{code:java}
ls /flink/flink_ns/jobgraphs/46d8d3555936c0d8e6b6ec21cc02bb11
[7f392fd9-cedc-4978-9186-1f54b98eeeb7]
{code}
[jira] [Created] (FLINK-10286) Flink Persists Invalid Job Graph in Zookeeper
Sayat Satybaldiyev created FLINK-10286:
--
Summary: Flink Persists Invalid Job Graph in Zookeeper
Key: FLINK-10286
URL: https://issues.apache.org/jira/browse/FLINK-10286
Project: Flink
Issue Type: Bug
Components: Core
Affects Versions: 1.6.0
Reporter: Sayat Satybaldiyev

In HA mode, Flink 1.6 persists the job graph in Zookeeper even if the job was not accepted by the Job Manager. This is particularly bad because if the JM later dies and restarts, it tries to recover the job, obviously fails, and dies completely.

How to reproduce:
1. Have an HA Flink 1.6 cluster.
2. Submit an invalid job; in my case I put an invalid file scheme for the RocksDB state backend:
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.enableCheckpointing(5000);

RocksDBStateBackend backend = new RocksDBStateBackend("hddd:///tmp/flink/rocksdb");
backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
env.setStateBackend(backend);
{code}
The client returns:
{code:java}
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: Could not submit job (JobID: 9680f02ae2f3806c3b4da25bfacd0749)
{code}
The JM does not accept the job; this is a truncated error log from the JM:
{code:java}
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
... 24 more
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
Caused by: java.lang.RuntimeException: Failed to start checkpoint ID counter: Could not find a file system implementation for scheme 'hddd'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
{code}
3. Go to ZK and observe that the JM has saved the job to ZK:
{code:java}
ls /flink/flink_ns/jobgraphs/9680f02ae2f3806c3b4da25bfacd0749
[7f392fd9-cedc-4978-9186-1f54b98eeeb7]
{code}
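The fix being asked for amounts to a pre-flight check: validate the checkpoint URI before the job graph is persisted, so that a typo like "hddd://" fails at submission time rather than during JM recovery. A sketch of such a scheme check using only `java.net.URI` (hypothetical, not Flink's actual validation code):

```java
import java.net.URI;
import java.util.Set;

public class SchemeCheck {
    // Hypothetical pre-flight validation: reject an unknown checkpoint URI
    // scheme before anything is written to ZooKeeper.
    static void checkScheme(String checkpointPath, Set<String> supportedSchemes) {
        String scheme = URI.create(checkpointPath).getScheme();
        if (scheme == null || !supportedSchemes.contains(scheme)) {
            throw new IllegalArgumentException(
                    "Could not find a file system implementation for scheme '" + scheme + "'");
        }
    }

    public static void main(String[] args) {
        Set<String> supported = Set.of("file", "hdfs", "s3");
        checkScheme("hdfs:///tmp/flink/rocksdb", supported); // passes silently
        try {
            checkScheme("hddd:///tmp/flink/rocksdb", supported);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In a real fix the supported-scheme set would come from Flink's FileSystem registry rather than a hard-coded list; the point is only that the check must run before the graph reaches ZooKeeper.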
[jira] [Created] (FLINK-9831) Too many open files for RocksDB
Sayat Satybaldiyev created FLINK-9831:
-
Summary: Too many open files for RocksDB
Key: FLINK-9831
URL: https://issues.apache.org/jira/browse/FLINK-9831
Project: Flink
Issue Type: Bug
Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Sayat Satybaldiyev
Attachments: flink_open_files.txt

While running only one Flink job, which is backed by RocksDB and checkpoints to HDFS, we encounter an exception that the TM cannot access an SST file because the process has too many open files. However, we have already increased the soft/hard file limit on the machine.

Number of open files for the TM on the machine:
{code:java}
lsof -p 23301 | wc -l
8241
{code}
Instance limits:
{code:java}
ulimit -a
core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 256726
max locked memory       (kbytes, -l) 64
max memory size         (kbytes, -m) unlimited
open files                      (-n) 1048576
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) 8192
cpu time               (seconds, -t) unlimited
max user processes              (-u) 128000
virtual memory          (kbytes, -v) unlimited
file locks                      (-x) unlimited
{code}
[^flink_open_files.txt]
{code:java}
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedCoProcessOperator_98a16ed3228ec4a08acd8d78420516a1_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
    ... 5 more
Caused by: java.io.FileNotFoundException: /tmp/flink-io-3da06c9e-f619-44c9-b95f-54ee9b1a084a/job_b3ecbdc0eb2dc2dfbf5532ec1fcef9da_op_KeyedCoProcessOperator_98a16ed3228ec4a08acd8d78420516a1__1_1__uuid_c4b82a7e-8a04-4704-9e0b-393c3243cef2/3701639a-bacd-4861-99d8-5f3d112e88d6/16.sst (Too many open files)
    at java.io.FileOutputStream.open0(Native Method)
    at java.io.FileOutputStream.open(FileOutputStream.java:270)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
    at org.apache.flink.core.fs.local.LocalDataOutputStream.<init>(LocalDataOutputStream.java:47)
    at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:275)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:121)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.copyStateDataHandleData(RocksDBKeyedStateBackend.java:1008)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllDataFromStateHandles(RocksDBKeyedStateBackend.java:988)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllStateDataToDirectory(RocksDBKeyedStateBackend.java:973)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:758)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:732)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:443)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:149)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
    ... 7 more
{code}
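Independent of raising ulimits, it can help to watch the TM's descriptor usage from inside the JVM to catch exhaustion before a RocksDB restore fails. A small sketch using the JDK's `com.sun.management.UnixOperatingSystemMXBean` (available on HotSpot/OpenJDK Unix builds; the sketch returns {-1, -1} where the bean is unsupported):

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

public class FdUsage {
    // Returns {openDescriptors, maxDescriptors}, or {-1, -1} if the platform
    // OperatingSystemMXBean does not expose file-descriptor counters.
    static long[] fdUsage() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            com.sun.management.UnixOperatingSystemMXBean unix =
                    (com.sun.management.UnixOperatingSystemMXBean) os;
            return new long[] {
                    unix.getOpenFileDescriptorCount(),
                    unix.getMaxFileDescriptorCount()
            };
        }
        return new long[] {-1, -1};
    }

    public static void main(String[] args) {
        long[] u = fdUsage();
        System.out.println("open=" + u[0] + " max=" + u[1]);
    }
}
```

Note that RocksDB also has its own cap on open SST files (the `max_open_files` DB option), so descriptor growth can come from RocksDB itself rather than from the per-process limit being too low; capping that option through the state backend's options is a separate mitigation worth checking.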
[jira] [Created] (FLINK-9805) HTTP Redirect to Active JM in Flink CLI
Sayat Satybaldiyev created FLINK-9805:
-
Summary: HTTP Redirect to Active JM in Flink CLI
Key: FLINK-9805
URL: https://issues.apache.org/jira/browse/FLINK-9805
Project: Flink
Issue Type: Improvement
Components: Client
Affects Versions: 1.5.0
Reporter: Sayat Satybaldiyev

The Flink CLI allows specifying the job manager address via the --jobmanager flag. However, in HA mode the JM can change, and the standby JM issues an HTTP redirect to the active one. During deployment via the Flink CLI with the --jobmanager flag, the CLI does not follow the redirect to the active JM, and thus fails to submit the job with "Could not complete the operation. Number of retries has been exhausted".

*Proposal:* Honor the JM HTTP redirect in case leadership changes when the Flink CLI is used with the --jobmanager flag.
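Honoring the redirect essentially means retrying the request against the URL in the Location header instead of giving up. A sketch of the URL-resolution part using only `java.net.URI` (a hypothetical helper with made-up host names, not the CLI's actual code):

```java
import java.net.URI;

public class RedirectResolve {
    // Hypothetical helper: given the URL we requested on a standby JM and the
    // Location header it returned, compute the absolute URL to retry against.
    // URI.resolve handles both absolute and relative Location values.
    static URI resolveRedirect(URI requested, String locationHeader) {
        return requested.resolve(locationHeader);
    }

    public static void main(String[] args) {
        URI standby = URI.create("http://standby-jm:8081/jobs/overview");
        // An absolute Location (typical when the leader lives on another host):
        System.out.println(resolveRedirect(standby, "http://active-jm:8081/jobs/overview"));
        // A relative Location resolves against the original authority:
        System.out.println(resolveRedirect(standby, "/jobs/overview"));
    }
}
```

A real implementation would also cap the number of redirect hops and re-issue the original request body, but the resolution step above is the core of it.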
[jira] [Created] (FLINK-9711) Flink CLI does not filter RUNNING only jobs
Sayat Satybaldiyev created FLINK-9711:
-
Summary: Flink CLI does not filter RUNNING only jobs
Key: FLINK-9711
URL: https://issues.apache.org/jira/browse/FLINK-9711
Project: Flink
Issue Type: Bug
Components: Client
Affects Versions: 1.5.0
Reporter: Sayat Satybaldiyev

The Flink CLI has a list command with an option --running that, according to the description, should "Show only running programs and their JobIDs". However, in practice it also shows jobs that are in the *CANCELED* state, which is a completed state.
{code:java}
flink list --running -m job-manager:8081
Waiting for response...
-- Running/Restarting Jobs ---
03.07.2018 10:29:34 : 6e49027e843ced2ad798da549004243e : Enriched TrackingClick (CANCELED)
03.07.2018 10:42:31 : c901ae58787ba6aea4a46d6bb9dc2b3c : Enriched TrackingClick (CANCELED)
03.07.2018 11:27:51 : 83ab149ad528cfd956da7090543cbc72 : Enriched TrackingClick (RUNNING)
--
{code}
The proposal is to extend the CLI to show only jobs in the *RUNNING* state.
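The proposed behavior can be sketched as a simple status filter over the job listing (hypothetical types for illustration, not the CLI's actual implementation):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class RunningFilter {
    enum JobStatus { RUNNING, RESTARTING, CANCELED, FINISHED, FAILED }

    // Sketch of the proposed behavior: keep only jobs whose status is RUNNING.
    // (Today's --running output also prints terminal states such as CANCELED.)
    static List<String> runningOnly(Map<String, JobStatus> jobs) {
        return jobs.entrySet().stream()
                .filter(e -> e.getValue() == JobStatus.RUNNING)
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Job IDs taken from the CLI output quoted above.
        Map<String, JobStatus> jobs = new HashMap<>();
        jobs.put("6e49027e843ced2ad798da549004243e", JobStatus.CANCELED);
        jobs.put("c901ae58787ba6aea4a46d6bb9dc2b3c", JobStatus.CANCELED);
        jobs.put("83ab149ad528cfd956da7090543cbc72", JobStatus.RUNNING);
        System.out.println(runningOnly(jobs)); // prints [83ab149ad528cfd956da7090543cbc72]
    }
}
```

Whether RESTARTING should also count as "running" is a design question the issue leaves open; the filter above takes the strict reading.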
[jira] [Created] (FLINK-9705) Failed to close kafka producer - Interrupted while joining ioThread
Sayat Satybaldiyev created FLINK-9705:
-
Summary: Failed to close kafka producer - Interrupted while joining ioThread
Key: FLINK-9705
URL: https://issues.apache.org/jira/browse/FLINK-9705
Project: Flink
Issue Type: Bug
Components: Kafka Connector
Affects Versions: 1.5.0
Reporter: Sayat Satybaldiyev

While running Flink 1.5.0 with a Kafka sink, I got the following errors from the Flink streaming connector.
{code:java}
18:05:09,270 ERROR org.apache.kafka.clients.producer.KafkaProducer - Interrupted while joining ioThread
java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1260)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1031)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1010)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:989)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
18:05:09,271 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
org.apache.kafka.common.KafkaException: Failed to close kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1062)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1010)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:989)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1260)
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1031)
    ... 9 more
{code}