[jira] [Created] (FLINK-11171) Unexpected timestamp deserialization failure in RocksDB state backend

2018-12-16 Thread Sayat Satybaldiyev (JIRA)
Sayat Satybaldiyev created FLINK-11171:
--

 Summary: Unexpected timestamp deserialization failure in RocksDB 
state backend
 Key: FLINK-11171
 URL: https://issues.apache.org/jira/browse/FLINK-11171
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.7.0
Reporter: Sayat Satybaldiyev


We have a job that joins two data streams via a ProcessFunction and uses 
ValueState with TTL on the RocksDB state backend. The job constantly fails to 
checkpoint due to a timestamp deserialization error.

TTL state config

{code:java}
StateTtlConfig ttlConfig = StateTtlConfig
 .newBuilder(Time.hours(recommendationRetentionHr))
 .neverReturnExpired()
 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
 .cleanupFullSnapshot()
 .build();

{code}
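For context, the snapshot transformer that fails here reads a timestamp out of each serialized TTL value to decide expiry. Below is a simplified, self-contained sketch of that idea, under the assumption that a TTL value is laid out as a long timestamp followed by the user payload; the class and method names are illustrative, not Flink's actual ones:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class TtlTimestampPeek {
    // Sketch (assumption): a TTL state value is serialized as
    // [ last-access timestamp : long ][ user value bytes ].
    // The snapshot transformer reads the leading long to decide expiry;
    // a failure here means the bytes did not start with a valid long.
    static long readTimestamp(byte[] serialized) throws IOException {
        if (serialized.length < Long.BYTES) {
            throw new IOException("Unexpected timestamp deserialization failure");
        }
        try (DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(serialized))) {
            return in.readLong();
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeLong(1544940132568L); // TTL timestamp prefix
        out.writeUTF("user value");    // payload
        System.out.println(readTimestamp(bos.toByteArray())); // 1544940132568
    }
}
```

One plausible failure mode under this layout (a guess, not confirmed by the report) is state that was written in a different format, e.g. without TTL enabled, being read back through the TTL transformer.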



 

Error:

{code:java}
2018-12-16 06:02:12,609 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 31 @ 1544940132568 for job 7825029dc256542aa312c0b68ecf0631.
 2018-12-16 06:22:12,609 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 31 
of job 7825029dc256542aa312c0b68ecf0631 expired before completing.
 2018-12-16 06:22:12,637 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering 
checkpoint 32 @ 1544941332609 for job 7825029dc256542aa312c0b68ecf0631.
 2018-12-16 06:22:12,899 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Decline 
checkpoint 32 by task 176c8b3c3ff190d183415ab77b89344c of job 
7825029dc256542aa312c0b68ecf0631.
 2018-12-16 06:22:12,900 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding 
checkpoint 32 of job 7825029dc256542aa312c0b68ecf0631.
 java.lang.Exception: Could not materialize checkpoint 32 for operator 
joined-stream (1/6).
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
 Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.util.FlinkRuntimeException: Unexpected timestamp 
deserialization failure
 at java.util.concurrent.FutureTask.report(FutureTask.java:122)
 at java.util.concurrent.FutureTask.get(FutureTask.java:192)
 at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
 at 
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:47)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
 ... 5 more
 Caused by: org.apache.flink.util.FlinkRuntimeException: Unexpected timestamp 
deserialization failure
 at 
org.apache.flink.runtime.state.ttl.TtlStateSnapshotTransformer$TtlSerializedValueStateSnapshotTransformer.filterOrTransform(TtlStateSnapshotTransformer.java:94)
 at 
org.apache.flink.runtime.state.ttl.TtlStateSnapshotTransformer$TtlSerializedValueStateSnapshotTransformer.filterOrTransform(TtlStateSnapshotTransformer.java:79)
 at 
org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper.filterOrTransform(RocksTransformingIteratorWrapper.java:70)
 at 
org.apache.flink.contrib.streaming.state.iterator.RocksTransformingIteratorWrapper.seekToFirst(RocksTransformingIteratorWrapper.java:48)
 at 
org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.buildIteratorHeap(RocksStatesPerKeyGroupMergeIterator.java:128)
 at 
org.apache.flink.contrib.streaming.state.iterator.RocksStatesPerKeyGroupMergeIterator.<init>(RocksStatesPerKeyGroupMergeIterator.java:68)
 at 
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:312)
 at 
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:258)
 at 
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:223)
 at 
org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:176)
 at 
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
{code}

[jira] [Created] (FLINK-10380) Check if key is not null before assigning to group in KeyedStream

2018-09-20 Thread Sayat Satybaldiyev (JIRA)
Sayat Satybaldiyev created FLINK-10380:
--

 Summary: Check if key is not null before assigning to group in 
KeyedStream
 Key: FLINK-10380
 URL: https://issues.apache.org/jira/browse/FLINK-10380
 Project: Flink
  Issue Type: Task
Affects Versions: 1.6.0
Reporter: Sayat Satybaldiyev


If a user creates a KeyedStream partitioned by a key that might be null, the 
Flink job throws a NullPointerException at runtime. However, the NPE that Flink 
throws is hard to debug and understand, as it does not point to the offending 
place in the user's job.

*Suggestion:*

Add a precondition that checks that the key is not null and throws a 
descriptive error if it is.
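A minimal sketch of such a precondition (a hypothetical helper, not Flink's actual code; Flink's own org.apache.flink.util.Preconditions.checkNotNull could serve the same purpose at the key-group assignment site):

```java
import java.util.Objects;

public class KeyGroupCheck {
    // Hypothetical helper: fail fast with a descriptive message instead of
    // a bare NPE deep inside KeyGroupRangeAssignment.
    static <K> K checkKeyNotNull(K key) {
        return Objects.requireNonNull(key,
                "Assigned key must not be null. A KeySelector returned null; "
                + "make sure the key field is always populated.");
    }

    public static void main(String[] args) {
        // A non-null key passes through unchanged.
        System.out.println(checkKeyNotNull("aaa")); // aaa
        // A null key now produces a descriptive NullPointerException.
        try {
            checkKeyNotNull(null);
        } catch (NullPointerException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```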

 

*Job Example*:

 
{code:java}
DataStream<String> stream = env.fromCollection(Arrays.asList("aaa", "bbb"))
        .map(x -> (String) null)
        .keyBy(x -> x);
{code}
 

 

The error that is thrown:

 
{code:java}
Exception in thread "main" 
org.apache.flink.runtime.client.JobExecutionException: 
java.lang.RuntimeException
 at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
 at 
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
 at org.myorg.quickstart.BuggyKeyedStream.main(BuggyKeyedStream.java:61)
Caused by: java.lang.RuntimeException
 at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
 at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
 at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:667)
 at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
 at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
 at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
 at 
org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignToKeyGroup(KeyGroupRangeAssignment.java:59)
 at 
org.apache.flink.runtime.state.KeyGroupRangeAssignment.assignKeyToParallelOperator(KeyGroupRangeAssignment.java:48)
 at 
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:63)
 at 
org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannels(KeyGroupStreamPartitioner.java:32)
 at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
 at 
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
 at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
... 10 more
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10287) Flink HA Persist Cancelled Job in Zookeeper

2018-09-05 Thread Sayat Satybaldiyev (JIRA)
Sayat Satybaldiyev created FLINK-10287:
--

 Summary: Flink HA Persist Cancelled Job in Zookeeper
 Key: FLINK-10287
 URL: https://issues.apache.org/jira/browse/FLINK-10287
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.6.0
Reporter: Sayat Satybaldiyev
 Attachments: Screenshot from 2018-09-05 16-48-34.png

Flink HA persists a canceled job in Zookeeper, which makes HA mode quite 
fragile. If the JM gets restarted, it tries to recover the canceled job and, 
after some time, fails completely, unable to recover it.

 

How to reproduce:
 # Have Flink HA 1.6 cluster
 # Cancel a running flink job
 # Observe that Flink did not remove the ZK metadata.

!Screenshot from 2018-09-05 16-48-34.png!
{code:java}
ls /flink/flink_ns/jobgraphs/46d8d3555936c0d8e6b6ec21cc02bb11
[7f392fd9-cedc-4978-9186-1f54b98eeeb7]{code}





[jira] [Created] (FLINK-10286) Flink Persist Invalid Job Graph in Zookeeper

2018-09-05 Thread Sayat Satybaldiyev (JIRA)
Sayat Satybaldiyev created FLINK-10286:
--

 Summary: Flink Persist Invalid Job Graph in Zookeeper
 Key: FLINK-10286
 URL: https://issues.apache.org/jira/browse/FLINK-10286
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.6.0
Reporter: Sayat Satybaldiyev


In HA mode, Flink 1.6 persists the job graph in Zookeeper even if the job was 
not accepted by the Job Manager. This is particularly bad because if the JM 
later dies and restarts, it tries to recover the job, inevitably fails, and 
dies completely.

 

How to reproduce:

1. Have an HA Flink 1.6 cluster.

2. Submit an invalid job; in my case, an invalid file-system scheme for the 
RocksDB state backend:

{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.enableCheckpointing(5000);
// "hddd" is an intentionally invalid file-system scheme
RocksDBStateBackend backend = new RocksDBStateBackend("hddd:///tmp/flink/rocksdb");
backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
env.setStateBackend(backend);
{code}

Client returns:

{code:java}
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Could not submit job (JobID: 9680f02ae2f3806c3b4da25bfacd0749)
{code}

The JM does not accept the job; this is a truncated error log from the JM:

{code:java}
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
... 24 more
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager

Caused by: java.lang.RuntimeException: Failed to start checkpoint ID counter: Could not find a file system implementation for scheme 'hddd'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
{code}

3. Go to ZK and observe that the JM has saved the job to ZK:

{code:java}
ls /flink/flink_ns/jobgraphs/9680f02ae2f3806c3b4da25bfacd0749
[7f392fd9-cedc-4978-9186-1f54b98eeeb7]
{code}





[jira] [Created] (FLINK-9831) Too many open files for RocksDB

2018-07-12 Thread Sayat Satybaldiyev (JIRA)
Sayat Satybaldiyev created FLINK-9831:
-

 Summary: Too many open files for RocksDB
 Key: FLINK-9831
 URL: https://issues.apache.org/jira/browse/FLINK-9831
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Sayat Satybaldiyev
 Attachments: flink_open_files.txt

While running only one Flink job, backed by RocksDB with checkpointing to 
HDFS, we encounter an exception that the TM cannot access an SST file because 
the process has too many open files, even though we have already increased the 
soft/hard file limits on the machine.

Number of open files for the TM on the machine:

 
{code:java}
lsof -p 23301|wc -l
8241{code}
 

Instance limits

 
{code:java}
ulimit -a
core file size (blocks, -c) 0
data seg size (kbytes, -d) unlimited
scheduling priority (-e) 0
file size (blocks, -f) unlimited
pending signals (-i) 256726
max locked memory (kbytes, -l) 64
max memory size (kbytes, -m) unlimited
open files (-n) 1048576
pipe size (512 bytes, -p) 8
POSIX message queues (bytes, -q) 819200
real-time priority (-r) 0
stack size (kbytes, -s) 8192
cpu time (seconds, -t) unlimited
max user processes (-u) 128000
virtual memory (kbytes, -v) unlimited
file locks (-x) unlimited
 
{code}
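One extra check worth making (my suggestion, not part of the original report): ulimit -a in an interactive shell does not necessarily match the limits of an already-running TaskManager, e.g. if it was started by a supervisor with its own limit settings. On Linux, the effective limits of a live process can be read from /proc/<pid>/limits; the sketch below inspects its own PID, so substitute the TM PID (23301 above):

```java
import java.nio.file.Files;
import java.nio.file.Path;

public class OpenFileLimit {
    public static void main(String[] args) throws Exception {
        // Read the effective limits of a live process from /proc (Linux only).
        // We inspect our own PID here; for the TM, substitute e.g. 23301.
        long pid = ProcessHandle.current().pid();
        for (String line : Files.readAllLines(Path.of("/proc/" + pid + "/limits"))) {
            if (line.startsWith("Max open files")) {
                System.out.println(line);
            }
        }
    }
}
```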
 

[^flink_open_files.txt]
{code:java}
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedCoProcessOperator_98a16ed3228ec4a08acd8d78420516a1_(1/1) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:276)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:132)
... 5 more
Caused by: java.io.FileNotFoundException: /tmp/flink-io-3da06c9e-f619-44c9-b95f-54ee9b1a084a/job_b3ecbdc0eb2dc2dfbf5532ec1fcef9da_op_KeyedCoProcessOperator_98a16ed3228ec4a08acd8d78420516a1__1_1__uuid_c4b82a7e-8a04-4704-9e0b-393c3243cef2/3701639a-bacd-4861-99d8-5f3d112e88d6/16.sst (Too many open files)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
at org.apache.flink.core.fs.local.LocalDataOutputStream.<init>(LocalDataOutputStream.java:47)
at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:275)
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:121)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.copyStateDataHandleData(RocksDBKeyedStateBackend.java:1008)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllDataFromStateHandles(RocksDBKeyedStateBackend.java:988)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.transferAllStateDataToDirectory(RocksDBKeyedStateBackend.java:973)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restoreInstance(RocksDBKeyedStateBackend.java:758)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBIncrementalRestoreOperation.restore(RocksDBKeyedStateBackend.java:732)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:443)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:149)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
... 7 more
{code}




[jira] [Created] (FLINK-9805) HTTP Redirect to Active JM in Flink CLI

2018-07-11 Thread Sayat Satybaldiyev (JIRA)
Sayat Satybaldiyev created FLINK-9805:
-

 Summary: HTTP Redirect to Active JM in Flink CLI
 Key: FLINK-9805
 URL: https://issues.apache.org/jira/browse/FLINK-9805
 Project: Flink
  Issue Type: Improvement
  Components: Client
Affects Versions: 1.5.0
Reporter: Sayat Satybaldiyev


The Flink CLI allows specifying the job manager address via the --jobmanager 
flag. However, in HA mode the JM can change, and a standby JM then issues an 
HTTP redirect to the active one. When deploying via the Flink CLI with the 
--jobmanager flag, the CLI does not follow this redirect to the active JM, and 
thus fails to submit the job with "Could not complete the operation. Number of 
retries has been exhausted".

 

*Proposal:*

Honor the JM's HTTP redirect in the Flink CLI when leadership has changed and 
the --jobmanager flag is in use.
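As an illustration of the client-side behavior being asked for (plain JDK 11+ java.net.http, not Flink's actual REST client, which is Netty-based): the HTTP client used for submission would need redirect-following enabled so that a standby JM's 3xx response is chased to the leader:

```java
import java.net.http.HttpClient;

public class RedirectingClient {
    public static void main(String[] args) {
        // Follow 3xx redirects so a request sent to a standby JM
        // ends up at the active leader it redirects to.
        HttpClient client = HttpClient.newBuilder()
                .followRedirects(HttpClient.Redirect.NORMAL)
                .build();
        System.out.println(client.followRedirects()); // NORMAL
    }
}
```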



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9711) Flink CLI does not filter RUNNING only jobs

2018-07-03 Thread Sayat Satybaldiyev (JIRA)
Sayat Satybaldiyev created FLINK-9711:
-

 Summary: Flink CLI does not filter RUNNING only jobs
 Key: FLINK-9711
 URL: https://issues.apache.org/jira/browse/FLINK-9711
 Project: Flink
  Issue Type: Bug
  Components: Client
Affects Versions: 1.5.0
Reporter: Sayat Satybaldiyev


The Flink CLI list command has an option --running which, according to its 
description, should "Show only running programs and their JobIDs". In 
practice, however, it also shows jobs in the *CANCELED* state, which is a 
completed state.

 
{code:java}
flink list --running -m job-manager:8081 
Waiting for response...
-- Running/Restarting Jobs ---
03.07.2018 10:29:34 : 6e49027e843ced2ad798da549004243e : Enriched TrackingClick 
(CANCELED)
03.07.2018 10:42:31 : c901ae58787ba6aea4a46d6bb9dc2b3c : Enriched TrackingClick 
(CANCELED)
03.07.2018 11:27:51 : 83ab149ad528cfd956da7090543cbc72 : Enriched TrackingClick 
(RUNNING)
--

{code}
 

The proposal is to extend the CLI to show only jobs in the *RUNNING* state.
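A sketch of the proposed filtering (the JobSummary type below is a hypothetical stand-in for the CLI's job listing entries; the status strings mirror Flink's JobStatus names):

```java
import java.util.ArrayList;
import java.util.List;

public class RunningJobsFilter {
    // Hypothetical stand-in for one entry of the CLI's job listing.
    static class JobSummary {
        final String id;
        final String name;
        final String status;
        JobSummary(String id, String name, String status) {
            this.id = id;
            this.name = name;
            this.status = status;
        }
    }

    // Keep only truly running jobs, dropping terminal states like CANCELED.
    static List<JobSummary> runningOnly(List<JobSummary> jobs) {
        List<JobSummary> result = new ArrayList<>();
        for (JobSummary job : jobs) {
            if ("RUNNING".equals(job.status) || "RESTARTING".equals(job.status)) {
                result.add(job);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<JobSummary> jobs = List.of(
                new JobSummary("6e49027e", "Enriched TrackingClick", "CANCELED"),
                new JobSummary("83ab149a", "Enriched TrackingClick", "RUNNING"));
        System.out.println(runningOnly(jobs).size()); // 1
    }
}
```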





[jira] [Created] (FLINK-9705) Failed to close kafka producer - Interrupted while joining ioThread

2018-07-02 Thread Sayat Satybaldiyev (JIRA)
Sayat Satybaldiyev created FLINK-9705:
-

 Summary: Failed to close kafka producer - Interrupted while 
joining ioThread
 Key: FLINK-9705
 URL: https://issues.apache.org/jira/browse/FLINK-9705
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.5.0
Reporter: Sayat Satybaldiyev


While running Flink 1.5.0 with a Kafka sink, I got the following errors from 
the Flink streaming connector.

 
{code:java}
18:05:09,270 ERROR org.apache.kafka.clients.producer.KafkaProducer - 
Interrupted while joining ioThread
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1260)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1031)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1010)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:989)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
18:05:09,271 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error 
during disposal of stream operator.
org.apache.kafka.common.KafkaException: Failed to close kafka producer
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1062)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1010)
at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:989)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:319)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:473)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:374)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1260)
at 
org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1031)
... 9 more
{code}
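The trace shows KafkaProducer.close() being interrupted while joining its IO thread during operator disposal (presumably a task cancellation). A common pattern for close paths that may be interrupted, shown here as an illustrative sketch rather than the connector's actual code, is to restore the interrupt flag instead of letting the InterruptedException escape:

```java
public class InterruptSafeClose {
    // Sketch: join a worker/IO thread during close(); if the closing
    // thread is interrupted (e.g. by task cancellation), restore the
    // interrupt flag and return instead of propagating the exception.
    static void closeQuietly(Thread ioThread) {
        try {
            ioThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Thread io = new Thread(() -> { /* pretend IO work */ });
        io.start();
        closeQuietly(io);
        System.out.println("closed, interrupted=" + Thread.currentThread().isInterrupted()); // closed, interrupted=false
    }
}
```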


