[jira] [Created] (FLINK-12024) Bump universal Kafka connector's Kafka dependency to 2.2.0
Elias Levy created FLINK-12024: -- Summary: Bump universal Kafka connector's Kafka dependency to 2.2.0 Key: FLINK-12024 URL: https://issues.apache.org/jira/browse/FLINK-12024 Project: Flink Issue Type: Improvement Components: Connectors / Kafka Affects Versions: 1.7.2 Reporter: Elias Levy Update the Kafka client dependency to version 2.2.0. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11794) Allow compression of row format files created by StreamingFileSink
Elias Levy created FLINK-11794: -- Summary: Allow compression of row format files created by StreamingFileSink Key: FLINK-11794 URL: https://issues.apache.org/jira/browse/FLINK-11794 Project: Flink Issue Type: Improvement Components: Connectors / FileSystem Affects Versions: 1.7.2 Reporter: Elias Levy Currently, there is no mechanism to compress files created using a StreamingFileSink. This is highly desirable when the output is a text-based row format such as JSON. Possible alternatives are the introduction of a callback that is passed the local file before it is uploaded to the DFS, so that it can be compressed, or a factory method that returns a compressing OutputStream, such as GZIPOutputStream, wrapping the sink's output stream, which the Encoder would then write to (see the sketch below). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
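A minimal sketch of the factory alternative, assuming a hypothetical hook on the sink; the {{CompressingStreamFactory}} interface and its wiring are invented here for illustration, while {{Encoder}} and {{GZIPOutputStream}} are real:
{code:java}
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.zip.GZIPOutputStream;

// Hypothetical factory the StreamingFileSink would apply to each part file's
// raw stream before handing it to the row-format Encoder. No such hook exists
// today; that is what this ticket asks for.
public interface CompressingStreamFactory extends Serializable {
    OutputStream wrap(OutputStream raw) throws IOException;
}

class GzipStreamFactory implements CompressingStreamFactory {
    @Override
    public OutputStream wrap(OutputStream raw) throws IOException {
        // Everything the Encoder writes would be gzip-compressed on its way
        // into the part file.
        return new GZIPOutputStream(raw);
    }
}
{code}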
[jira] [Created] (FLINK-11520) Triggers should be provided the window state
Elias Levy created FLINK-11520: -- Summary: Triggers should be provided the window state Key: FLINK-11520 URL: https://issues.apache.org/jira/browse/FLINK-11520 Project: Flink Issue Type: Improvement Reporter: Elias Levy Some triggers may require access to the window state to perform their job. Consider a window computing a count using an aggregate function. It may be desired to fire the window when the count is 1 and then at the end of the window. The early firing can provide feedback to external systems that a key has been observed, while waiting for the final count. The same problem can be observed in org.apache.flink.streaming.api.windowing.triggers.CountTrigger, which must maintain an internal count instead of being able to make use of the window state. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
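A sketch of what this could enable, assuming a hypothetical {{getWindowValue()}} accessor on {{TriggerContext}} that exposes the window's aggregated value ({{Trigger}} and its callbacks are the real API; the accessor is the proposal and will not compile against today's API):
{code:java}
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Fires early on the first element for a key, and again at the end of the
// window, without maintaining a count of its own.
public class FirstElementAndEndTrigger extends Trigger<Object, TimeWindow> {
    @Override
    public TriggerResult onElement(Object element, long timestamp,
                                   TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        Long count = ctx.getWindowValue();  // hypothetical: the aggregate's current result
        return (count != null && count == 1L) ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}
{code}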
[jira] [Created] (FLINK-11517) Inefficient window state access when using RocksDB state backend
Elias Levy created FLINK-11517: -- Summary: Inefficient window state access when using RocksDB state backend Key: FLINK-11517 URL: https://issues.apache.org/jira/browse/FLINK-11517 Project: Flink Issue Type: Bug Reporter: Elias Levy When using an aggregate function on a window with a process function and the RocksDB state backend, state access is inefficient. The WindowOperator calls windowState.add to merge the new element using the aggregate function. The add method of RocksDBAggregatingState will read the state, deserialize the state, call the aggregate function, serialize the result, and write it out. If the trigger decides the window must be fired, as windowState.add does not return the state, the WindowOperator must call windowState.get to retrieve it and pass it to the window process function, resulting in another read and deserialization. Finally, the state is not passed to the trigger, yet in some cases the trigger may need to access it. That is our case. As the state is not passed to the trigger, we must read and deserialize the state one more time from within the trigger. Thus, state must be read and deserialized three times to process a single element. If the state is large, this can be quite costly. Ideally, windowState.add would return the state, so that the WindowOperator can pass it to the process function without having to read it again. Additionally, the state should be made available to the trigger to enable more use cases without having to go through the state descriptor again. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
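Schematically, the flow described above looks as follows (editor's sketch, simplified relative to the real WindowOperator and RocksDBAggregatingState code):
{code:java}
// Simplified skeleton of the per-element flow; each numbered step is a
// separate RocksDB read plus deserialization of the same window state.
abstract class WindowFlowSketch<IN, ACC> {

    abstract void add(IN element) throws Exception;            // windowState.add
    abstract ACC get() throws Exception;                       // windowState.get
    abstract boolean triggerFires(IN element) throws Exception;
    abstract void emit(ACC contents);

    void processElement(IN element) throws Exception {
        add(element);                    // (1) get + deserialize, aggregate,
                                         //     serialize + put
        if (triggerFires(element)) {     // (2) our trigger re-reads the same
                                         //     state through a state descriptor
            ACC contents = get();        // (3) get + deserialize yet again, only
                                         //     to hand it to the process function
            emit(contents);
        }
    }
}
{code}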
[jira] [Created] (FLINK-10617) Restoring job fails because of slot allocation timeout
Elias Levy created FLINK-10617: -- Summary: Restoring job fails because of slot allocation timeout Key: FLINK-10617 URL: https://issues.apache.org/jira/browse/FLINK-10617 Project: Flink Issue Type: Bug Components: ResourceManager, TaskManager Affects Versions: 1.6.1 Reporter: Elias Levy The following may be related to FLINK-9932, but I am unsure. If you believe it is, go ahead and close this issue as a duplicate. While trying to test local state recovery on a job with large state, the job failed to be restored because slot allocation timed out. The job is running on a standalone cluster with 12 nodes and 96 task slots (8 per node). The job has a parallelism of 96, so it consumes all of the slots, and has ~200 GB of state in RocksDB. To test local state recovery I decided to kill one of the TMs. The TM immediately restarted and re-registered with the JM. I confirmed the JM showed 96 registered task slots. {noformat} 21:35:44,616 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Resolved ResourceManager address, beginning registration 21:35:44,616 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Registration at ResourceManager attempt 1 (timeout=100ms) 21:35:44,640 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Successful registration at resource manager akka.tcp://flink@172.31.18.172:6123/user/resourcemanager under registration id 302988dea6afbd613bb2f96429b65d18. 21:36:49,667 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request AllocationID{4274d96a59d370305520876f5b84fb9f} for job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 8e06aa64d5f8961809da38fe7f224cc1. 21:36:49,667 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for AllocationID{4274d96a59d370305520876f5b84fb9f}. 21:36:49,667 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring. 21:36:49,668 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock. 21:36:49,671 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Try to register at job manager akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id f85f6f9b-7713-4be3-a8f0-8443d91e5e6d. 21:36:49,681 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9} for job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 8e06aa64d5f8961809da38fe7f224cc1. 21:36:49,681 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9}. 21:36:49,681 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring. 21:36:49,681 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock. 21:36:49,681 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock. 21:36:49,683 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Try to register at job manager akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id f85f6f9b-7713-4be3-a8f0-8443d91e5e6d. 
21:36:49,687 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Resolved JobManager address, beginning registration 21:36:49,687 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Resolved JobManager address, beginning registration 21:36:49,687 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request AllocationID{740caf20a5f7f767864122dc9a7444d9} for job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 8e06aa64d5f8961809da38fe7f224cc1. 21:36:49,688 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Registration at JobManager attempt 1 (timeout=100ms) 21:36:49,688 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for AllocationID{740caf20a5f7f767864122dc9a7444d9}. 21:36:49,688 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring. 21:36:49,688 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock. 21:36:49,688 INFO
[jira] [Created] (FLINK-10520) Job save points REST API fails unless parameters are specified
Elias Levy created FLINK-10520: -- Summary: Job save points REST API fails unless parameters are specified Key: FLINK-10520 URL: https://issues.apache.org/jira/browse/FLINK-10520 Project: Flink Issue Type: Bug Components: REST Affects Versions: 1.6.1 Reporter: Elias Levy The new REST API POST endpoint, {{/jobs/:jobid/savepoints}}, returns an error unless the request includes a body with all parameters ({{target-directory}} and {{cancel-job}}), even though the [documentation|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.html] suggests these are optional. If a POST request with no data is made, the response is a 400 status code with the error message "Bad request received." If the POST request submits an empty JSON object ( {} ), the response is a 400 status code with the error message "Request did not match expected format SavepointTriggerRequestBody." The same is true if only the {{target-directory}} or {{cancel-job}} parameter is included. As the system is configured with a default savepoint location, there shouldn't be a need to include the parameter in the request. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
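For reference, the only request shape that currently succeeds supplies both fields (host, port, job id, and directory are placeholders):
{noformat}
curl -X POST http://localhost:8081/jobs/<jobid>/savepoints \
  -H 'Content-Type: application/json' \
  -d '{"target-directory": "s3://bucket/savepoints", "cancel-job": false}'
{noformat}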
[jira] [Created] (FLINK-10493) Macro generated CaseClassSerializer considered harmful
Elias Levy created FLINK-10493: -- Summary: Macro generated CaseClassSerializer considered harmful Key: FLINK-10493 URL: https://issues.apache.org/jira/browse/FLINK-10493 Project: Flink Issue Type: Bug Components: Scala API, State Backends, Checkpointing, Type Serialization System Affects Versions: 1.5.4, 1.6.1, 1.6.0, 1.5.3, 1.5.2, 1.5.1, 1.4.2, 1.4.1, 1.4.0 Reporter: Elias Levy The Flink Scala API uses implicits and macros to generate {{TypeInformation}} and {{TypeSerializer}} objects for types. In the case of Scala tuples and case classes, the macro generates an [anonymous {{CaseClassSerializer}} class|https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala#L148-L161]. The Scala compiler generates a name for the anonymous class that depends on the position of the macro invocation in the code relative to other anonymous classes. If the code is changed such that the anonymous class's relative position changes, even if the overall logic of the code or the type in question does not change, the name of the serializer class will change. That will result in errors, such as the one below, if the job is restored from a savepoint, as the serializer needed to read the data in the savepoint will no longer be found, its name having changed. At the very least, there should be a prominent warning in the documentation about this issue. Minor code changes can result in jobs that can't restore previous state. Ideally, the use of anonymous classes should be deprecated if possible. {noformat} WARN org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil - Deserialization of serializer errored; replacing with null. java.io.IOException: Unloadable class for type serializer. 
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203) at org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207) at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85) at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351) at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Unknown Source) Caused by: java.io.InvalidClassException: failed to read class descriptor at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) at java.io.ObjectInputStream.readClassDesc(Unknown Source) at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) at java.io.ObjectInputStream.readObject0(Unknown Source) at java.io.ObjectInputStream.defaultReadFields(Unknown Source) at java.io.ObjectInputStream.readSerialData(Unknown Source) at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) at java.io.ObjectInputStream.readObject0(Unknown Source) at java.io.ObjectInputStream.readObject(Unknown Source) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375) ... 14 more Caused by: java.lang.ClassNotFoundException: com.somewhere.TestJob$$anon$13$$anon$3 at java.net.URLClassLoader.findClass(Unknown Source) at java.lang.ClassLoader.loadClass(Unknown Source) at
[jira] [Created] (FLINK-10483) Can't restore from a savepoint even with Allow Non Restored State enabled
Elias Levy created FLINK-10483: -- Summary: Can't restore from a savepoint even with Allow Non Restored State enabled Key: FLINK-10483 URL: https://issues.apache.org/jira/browse/FLINK-10483 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing, Type Serialization System Affects Versions: 1.4.2 Reporter: Elias Levy A trimmed streaming job fails to restore from a savepoint with an "Unloadable class for type serializer" error, even though the case class in question has been eliminated from the job and Allow Non Restored State is enabled. We have a job running on a Flink 1.4.2 cluster with two Kafka input streams; one of the streams is processed by an async function, and the output of the async function and the other original stream are consumed by a CoProcessOperator, which in turn emits Scala case class instances that go into a stateful ProcessFunction filter, and then into a sink. I.e.
{code:java}
source 1 -> async function --\
                              |---> co process --> process --> sink
source 2 --/
{code}
I eliminated most of the DAG, leaving only the source 1 --> async function portion of it. This removed the case class in question from the processing graph. When I try to restore from the savepoint, even if Allow Non Restored State is selected, the job fails to restore with the error "Deserialization of serializer errored". This is the error being generated: {noformat} WARN org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil - Deserialization of serializer errored; replacing with null. java.io.IOException: Unloadable class for type serializer. at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203) at org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207) at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85) at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351) at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Unknown Source) Caused by: java.io.InvalidClassException: failed to read class descriptor at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) at java.io.ObjectInputStream.readClassDesc(Unknown Source) at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) at 
java.io.ObjectInputStream.readObject0(Unknown Source) at java.io.ObjectInputStream.defaultReadFields(Unknown Source) at java.io.ObjectInputStream.readSerialData(Unknown Source) at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) at java.io.ObjectInputStream.readObject0(Unknown Source) at java.io.ObjectInputStream.readObject(Unknown Source) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375) ... 14 more Caused by: java.lang.ClassNotFoundException: com.somewhere.TestJob$$anon$13$$anon$3 at java.net.URLClassLoader.findClass(Unknown Source) at java.lang.ClassLoader.loadClass(Unknown Source) at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128) at java.lang.ClassLoader.loadClass(Unknown Source) at java.lang.Class.forName0(Native Method) at
[jira] [Created] (FLINK-10460) DataDog reporter JsonMappingException
Elias Levy created FLINK-10460: -- Summary: DataDog reporter JsonMappingException Key: FLINK-10460 URL: https://issues.apache.org/jira/browse/FLINK-10460 Project: Flink Issue Type: Improvement Components: Metrics Affects Versions: 1.4.2 Reporter: Elias Levy Observed the following error in the TM logs this morning: {code:java} WARN org.apache.flink.metrics.datadog.DatadogHttpReporter - Failed reporting metrics to Datadog. org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException: (was java.util.ConcurrentModificationException) (through reference chain: org.apache.flink.metrics.datadog.DSeries["series"]-> java.util.ArrayList[88]->org.apache.flink.metrics.datadog.DGauge["points"]) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:379) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:339) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:342) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:686) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:157) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContents(IndexedListSerializer.java:119) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:79) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:18) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:672) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:678) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:157) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:130) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3631) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2998) at org.apache.flink.metrics.datadog.DatadogHttpClient.serialize(DatadogHttpClient.java:90) at org.apache.flink.metrics.datadog.DatadogHttpClient.send(DatadogHttpClient.java:79) at org.apache.flink.metrics.datadog.DatadogHttpReporter.report(DatadogHttpReporter.java:143) at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:417) at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at java.util.concurrent.FutureTask.runAndReset(Unknown Source) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source) Caused by: java.util.ConcurrentModificationException at 
java.util.LinkedHashMap$LinkedHashIterator.nextNode(Unknown Source) at java.util.LinkedHashMap$LinkedKeyIterator.next(Unknown Source) at java.util.AbstractCollection.addAll(Unknown Source) at java.util.HashSet.<init>(Unknown Source) at org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:65) at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:298) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:906) at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35) at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26) at org.apache.flink.metrics.datadog.DGauge.getMetricValue(DGauge.java:42) at
[jira] [Created] (FLINK-10390) DataDog metric reporter leak warning
Elias Levy created FLINK-10390: -- Summary: DataDog metric reporter leak warning Key: FLINK-10390 URL: https://issues.apache.org/jira/browse/FLINK-10390 Project: Flink Issue Type: Improvement Components: Metrics Affects Versions: 1.6.1 Reporter: Elias Levy After upgrading to 1.6.1 from 1.4.2 we started observing in the log warnings associated with the DataDog metrics reporter: {quote}Sep 21, 2018 9:43:20 PM org.apache.flink.shaded.okhttp3.internal.platform.Platform log WARNING: A connection to https://app.datadoghq.com/ was leaked. Did you forget to close a response body? To see where this was allocated, set the OkHttpClient logger level to FINE: Logger.getLogger(OkHttpClient.class.getName()).setLevel(Level.FINE); {quote} The metric reporter's okhttp dependency version (3.7.0) has not changed, so that does not appear to be the source of the warning. I believe the issue is the change made in [FLINK-8553|https://github.com/apache/flink/commit/ae3d547afe7ec44d37b38222a3ea40d9181e#diff-fc396ba6772815fc05efc1310760cd4b]. The HTTP calls were made async. The previous code called {{client.newCall(r).execute().close()}}. The new code does nothing in the callback, even though the [Callback.onResponse documentation|https://square.github.io/okhttp/3.x/okhttp/okhttp3/Callback.html#onResponse-okhttp3.Call-okhttp3.Response-] states: bq. Called when the HTTP response was successfully returned by the remote server. The callback may proceed to read the response body with Response.body. The response is still live until its response body is closed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
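A minimal sketch of the straightforward fix, assuming the reporter keeps its async enqueue ({{Call}}, {{Callback}}, and {{Response}} are the real okhttp3 types; the class name here is illustrative):
{code:java}
import java.io.IOException;

import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

class DatadogSendSketch {
    private final OkHttpClient client = new OkHttpClient();

    void send(Request request) {
        client.newCall(request).enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                // log and move on; there is no response body to close here
            }

            @Override
            public void onResponse(Call call, Response response) {
                // Closing the response releases the connection back to the
                // pool and silences the leak warning.
                response.close();
            }
        });
    }
}
{code}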
[jira] [Created] (FLINK-10372) There is no API to configure the timer state backend
Elias Levy created FLINK-10372: -- Summary: There is no API to configure the timer state backend Key: FLINK-10372 URL: https://issues.apache.org/jira/browse/FLINK-10372 Project: Flink Issue Type: Improvement Components: DataStream API, State Backends, Checkpointing Affects Versions: 1.6.0 Reporter: Elias Levy Flink 1.6.0, via FLINK-9485, introduced the option to store timers in RocksDB instead of the heap. Alas, this can only be configured via the {{state.backend.rocksdb.timer-service.factory}} config file option. That means the choice of state backend to use for timers can't be made on a per-job basis on a shared cluster. There is a need for an API in {{RocksDBStateBackend}} to configure the backend per job (see the sketch below). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
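A sketch of the kind of per-job API this asks for; the setter and enum are hypothetical (today the only knob is the {{state.backend.rocksdb.timer-service.factory}} config key):
{code:java}
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TimerBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        RocksDBStateBackend backend = new RocksDBStateBackend("s3://bucket/checkpoints");
        // Hypothetical per-job setter mirroring the config file option:
        backend.setTimerServiceFactory(RocksDBStateBackend.TimerServiceFactory.ROCKSDB);
        env.setStateBackend(backend);
    }
}
{code}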
[jira] [Created] (FLINK-10118) Queryable state MapState entry query
Elias Levy created FLINK-10118: -- Summary: Queryable state MapState entry query Key: FLINK-10118 URL: https://issues.apache.org/jira/browse/FLINK-10118 Project: Flink Issue Type: Improvement Components: Queryable State Affects Versions: 1.6.0 Reporter: Elias Levy Queryable state allows querying of keyed MapState, but such a query returns all MapState entries for the given key. In some cases, such MapState may include a substantial number of entries (in the millions), while the user may only be interested in one entry. I propose we allow queries for MapState to provide one or more map entry keys, in addition to the state key, and to only return entries for the given map keys. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
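A sketch of the proposed client-side call; the trailing map-keys argument is the hypothetical addition, the rest follows the existing {{QueryableStateClient.getKvState}} shape:
{code:java}
import java.util.Arrays;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class MapEntryQuerySketch {
    public static void main(String[] args) throws Exception {
        QueryableStateClient client = new QueryableStateClient("jm-host", 9069);

        MapStateDescriptor<String, Long> descriptor =
            new MapStateDescriptor<>("counts", Types.STRING, Types.LONG);

        // Today this returns ALL map entries for the state key. The proposal
        // is a variant that also takes map-entry keys and returns only those:
        client.getKvState(
            JobID.fromHexString("00000000000000000000000000000000"),
            "counts",
            "user-42", Types.STRING,
            descriptor,
            Arrays.asList("entry-a", "entry-b"));  // hypothetical extra argument
    }
}
{code}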
[jira] [Created] (FLINK-10117) REST API for Queryable State
Elias Levy created FLINK-10117: -- Summary: REST API for Queryable State Key: FLINK-10117 URL: https://issues.apache.org/jira/browse/FLINK-10117 Project: Flink Issue Type: Improvement Components: Queryable State, REST Affects Versions: 1.6.0 Reporter: Elias Levy At the moment, queryable state requires a JVM-based client that can make use of the Java queryable state client API in the {{flink-queryable-state-client}} artifact. In addition, the client requires a state descriptor matching the queried state, which tightly couples the Flink job and queryable state clients. I propose that queryable state become accessible via a REST API. FLINK-7040 mentions this possibility, but does not specify work towards that goal. I suggest that to enable queryable state over REST, users define JSON serializers via the state descriptors. This would allow queryable state clients to be developed in any language, not require them to use a Flink client library, and permit them to be loosely coupled with the job, as they could generically parse the returned JSON. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
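A hypothetical sketch of what such an endpoint might look like; the path and response shape are invented for illustration:
{noformat}
GET /jobs/:jobid/queryable-state/:state-name?key=user-42

HTTP/1.1 200 OK
Content-Type: application/json

{"key": "user-42", "value": {"count": 17, "last-seen": 1533081600000}}
{noformat}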
[jira] [Created] (FLINK-10098) Programmatically select timer storage backend
Elias Levy created FLINK-10098: -- Summary: Programmatically select timer storage backend Key: FLINK-10098 URL: https://issues.apache.org/jira/browse/FLINK-10098 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing, Streaming, TaskManager Affects Versions: 1.6.0, 1.7.0 Reporter: Elias Levy FLINK-9486 introduced timer storage on the RocksDB storage backend. Right now it is only possible to configure RocksDB as the storage for timers by setting the {{state.backend.rocksdb.timer-service.factory}} value in the configuration file for Flink. As the state storage backend can be programmatically selected by jobs via {{env.setStateBackend(...)}}, the timer backend should also be configurable programmatically. Different jobs should be able to store their timers in different storage backends. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10037) Document details of event time behavior in a single location
Elias Levy created FLINK-10037: -- Summary: Document details of event time behavior in a single location Key: FLINK-10037 URL: https://issues.apache.org/jira/browse/FLINK-10037 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.5.2 Reporter: Elias Levy Assignee: Elias Levy A description of event time and watermarks, how they are generated, assigned, and handled, is spread across many pages in the documentation. It would be useful to have it all in a single place, including missing information, such as how Flink assigns timestamps to new records generated by operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10011) Old job resurrected during HA failover
Elias Levy created FLINK-10011: -- Summary: Old job resurrected during HA failover Key: FLINK-10011 URL: https://issues.apache.org/jira/browse/FLINK-10011 Project: Flink Issue Type: Bug Components: JobManager Affects Versions: 1.4.2 Reporter: Elias Levy For the second time we've observed Flink resurrect an old job during JobManager high-availability failover. h4. Configuration * AWS environment * Flink 1.4.2 standalone cluster in HA mode * 2 JMs, 3 TMs * 3 node ZK ensemble * 1 job consuming from and producing to Kafka * Checkpoints in S3 using the Presto file system adaptor h4. Timeline * 15:18:10 JM 2 completes checkpoint 69256. * 15:19:10 JM 2 completes checkpoint 69257. * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a SocketTimeoutException * 15:19:57 ZK 1 closes connection to JM 2 (leader) * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK 1 * 15:19:57 JM 2 reports it can't read data from ZK ** {{Unable to read additional data from server sessionid 0x3003f4a0003, likely server has closed socket, closing socket connection and attempting reconnect)}} ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}} * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.}} ** {{ZooKeeper connection SUSPENDED. }}{{Changes to the submitted job graphs are not monitored (temporarily).}} ** {{Connection to ZooKeeper suspended. The contender akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in the leader election}}{{ }} ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.}} * 15:19:57 JM 2 gives up leadership ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked leadership.}} * 15:19:57 JM 2 changes job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED ** {{Stopping checkpoint coordinator for job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}} * 15:19:57 TMs start disassociating from JM 2, but JM 2 discards the messages because there is no leader ** {{Discard message LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception: TaskManager akka://flink/user/taskmanager is disassociating)) because there is currently no valid leader id known.}} * 15:19:57 JM 2 connects to ZK 2 and renews its session ** {{Opening socket connection to server ip-10-210-43-221.ec2.internal/10.210.43.221:2181}} ** {{Socket connection established to ip-10-210-43-221.ec2.internal/10.210.43.221:2181, initiating session}} ** {{Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.}} ** {{Session establishment complete on server ip-10-210-43-221.ec2.internal/10.210.43.221:2181, sessionid = 0x3003f4a0003, negotiated timeout = 4}} ** {{Connection to ZooKeeper was reconnected. Leader election can be restarted.}} ** {{ZooKeeper connection RECONNECTED. 
Changes to the submitted job graphs are monitored again.}} ** {{State change: RECONNECTED}} * 15:19:57: JM 1 reports JM 1 has been granted leadership: ** {{JobManager akka.tcp://flink@flink-jm-1:6123/user/jobmanager was granted leadership with leader session ID Some(ae0a1a17-eccc-40b4-985d-93bc59f5b936).}} * 15:19:57 JM 2 reports the job has been suspended ** {{org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter Shutting down.}} ** {{Job 2a4eff355aef849c5ca37dbac04f2ff1 has been suspended.}} * 15:19:57 JM 2 reports it has lost leadership: ** {{Associated JobManager Actor[akka://flink/user/jobmanager#33755521|#33755521] lost leader status}} ** {{Received leader address but not running in leader ActorSystem. Cancelling registration.}} * 15:19:57 TMs register with JM 1 * 15:20:07 JM 1 attempts to recover jobs and finds there are two jobs: ** {{Attempting to recover all jobs.}} ** {{There are 2 jobs to recover. Starting the job recovery.}} ** {{Attempting to recover job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.}} ** {{Attempting to recover job {color:#d04437}61bca496065cd05e4263070a5e923a05{color}.}} * 15:20:08 – 15:32:27 ZK 2 reports a large number of errors of the form: ** {{Got user-level KeeperException when processing sessionid:0x201d2330001 type:create cxid:0x4211 zxid:0x60009dc70 txntype:-1 reqpath:n/a Error Path:/flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1 Error:KeeperErrorCode = NodeExists for /flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1}} ** {{Got user-level KeeperException when processing sessionid:0x201d2330001 type:create cxid:0x4230 zxid:0x60009dc78 txntype:-1 reqpath:n/a Error
[jira] [Created] (FLINK-9731) Kafka source subtask begins to consume from earliest offset
Elias Levy created FLINK-9731: - Summary: Kafka source subtask begins to consume from earliest offset Key: FLINK-9731 URL: https://issues.apache.org/jira/browse/FLINK-9731 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.4.2 Reporter: Elias Levy On Jun 30th 2018, at 9:35 am UTC, the Kafka source in subtask 7 in a Flink job instance began consuming records from the earliest offsets available in Kafka for the partitions assigned to it. Other subtasks did not exhibit this behavior and continued operating normally. Previous to the event the job exhibited no Kafka lag. The job showed no failed checkpoints and the job did not restore or restart. Flink logs show no indication of anything amiss. There were no errors or Kafka-related messages in the Flink logs. The job is configured with checkpoints at 1 minute intervals. The Kafka connector consumer is configured to start from group offsets if it is not started from a savepoint via `setStartFromGroupOffsets()`, and the Kafka consumer is configured to fall back to the earliest offset if no group offsets are committed by setting `auto.offset.reset` to `earliest` in the Kafka consumer config. Right before the event a Kafka broker (kafka-broker-b5-int) lost leadership of its partitions for around 30 seconds as a result of losing its connection to ZooKeeper. {noformat} [2018-06-30 09:34:54,799] INFO Unable to read additional data from server sessionid 0x161305b7bd81a09, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:54,899] INFO zookeeper state changed (Disconnected) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,384] ERROR [ReplicaFetcherThread-3-1002]: Error for partition [cloud_ioc_events,32] to broker 1002:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. 
(kafka.server.ReplicaFetcherThread) {noformat} The broker reconnected to ZK after a few tries: {noformat} [2018-06-30 09:34:55,462] INFO Opening socket connection to server 10.210.48.187/10.210.48.187:2181 (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,462] INFO zookeeper state changed (AuthFailed) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,463] INFO Socket connection established to 10.210.48.187/10.210.48.187:2181, initiating session (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,464] WARN Unable to reconnect to ZooKeeper service, session 0x161305b7bd81a09 has expired (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,465] INFO zookeeper state changed (Expired) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,465] INFO Initiating client connection, connectString=10.210.48.187:2181,10.210.43.200:2181,10.210.16.102:2181/kafka sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5c33f1a9 (org.apache.zookeeper.ZooKeeper) [2018-06-30 09:34:55,465] INFO Unable to reconnect to ZooKeeper service, session 0x161305b7bd81a09 has expired, closing socket connection (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,466] INFO EventThread shut down for session: 0x161305b7bd81a09 (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,467] INFO zookeeper state changed (AuthFailed) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,468] INFO Opening socket connection to server 10.210.43.200/10.210.43.200:2181 (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,468] INFO Socket connection established to 10.210.43.200/10.210.43.200:2181, initiating session (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,471] INFO Session establishment complete on server 10.210.43.200/10.210.43.200:2181, sessionid = 0x163934fa09d1baa, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,471] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,472] INFO re-registering broker info in ZK for broker 2005 (kafka.server.KafkaHealthcheck$SessionExpireListener) [2018-06-30 09:34:55,472] INFO Creating /brokers/ids/2005 (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2018-06-30 09:34:55,476] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2018-06-30 09:34:55,476] INFO Registered broker 2005 at path /brokers/ids/2005 with addresses: EndPoint(kafka-broker-b5-int,9092,ListenerName(PLAINTEXT),PLAINTEXT),EndPoint(kafka-broker-b5,19092,ListenerName(PUBLIC),SASL_PLAINTEXT) (kafka.utils.ZkUtils) [2018-06-30 09:34:55,476] INFO done re-registering broker (kafka.server.KafkaHealthcheck$SessionExpireListener) [2018-06-30 09:34:55,476] INFO Subscribing to /brokers/topics path to watch for new topics (kafka.server.KafkaHealthcheck$SessionExpireListener) {noformat} By 9:35:02 partitions had returned to the broker. It appears this
[jira] [Created] (FLINK-9682) Add setDescription to execution environment and display it in the UI
Elias Levy created FLINK-9682: - Summary: Add setDescription to execution environment and display it in the UI Key: FLINK-9682 URL: https://issues.apache.org/jira/browse/FLINK-9682 Project: Flink Issue Type: Improvement Components: DataStream API, Webfrontend Affects Versions: 1.5.0 Reporter: Elias Levy Currently you can provide a job name to {{execute}} in the execution environment. In an environment where many versions of a job may be executing, such as a development or test environment, identifying which running job is of a specific version via the UI can be difficult unless the version is embedded into the job name given to {{execute}}. But the job name is used for other purposes, such as for namespacing metrics. Thus, it is not ideal to modify the job name, as that could require modifying metric dashboards and monitors each time versions change. I propose a new method be added to the execution environment, {{setDescription}}, that would allow a user to pass in an arbitrary description that would be displayed in the dashboard, allowing users to distinguish jobs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
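A sketch of the proposed usage ({{setDescription}} is the hypothetical method this ticket asks for):
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DescriptionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical: shown in the web UI, but not part of the job name,
        // so metric scopes stay stable across versions.
        env.setDescription("my-job v2.3.1 (build 8f3c2d)");

        env.execute("my-job");  // job name unchanged across deploys
    }
}
{code}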
[jira] [Created] (FLINK-9600) Add DataStream transformation variants that pass timestamp to the user function
Elias Levy created FLINK-9600: - Summary: Add DataStream transformation variants that pass timestamp to the user function Key: FLINK-9600 URL: https://issues.apache.org/jira/browse/FLINK-9600 Project: Flink Issue Type: Improvement Components: Streaming Affects Versions: 1.5.0 Reporter: Elias Levy It is often necessary to access the timestamp assigned to records within user functions. At the moment this is only possible from a {{ProcessFunction}}. Implementing a {{ProcessFunction}} just to access the timestamp is burdensome, so most jobs carry a duplicate of the timestamp within the record. It would be useful if {{DataStream}} provided transformation methods that accepted user functions that could be passed the record's timestamp as an additional argument, similar to how there are two variants of {{flatMap}}, one with an extra parameter that gives the user function access to the output {{Collector}}. Along similar lines, it may be useful to have variants that pass the record's key as an additional parameter. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
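A sketch of what such a variant might look like; both the interface and the corresponding {{DataStream}} method are hypothetical:
{code:java}
import org.apache.flink.api.common.functions.Function;

// Hypothetical map variant that also receives the record's assigned timestamp.
@FunctionalInterface
public interface TimestampedMapFunction<T, O> extends Function {
    O map(T value, long timestamp) throws Exception;
}

// Hypothetical call site:
//   DataStream<Enriched> out =
//       stream.map((value, ts) -> new Enriched(value, ts));
{code}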
[jira] [Created] (FLINK-9495) Implement ResourceManager for Kubernetes
Elias Levy created FLINK-9495: - Summary: Implement ResourceManager for Kubernetes Key: FLINK-9495 URL: https://issues.apache.org/jira/browse/FLINK-9495 Project: Flink Issue Type: Improvement Components: ResourceManager Affects Versions: 1.5.0 Reporter: Elias Levy I noticed there is no issue for developing a Kubernetes-specific ResourceManager under FLIP-6, so I am creating this issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9450) Job hangs if S3 access is denied during checkpoints
Elias Levy created FLINK-9450: - Summary: Job hangs if S3 access is denied during checkpoints Key: FLINK-9450 URL: https://issues.apache.org/jira/browse/FLINK-9450 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Affects Versions: 1.4.2 Reporter: Elias Levy We have a streaming job that consumes from and writes to Kafka. The job is configured to checkpoint to S3. If we deny access to S3 by using iptables on the TM host to block all outgoing connections to ports 80 and 443, whether using DROP or REJECT, and whether using REJECT with --reject-with tcp-reset or --reject-with icmp-port-unreachable, the job soon stops publishing to Kafka. This happens whether the Kafka sources have {{setCommitOffsetsOnCheckpoints}} set to true or false. The system is configured to use Presto for the S3 file system. The job has a small amount of state, so it is configured to use {{FsStateBackend}} with asynchronous snapshots. If the iptables rules are removed, the job resumes functioning. I would expect the job to either fail or continue running if a checkpoint fails. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9440) Allow cancelation and reset of timers
Elias Levy created FLINK-9440: - Summary: Allow cancelation and reset of timers Key: FLINK-9440 URL: https://issues.apache.org/jira/browse/FLINK-9440 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 1.4.2 Reporter: Elias Levy Currently the {{TimerService}} allows one to register timers, but it is not possible to delete a timer or to reset a timer to a new value. If one wishes to reset a timer, one must also handle the previously inserted timer callbacks and ignore them. It would be useful if the API allowed one to remove and reset timers (see the sketch below). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
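A sketch of the requested additions, modeled on the existing {{TimerService}} methods (the delete and reset operations are the hypothetical part):
{code:java}
// Hypothetical additions to org.apache.flink.streaming.api.TimerService:
public interface TimerServiceSketch {
    void registerEventTimeTimer(long time);   // exists today
    void deleteEventTimeTimer(long time);     // requested: cancel a pending timer

    // requested: move a timer without leaving a stale callback behind
    default void resetEventTimeTimer(long oldTime, long newTime) {
        deleteEventTimeTimer(oldTime);
        registerEventTimeTimer(newTime);
    }
}
{code}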
[jira] [Created] (FLINK-9403) Documentation continues to refer to removed methods
Elias Levy created FLINK-9403: - Summary: Documentation continues to refer to removed methods Key: FLINK-9403 URL: https://issues.apache.org/jira/browse/FLINK-9403 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.6.0 Reporter: Elias Levy {{org.apache.flink.api.common.ExecutionConfig}} no longer has the {{enableTimestamps}}, {{disableTimestamps}}, and {{areTimestampsEnabled}} methods. They were removed in [this commit|https://github.com/apache/flink/commit/ceb64248daab04b01977ff02516696e4398d656e]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9272) DataDog API "counter" metric type is deprecated
Elias Levy created FLINK-9272: - Summary: DataDog API "counter" metric type is deprecated Key: FLINK-9272 URL: https://issues.apache.org/jira/browse/FLINK-9272 Project: Flink Issue Type: Improvement Components: Metrics Affects Versions: 1.4.2 Reporter: Elias Levy It appears to have been replaced by the "count" metric type. https://docs.datadoghq.com/developers/metrics/ -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8886) Job isolation via scheduling in shared cluster
Elias Levy created FLINK-8886: - Summary: Job isolation via scheduling in shared cluster Key: FLINK-8886 URL: https://issues.apache.org/jira/browse/FLINK-8886 Project: Flink Issue Type: Improvement Components: Scheduler Affects Versions: 1.5.0 Reporter: Elias Levy Flink's TaskManager executes tasks from different jobs within the same JVM as threads. We prefer to isolate different jobs in their own JVMs. Thus, we must use different TMs for different jobs. As the scheduler currently allocates task slots within a TM to tasks from different jobs, that means we must stand up one cluster per job. This is wasteful, as it requires at least two JobManagers per cluster for high availability, and the JMs have low utilization. Additionally, different jobs may require different resources. Some jobs are compute heavy. Some are IO heavy (lots of state in RocksDB). At the moment the scheduler treats all TMs as equivalent, except possibly in their number of available task slots. Thus, one is required to stand up multiple clusters if there is a need for different types of TMs. It would be useful if one could specify requirements on jobs, such that they are only scheduled on a subset of TMs. Properly configured, that would permit isolation of jobs in a shared cluster and scheduling of jobs with specific resource needs. One possible implementation is to specify a set of tags in the TM config file, which the TMs use when registering with the JM, and another set of tags configured within the job or supplied when submitting the job (see the sketch below). The scheduler could then match the tags in the job with the tags in the TMs. In a restrictive mode the scheduler would assign a job task to a TM only if all tags match. In a relaxed mode the scheduler could assign a job task to a TM if there is a partial match, while giving preference to a more accurate match. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
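A sketch of the tag idea under invented config keys (all key names hypothetical):
{code}
# TaskManager side (flink-conf.yaml):
taskmanager.tags: rocksdb-heavy, ssd

# Job side, in the job config or as a submission parameter:
job.scheduling.tags: rocksdb-heavy
job.scheduling.tag-matching: restrictive   # or: relaxed
{code}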
[jira] [Created] (FLINK-8844) Export job jar file name or job version property via REST API
Elias Levy created FLINK-8844: - Summary: Export job jar file name or job version property via REST API Key: FLINK-8844 URL: https://issues.apache.org/jira/browse/FLINK-8844 Project: Flink Issue Type: Improvement Components: REST Affects Versions: 1.4.3 Reporter: Elias Levy To aid automated deployment of jobs, it would be useful if the REST API exposed either a running job's jar filename or a version property the job could set, similar to how it sets the job name. As it is now, there is no standard mechanism to determine what version of a job is running in a cluster. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8752) ClassNotFoundException when using the user code class loader
Elias Levy created FLINK-8752: - Summary: ClassNotFoundException when using the user code class loader Key: FLINK-8752 URL: https://issues.apache.org/jira/browse/FLINK-8752 Project: Flink Issue Type: Bug Components: JobManager Affects Versions: 1.4.1 Reporter: Elias Levy Attempting to submit a job results in the job failing while it is being started in the JMs with a ClassNotFoundException error: {code:java} java.lang.ClassNotFoundException: com.foo.flink.common.util.TimeAssigner at java.net.URLClassLoader.findClass(Unknown Source) at java.lang.ClassLoader.loadClass(Unknown Source) at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source) at java.lang.ClassLoader.loadClass(Unknown Source) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Unknown Source) at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73) at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) at java.io.ObjectInputStream.readClassDesc(Unknown Source) at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) at java.io.ObjectInputStream.readObject0(Unknown Source) at java.io.ObjectInputStream.readObject(Unknown Source) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:393) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:380) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:368) at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.createPartitionStateHolders(AbstractFetcher.java:542) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.<init>(AbstractFetcher.java:167) at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:89) at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.<init>(Kafka010Fetcher.java:62) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:203) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:564) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Unknown Source) {code} If I drop the job's jar into the lib folder of the JM and set {{classloader.resolve-order}} to {{parent-first}} in the JM's configuration, the job starts successfully. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8751) Canceling a job results in an InterruptedException in the JM
Elias Levy created FLINK-8751: - Summary: Canceling a job results in an InterruptedException in the JM Key: FLINK-8751 URL: https://issues.apache.org/jira/browse/FLINK-8751 Project: Flink Issue Type: Bug Components: JobManager Affects Versions: 1.4.1 Reporter: Elias Levy Canceling a job results in the following exception reported by the JM: {code:java} ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Could not shut down timer service java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown Source) at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Unknown Source){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8358) Hostname used by DataDog metric reporter is not configurable
Elias Levy created FLINK-8358: - Summary: Hostname used by DataDog metric reporter is not configurable Key: FLINK-8358 URL: https://issues.apache.org/jira/browse/FLINK-8358 Project: Flink Issue Type: Bug Components: Metrics Affects Versions: 1.4.0 Reporter: Elias Levy The hostname used by the DataDog metric reporter to report metrics is not configurable. This can be problematic if the hostname that Flink uses is different from the hostname used by the system's DataDog agent. For instance, in our environment we use Chef, and using the DataDog Chef Handler, certain metadata such as host roles is associated with the hostname in the DataDog service. The hostname used to submit this metadata is the name we have given the host. But as Flink picks up the default name given by EC2 to the instance, metrics submitted by Flink to DataDog using that hostname are not associated with the tags derived from Chef. In the Job Manager we can avoid this issue by explicitly setting the {{jobmanager.rpc.address}} config to the hostname we desire. I attempted to do the same on the Task Manager by setting the {{taskmanager.hostname}} config, but DataDog does not seem to pick up that value. Digging through the code, it seems the DD metric reporter gets the hostname from the {{TaskManagerMetricGroup}} host variable, which seems to be set from {{taskManagerLocation.getHostname}}. That in turn seems to be set by calling {{this.inetAddress.getCanonicalHostName()}}, which merely performs a reverse lookup on the IP address, and then calling {{NetUtils.getHostnameFromFQDN}} on the result. The latter is further problematic because its result is a non-fully-qualified hostname. More generally, there seems to be a need to specify the hostname of a JM or TM node once such that it can be reused across Flink components. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8352) Flink UI Reports No Error on Job Submission Failures
Elias Levy created FLINK-8352: - Summary: Flink UI Reports No Error on Job Submission Failures Key: FLINK-8352 URL: https://issues.apache.org/jira/browse/FLINK-8352 Project: Flink Issue Type: Bug Components: Web Client Affects Versions: 1.4.0 Reporter: Elias Levy If you submit a job jar via the web UI and it raises an exception when started, the UI will report no error and will continue to show the animated image that makes it seem as if it is working. In addition, no error is printed in the logs, unless the level is increased to at least DEBUG: {noformat} @40005a4c399202b87ebc DEBUG org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler - Error while handling request. @40005a4c399202b8868c java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramInvocationException: The program caused an error: @40005a4c399202b88a74 at org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleJsonRequest$0(JarPlanHandler.java:68) @40005a4c399202b88e5c at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) @40005a4c399202b8e44c at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) @40005a4c399202b8e44c at java.util.concurrent.FutureTask.run(Unknown Source) @40005a4c399202b8e834 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source) @40005a4c399202b8e834 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) @40005a4c399202b8f3ec at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) @40005a4c399202b8f7d4 at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) @40005a4c399202b8f7d4 at java.lang.Thread.run(Unknown Source) @40005a4c399202b8fbbc Caused by: org.apache.flink.client.program.ProgramInvocationException: The program caused an error: @40005a4c399202b90b5c at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93) @40005a4c399202b90f44 at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334) @40005a4c399202b90f44 at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:76) @40005a4c399202b91afc at org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleJsonRequest$0(JarPlanHandler.java:57) @40005a4c399202b91afc ... 8 more @40005a4c399202b91ee4 Caused by: java.lang.ExceptionInInitializerError @40005a4c399202b91ee4 at com.cisco.sbg.amp.flink.ioc_engine.IocEngine.main(IocEngine.scala) @40005a4c399202b922cc at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) @40005a4c399202b92a9c at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) @40005a4c399202b92a9c at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) @40005a4c399202b92e84 at java.lang.reflect.Method.invoke(Unknown Source) @40005a4c399202b92e84 at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525) @40005a4c399202b9326c at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417) @40005a4c399202b93a3c at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83) @40005a4c399202b949dc ... 
11 more @40005a4c399202b949dc Caused by: java.io.FileNotFoundException: /data/jenkins/jobs/XXX/workspace/target/scala-2.11/scoverage-data/scoverage.measurements.55 (No such file or directory) @40005a4c399202b951ac at java.io.FileOutputStream.open0(Native Method) @40005a4c399202b951ac at java.io.FileOutputStream.open(Unknown Source) @40005a4c399202b9597c at java.io.FileOutputStream.(Unknown Source) @40005a4c399202b9597c at java.io.FileWriter.(Unknown Source) @40005a4c399202b95d64 at scoverage.Invoker$$anonfun$1.apply(Invoker.scala:42) @40005a4c399202b95d64 at scoverage.Invoker$$anonfun$1.apply(Invoker.scala:42) @40005a4c399202b9614c at scala.collection.concurrent.TrieMap.getOrElseUpdate(TrieMap.scala:901) @40005a4c399202b9614c at scoverage.Invoker$.invoked(Invoker.scala:42) @40005a4c399202b9691c at com.XXX$.(IocEngine.scala:28) @40005a4c399202b9691c at com.XXX$.(IocEngine.scala) {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8311) Flink needs documentation for network access control
Elias Levy created FLINK-8311: - Summary: Flink needs documentation for network access control Key: FLINK-8311 URL: https://issues.apache.org/jira/browse/FLINK-8311 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.4.0 Reporter: Elias Levy There is a need for better documentation on what connects to what over which ports in a Flink cluster to allow users to configure network access control rules. E.g. I was under the impression that in a ZK HA configuration the Job Managers were essentially independent and only coordinated via ZK. But starting multiple JMs in HA with the JM RPC port blocked between JMs shows that the second JM's Akka subsystem is trying to connect to the leading JM:
{noformat}
INFO akka.remote.transport.ProtocolStateActor - No response from remote for outbound association. Associate timed out after [2 ms].
WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@10.210.210.127:6123] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@10.210.210.127:6123]] Caused by: [No response from remote for outbound association. Associate timed out after [2 ms].]
WARN akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /10.210.210.127:6123
{noformat}
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7935) Metrics with user supplied scope variables
Elias Levy created FLINK-7935: - Summary: Metrics with user supplied scope variables Key: FLINK-7935 URL: https://issues.apache.org/jira/browse/FLINK-7935 Project: Flink Issue Type: Improvement Affects Versions: 1.3.2 Reporter: Elias Levy We use DataDog for metrics. DD and Flink differ somewhat in how they track metrics. Flink names and scopes metrics together, at least by default. E.g. by default the system scope for operator metrics is {{<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>}}. The scope variables become part of the metric's full name. In DD the metric would be named something generic, e.g. {{taskmanager.job.operator}}, and the metrics would be distinguished by their tag values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}. Flink allows you to configure the format string for system scopes, so it is possible to set the operator scope format to {{taskmanager.job.operator}}. We do this for all scopes:
{code}
metrics.scope.jm: jobmanager
metrics.scope.jm.job: jobmanager.job
metrics.scope.tm: taskmanager
metrics.scope.tm.job: taskmanager.job
metrics.scope.task: taskmanager.job.task
metrics.scope.operator: taskmanager.job.operator
{code}
This seems to work. The DataDog Flink metrics plugin submits all scope variables as tags, even if they are not used within the scope format, and internally this does not appear to lead to metrics conflicting with each other. We would like to extend this to user defined metrics, but you cannot define variables/scopes when adding a metric group or metric with the user API. If you could, DD would see a single metric with a tag taking many different values, rather than hundreds of distinct metrics for the one value we want to measure across different event types. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
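To make the limitation concrete, here is a hedged sketch (all names illustrative) of how the current user-metrics API forces the value into the metric name, alongside the kind of key/value group the issue asks for:
{code}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter

class CountingMap extends RichMapFunction[(String, String), (String, String)] {
  @transient private var counter: Counter = _

  override def open(parameters: Configuration): Unit = {
    val eventType = "login"  // illustrative; in practice one counter per observed type
    counter = getRuntimeContext.getMetricGroup
      .addGroup("eventType_" + eventType)  // today: the value is baked into the name
      .counter("events")
    // Desired (hypothetical at the time of this issue): a key/value group that
    // reporters such as DataDog could expose as a tag rather than a name segment:
    // getRuntimeContext.getMetricGroup.addGroup("eventType", eventType).counter("events")
  }

  override def map(in: (String, String)): (String, String) = { counter.inc(); in }
}
{code}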
[jira] [Created] (FLINK-7722) MiniCluster does not appear to honor Log4j settings
Elias Levy created FLINK-7722: - Summary: MiniCluster does not appear to honor Log4j settings Key: FLINK-7722 URL: https://issues.apache.org/jira/browse/FLINK-7722 Project: Flink Issue Type: Bug Components: Local Runtime Affects Versions: 1.3.2 Reporter: Elias Levy Priority: Minor When executing a job from the command line for testing, it will output logs like:
{noformat}
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1337544104] with leader session id 59dd0d9c-938e-4e79-a0eb-709c5cf73014.
09/27/2017 13:15:13 Job execution switched to status RUNNING.
09/27/2017 13:15:13 Source: Custom File Source(1/1) switched to SCHEDULED
09/27/2017 13:15:13 Source: Collect
{noformat}
It will do so even if the log4j.properties file contains:
{code}
log4j.rootLogger=ERROR, stdout
log4j.logger.org.apache.flink=ERROR
log4j.logger.akka=ERROR
log4j.logger.org.apache.kafka=ERROR
log4j.logger.org.apache.hadoop=ERROR
log4j.logger.org.apache.zookeeper=ERROR
{code}
It seems that the MiniCluster does not honor Log4j settings, or at least that is my guess. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7687) Clarify the master and slaves files are not necessary unless using the cluster start/stop scripts
Elias Levy created FLINK-7687: - Summary: Clarify the master and slaves files are not necessary unless using the cluster start/stop scripts Key: FLINK-7687 URL: https://issues.apache.org/jira/browse/FLINK-7687 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.3.2 Reporter: Elias Levy Priority: Minor It would be helpful if the documentation were clearer on the fact that the master/slaves config files are not needed when configured in high-availability mode unless you are using the provided scripts to start and shut down the cluster over SSH. If you are using some other mechanism to manage Flink instances (configuration management tools such as Chef or Ansible, or container management frameworks like Docker Compose or Kubernetes), these files are unnecessary. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7646) Restart failed jobs with configurable parallelism range
Elias Levy created FLINK-7646: - Summary: Restart failed jobs with configurable parallelism range Key: FLINK-7646 URL: https://issues.apache.org/jira/browse/FLINK-7646 Project: Flink Issue Type: Improvement Components: DataStream API Affects Versions: 1.3.2 Reporter: Elias Levy Currently, if a TaskManager fails the whole job is terminated and then, depending on the restart policy, a restart may be attempted. If the failed TaskManager has not been replaced, and there are no spare task slots in the cluster, the restart will fail. There are situations where restoring or adding a new TaskManager may take a while. For instance, in AWS an Auto Scaling Group can only be used to manage a group of instances in a single availability zone. If you have a cluster of TaskManagers that spans AZs, managed by one ASG per AZ, and an AZ goes dark, the other ASGs won't scale automatically to make up for the lost TaskManagers. To resolve the situation, the healthy ASGs will need to be modified manually or by systems external to AWS. With that in mind, it would be useful if you could specify a range for the parallelism parameter. Under normal circumstances the job would execute with the maximum parallelism of the range. But if TaskManagers were lost and not replaced after some time, the job would accept being executed with some lower parallelism within the range. I understand that this may not be feasible with checkpoints, as savepoints are supposed to be the mechanism used to change the parallelism of a stateful job. Therefore, this proposal may need to wait until the implementation of the periodic savepoint feature (FLINK-4511). This feature would aid the availability of Flink jobs. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
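A purely hypothetical sketch of what such a ranged setting might look like; no such API exists in Flink, and the method name is invented for illustration only:
{code}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(16)  // today: a single fixed value

// Proposed (hypothetical): run with up to 16 slots, but accept a restart with
// as few as 8 if TaskManagers are lost and not replaced:
// env.setParallelismRange(minParallelism = 8, maxParallelism = 16)
{code}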
[jira] [Created] (FLINK-7641) Loss of JobManager in HA mode should not cause jobs to fail
Elias Levy created FLINK-7641: - Summary: Loss of JobManager in HA mode should not cause jobs to fail Key: FLINK-7641 URL: https://issues.apache.org/jira/browse/FLINK-7641 Project: Flink Issue Type: Improvement Components: JobManager Affects Versions: 1.3.2 Reporter: Elias Levy Currently, if a standalone cluster of JobManagers is configured in high-availability mode and the master JM is lost, the job executing in the cluster will be restarted. This is less than ideal. It would be best if the jobs could continue to execute without restarting while one of the spare JMs becomes the new master, or, in the worst case, the jobs are paused while the JM election takes place. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7640) Dashboard should display information about JobManager cluster in HA mode
Elias Levy created FLINK-7640: - Summary: Dashboard should display information about JobManager cluster in HA mode Key: FLINK-7640 URL: https://issues.apache.org/jira/browse/FLINK-7640 Project: Flink Issue Type: Improvement Components: Webfrontend Affects Versions: 1.3.2 Reporter: Elias Levy Currently the dashboard provides no information about the status of a cluster of JobManagers configured in high-availability mode. The dashboard should display the status and membership of a JM cluster in the Overview and Job Manager sections. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7634) Add option to create a savepoint while canceling a job in the dashboard
Elias Levy created FLINK-7634: - Summary: Add option to create a savepoint while canceling a job in the dashboard Key: FLINK-7634 URL: https://issues.apache.org/jira/browse/FLINK-7634 Project: Flink Issue Type: Improvement Components: JobManager Affects Versions: 1.3.2 Reporter: Elias Levy Priority: Minor Currently there appears to be no way to trigger a savepoint in the dashboard, to cancel a job while taking a savepoint, to list savepoints, or to list external checkpoints. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7547) o.a.f.s.api.scala.async.AsyncFunction is not declared Serializable
Elias Levy created FLINK-7547: - Summary: o.a.f.s.api.scala.async.AsyncFunction is not declared Serializable Key: FLINK-7547 URL: https://issues.apache.org/jira/browse/FLINK-7547 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 1.3.2 Reporter: Elias Levy Priority: Minor {{org.apache.flink.streaming.api.scala.async.AsyncFunction}} is not declared {{Serializable}}, whereas {{org.apache.flink.streaming.api.functions.async.AsyncFunction}} is. This leads to the job not starting, as the async function can't be serialized during initialization. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
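A common workaround until the trait declares {{Serializable}} is to mix it in explicitly. A minimal sketch with illustrative names, written against the {{ResultFuture}}-based signature of later releases (the 1.3-era Scala API passed an {{AsyncCollector}} instead):
{code}
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

// Mixing in Serializable explicitly until the trait itself declares it.
class LengthLookup extends AsyncFunction[String, (String, Int)] with Serializable {
  override def asyncInvoke(input: String, resultFuture: ResultFuture[(String, Int)]): Unit = {
    // A real implementation would fire an async request here and complete the
    // future from its callback; this stub completes immediately.
    resultFuture.complete(Seq((input, input.length)))
  }
}
{code}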
[jira] [Created] (FLINK-7364) Log exceptions from user code in streaming jobs
Elias Levy created FLINK-7364: - Summary: Log exceptions from user code in streaming jobs Key: FLINK-7364 URL: https://issues.apache.org/jira/browse/FLINK-7364 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 1.3.1 Reporter: Elias Levy Currently, if an exception arises in user supplied code within an operator in a streaming job, Flink terminates the job, but it fails to record the reason for the termination. The logs do not record that there was an exception at all, much less the type of exception and where it occurred. This makes it difficult to debug jobs without implementing exception recording code in all user supplied operators. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7286) Flink Dashboard fails to display bytes/records received by sources
Elias Levy created FLINK-7286: - Summary: Flink Dashboard fails to display bytes/records received by sources Key: FLINK-7286 URL: https://issues.apache.org/jira/browse/FLINK-7286 Project: Flink Issue Type: Bug Components: Webfrontend Affects Versions: 1.3.1 Reporter: Elias Levy It appears Flink can't measure the number of bytes read or records produced by a source (e.g. Kafka source). This is particularly problematic for simple jobs where the job pipeline is chained, and in which there are no measurements between operators. Thus, in the UI it appears that the job is not consuming any data. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
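One hedged workaround implied by the chaining observation, with illustrative broker, topic, and connector-version names: breaking the chain right after the source forces records across a channel the UI does measure, at the cost of serialization overhead:
{code}
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object ChainProbe extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val props = new Properties()
  props.setProperty("bootstrap.servers", "broker:9092")  // illustrative
  props.setProperty("group.id", "chain-probe")

  env
    .addSource(new FlinkKafkaConsumer010[String]("topic", new SimpleStringSchema(), props))
    .disableChaining()  // the hop out of the source now crosses a measured channel
    .print()

  env.execute("chain-probe")
}
{code}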
[jira] [Created] (FLINK-7178) Datadog Metric Reporter Jar is Lacking Dependencies
Elias Levy created FLINK-7178: - Summary: Datadog Metric Reporter Jar is Lacking Dependencies Key: FLINK-7178 URL: https://issues.apache.org/jira/browse/FLINK-7178 Project: Flink Issue Type: Bug Components: Metrics Affects Versions: 1.3.1 Reporter: Elias Levy The Datadog metric reporter has dependencies on {{com.squareup.okhttp3}} and {{com.squareup.okio}}. It appears there was an attempt to use the Maven Shade plug-in to move these classes to {{org.apache.flink.shaded.okhttp3}} and {{org.apache.flink.shaded.okio}} during packaging. Alas, the shaded classes are not included in the {{flink-metrics-datadog-1.3.1.jar}} released to Maven Central. Using the jar results in an error when the JobManager or TaskManager starts up because of the missing dependencies. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-6472) BoundedOutOfOrdernessTimestampExtractor does not bound out-of-orderness
Elias Levy created FLINK-6472: - Summary: BoundedOutOfOrdernessTimestampExtractor does not bound out-of-orderness Key: FLINK-6472 URL: https://issues.apache.org/jira/browse/FLINK-6472 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 1.3.0 Reporter: Elias Levy {{BoundedOutOfOrdernessTimestampExtractor}} attempts to emit watermarks that lag behind the largest observed timestamp by a configurable time delta. It fails to do so in some circumstances. The class extends {{AssignerWithPeriodicWatermarks}}, which generates watermarks at periodic intervals. The timer for these intervals is a processing time timer. In circumstances where there is a rush of events (restarting Flink, unpausing an upstream producer, loading events from a file, etc.), many events with timestamps much larger than what the configured bound would normally allow will be sent downstream without a watermark. This can have negative effects downstream, as operators may be buffering the events waiting for a watermark to process them, leading to memory growth and possible out-of-memory conditions. It is probably best to have a bounded out-of-orderness extractor based on the punctuated timestamp extractor, so we can ensure that watermarks are generated in a timely fashion in event time, with the addition of a processing time timer to generate a watermark if there has been a lull in events, thus also bounding the delay of generating a watermark in processing time. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
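A minimal sketch of the punctuated approach suggested above, assuming an element type of {{(id, timestamp)}} and the {{AssignerWithPunctuatedWatermarks}} API; it emits a watermark with every element, so the bound holds in event time even during a rush of events. The processing-time lull timer the issue also calls for is outside this interface and is omitted:
{code}
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class BoundedLagPunctuatedAssigner(maxOutOfOrderness: Long)
    extends AssignerWithPunctuatedWatermarks[(String, Long)] {

  private var maxTimestamp = Long.MinValue

  // Called first for each element, so maxTimestamp is up to date below.
  override def extractTimestamp(element: (String, Long), previousTimestamp: Long): Long = {
    maxTimestamp = math.max(maxTimestamp, element._2)
    element._2
  }

  // A watermark accompanies every element, keeping the lag bounded in event time.
  override def checkAndGetNextWatermark(lastElement: (String, Long),
      extractedTimestamp: Long): Watermark =
    new Watermark(maxTimestamp - maxOutOfOrderness)
}
{code}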
[jira] [Created] (FLINK-6420) Cleaner CEP API to specify conditions between events
Elias Levy created FLINK-6420: - Summary: Cleaner CEP API to specify conditions between events Key: FLINK-6420 URL: https://issues.apache.org/jira/browse/FLINK-6420 Project: Flink Issue Type: Improvement Components: CEP Affects Versions: 1.3.0 Reporter: Elias Levy Priority: Minor Flink 1.3 will introduce so-called iterative conditions, which allow the predicate to look up events already matched by conditions in the pattern. This permits specifying conditions between matched events, similar to a conditional join between tables in SQL. Alas, the API could be simplified to specify such conditions more declaratively. At the moment you have to do something like
{code}
Pattern.begin[Foo]("first")
  .where(first => first.baz == 1)
  .followedBy("next")
  .where { (next, ctx) =>
    val first = ctx.getEventsForPattern("first").head
    first.bar == next.bar && next.boo == "x"
  }
{code}
which is not very clean. It would be friendlier if you could do something like:
{code}
Pattern.begin[Foo]("first")
  .where(first => first.baz == 1)
  .followedBy("next")
  .relatedTo("first", { (first, next) => first.bar == next.bar })
  .where(next => next.boo == "x")
{code}
Something along these lines would work well when the condition being tested against matches a single event (single quantifier). If the condition being tested can accept multiple events (e.g. a times quantifier), two other methods could be used, {{relatedToAny}} and {{relatedToAll}}, each of which takes a predicate function. In both cases each previously accepted element of the requested condition is evaluated against the predicate. In the former case, if any evaluation returns true the condition is satisfied. In the latter case, all evaluations must return true for the condition to be satisfied. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6419) Better support for CEP quantified conditions in PatternSelect.select
Elias Levy created FLINK-6419: - Summary: Better support for CEP quantified conditions in PatternSelect.select Key: FLINK-6419 URL: https://issues.apache.org/jira/browse/FLINK-6419 Project: Flink Issue Type: Improvement Components: CEP Affects Versions: 1.3.0 Reporter: Elias Levy Priority: Minor Flink 1.3 introduces to the API quantifier methods, which allow one to declaratively specify how many times a condition must be matched before there is a state change. The pre-existing {{PatternSelect.select}} method does not account for this change very well. The selection function passed to {{select}} receives a {{Map[String,T]}} as an argument that permits the function to look up the matched events by the condition's name. To support the new functionality that permits a condition to match multiple elements, when a quantifier is greater than one, the matched events are stored in the map by appending the condition's name with an underscore and an index value. While functional, this is less than ideal. It would be best if conditions with a quantifier greater than one returned the matched events in an array accessible via the condition's name, without having to construct keys from the condition's name and an index and querying the map repeatedly until no more matches are found. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
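A small helper sketch of the probing the current API forces on callers; the underscore-index key convention is as described above, while the starting index is illustrative:
{code}
// Probe "name_0", "name_1", ... until a key is missing, collecting the matches.
def collectMatches[T](matched: Map[String, T], name: String): Seq[T] =
  Iterator.from(0)
    .map(i => matched.get(s"${name}_$i"))
    .takeWhile(_.isDefined)
    .map(_.get)
    .toSeq
{code}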
[jira] [Created] (FLINK-6243) Continuous Joins: True Sliding Window Joins
Elias Levy created FLINK-6243: - Summary: Continuous Joins: True Sliding Window Joins Key: FLINK-6243 URL: https://issues.apache.org/jira/browse/FLINK-6243 Project: Flink Issue Type: New Feature Components: Streaming Affects Versions: 1.1.4 Reporter: Elias Levy Flink defines sliding window joins as the join of elements of two streams that share a window of time, where the windows are defined by advancing them forward some amount of time that is less than the window time span. More generally, such windows are just overlapping hopping windows. Other systems, such as Kafka Streams, support a different notion of sliding window joins. In these systems, two elements of a stream are joined if the absolute time difference between them is less than or equal to the time window length.

This alternate notion of sliding window joins has advantages over the current implementation in some applications. Elements to be joined may both fall within multiple overlapping sliding windows, leading them to be joined multiple times when we only wish them to be joined once. The implementation need not instantiate window objects to keep track of stream elements, which becomes problematic in the current implementation if the window size is very large and the slide is very small. It also allows for asymmetric time joins, e.g. join if elements from stream A are no more than X time behind and Y time ahead of an element from stream B.

It is currently possible to implement a join with these semantics using {{CoProcessFunction}}, but the capability should be a first class feature, as it is in Kafka Streams. To perform the join, elements of each stream must be buffered for at least the window time length. To allow for large window sizes and a high volume of elements, the state should (possibly optionally) be buffered such that it can spill to disk (e.g. by using RocksDB).

The same stream may be joined multiple times in a complex topology. As an optimization, it may be wise to reuse any element buffer among colocated join operators. Otherwise, there may be write amplification and increased state that must be snapshotted. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
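A compressed sketch of the {{CoProcessFunction}} route mentioned above, with the asymmetric bounds: a pair joins when the right element's timestamp lies in {{[l.ts - behind, l.ts + ahead]}}. State pruning via timers, and hence bounded buffering, is omitted for brevity; the {{Event}} shape and all names are illustrative:
{code}
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

case class Event(key: String, payload: String, ts: Long)

class SlidingJoin(behind: Long, ahead: Long)
    extends CoProcessFunction[Event, Event, (Event, Event)] {

  // Buffers for each input; in production these must be pruned by timers
  // once elements age out of the join window.
  private lazy val left = getRuntimeContext.getListState(
    new ListStateDescriptor[Event]("left", classOf[Event]))
  private lazy val right = getRuntimeContext.getListState(
    new ListStateDescriptor[Event]("right", classOf[Event]))

  private def within(l: Event, r: Event): Boolean =
    r.ts >= l.ts - behind && r.ts <= l.ts + ahead  // asymmetric bounds

  override def processElement1(l: Event,
      ctx: CoProcessFunction[Event, Event, (Event, Event)]#Context,
      out: Collector[(Event, Event)]): Unit = {
    left.add(l)
    right.get.asScala.filter(within(l, _)).foreach(r => out.collect((l, r)))
  }

  override def processElement2(r: Event,
      ctx: CoProcessFunction[Event, Event, (Event, Event)]#Context,
      out: Collector[(Event, Event)]): Unit = {
    right.add(r)
    left.get.asScala.filter(within(_, r)).foreach(l => out.collect((l, r)))
  }
}
{code}
Both inputs would be keyed on the join key before connecting, e.g. {{a.keyBy(_.key).connect(b.keyBy(_.key)).process(new SlidingJoin(behind, ahead))}}.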
[jira] [Created] (FLINK-6239) Sharing of State Across Operators
Elias Levy created FLINK-6239: - Summary: Sharing of State Across Operators Key: FLINK-6239 URL: https://issues.apache.org/jira/browse/FLINK-6239 Project: Flink Issue Type: Improvement Components: Streaming Affects Versions: 1.1.4 Reporter: Elias Levy Currently state cannot be shared across operators. On a keyed stream, the state is implicitly keyed by the operator id, in addition to the stream key. This can make it difficult and inefficient to implement complex topologies, where multiple operators may need to access the same state. It would be valuable to be able to access keyed value and map state across operators. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-4558) Add support for synchronizing streams
Elias Levy created FLINK-4558: - Summary: Add support for synchronizing streams Key: FLINK-4558 URL: https://issues.apache.org/jira/browse/FLINK-4558 Project: Flink Issue Type: Improvement Components: Streaming Affects Versions: 1.1.0 Reporter: Elias Levy As mentioned on the [mailing list|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/synchronizing-two-streams-td6830.html], there are use cases that require synchronizing two streams via their timestamps, and where it is not practical to buffer all messages from one stream while waiting for the other to synchronize. Flink should add functionality to enable such use cases. This could be implemented by modifying TwoInputStreamOperator so that calls to processElement1 and processElement2 could return a value indicating that the element can't yet be processed, having the framework then pause processing for some time, potentially using exponential back-off with a hard maximum, and then allowing the back pressure system to do its work and pause the stream. Alternatively, an API could be added to explicitly pause/unpause a stream. For ease of use, either of these mechanisms could be used to create a SynchronizedTwoInputStreamOperator that end users can utilize by passing it a configurable time delta to use as a synchronization threshold. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
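A purely hypothetical shape for the first alternative; nothing like this exists in Flink, and the names are invented for illustration:
{code}
sealed trait ProcessResult
case object Accepted extends ProcessResult
case object NotReady extends ProcessResult  // framework should pause this input and retry later

// Hypothetical variant of TwoInputStreamOperator's element callbacks: returning
// a hint instead of Unit lets the framework back off and redeliver the element.
trait SynchronizableTwoInput[IN1, IN2] {
  def processElement1(value: IN1): ProcessResult
  def processElement2(value: IN2): ProcessResult
}
{code}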
[jira] [Created] (FLINK-4502) Cassandra connector documentation has misleading consistency guarantees
Elias Levy created FLINK-4502: - Summary: Cassandra connector documentation has misleading consistency guarantees Key: FLINK-4502 URL: https://issues.apache.org/jira/browse/FLINK-4502 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.1.0 Reporter: Elias Levy The Cassandra connector documentation states that "enableWriteAheadLog() is an optional method, that allows exactly-once processing for non-deterministic algorithms." This claim appears to be false. From what I gather, the write ahead log feature of the connector works as follows:
- The sink is replaced with a stateful operator that writes incoming messages to the state backend, based on the checkpoint they belong in.
- When the operator is notified that a Flink checkpoint has been completed, then for each set of checkpoints older than and including the committed one, it:
* reads its messages from the state backend
* writes them to Cassandra
* records that it has committed them to Cassandra for the specific checkpoint and operator instance
* and erases them from the state backend.
This process attempts to avoid resubmitting queries to Cassandra that would otherwise occur when recovering a job from a checkpoint and having messages replayed. Alas, this does not guarantee exactly-once semantics at the sink. The writes to Cassandra that occur when the operator is notified that a checkpoint is completed are not atomic, and they are potentially non-idempotent. If the job dies while writing to Cassandra or before committing the checkpoint via the committer, queries will be replayed when the job recovers. Thus the documentation appears to be incorrect in stating this provides exactly-once semantics. There also seems to be an issue in GenericWriteAheadSink's notifyOfCompletedCheckpoint which may result in incorrect output. If sendValues returns false because a write failed, instead of bailing, it simply moves on to the next checkpoint to commit if there is one, keeping the previous one around to try again later. But that can result in newer data being overwritten with older data when the previous checkpoint is retried. Although given that CassandraCommitter implements isCheckpointCommitted as {{checkpointID <= this.lastCommittedCheckpointID}}, it actually means that when it goes back to try the uncommitted older checkpoint it will consider it committed, even though some of its data may not have been written out, and that data will be discarded. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4501) Cassandra sink can lose messages
Elias Levy created FLINK-4501: - Summary: Cassandra sink can lose messages Key: FLINK-4501 URL: https://issues.apache.org/jira/browse/FLINK-4501 Project: Flink Issue Type: Bug Components: Cassandra Connector Affects Versions: 1.1.0 Reporter: Elias Levy The problem is the same as I pointed out with the Kafka producer sink (FLINK-4027). The CassandraTupleSink's send() and CassandraPojoSink's send() both send data asynchronously to Cassandra and record whether an error occurs via a future callback. But CassandraSinkBase does not implement Checkpointed, so it can't stop a checkpoint from happening even though there are Cassandra queries in flight from the checkpoint that may fail. If they do fail, they will subsequently not be replayed when the job recovers, and will thus be lost. In addition, CassandraSinkBase's close should check whether there is a pending exception and throw it, rather than silently closing. It should also wait for any pending async queries to complete and check their status before closing. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
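A minimal sketch of the remedy suggested above, with illustrative names and independent of the real CassandraSinkBase: count in-flight writes, remember the first asynchronous failure, and drain and re-check before a checkpoint is acknowledged or the sink closes:
{code}
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

class TrackedAsyncSink {
  private val pending = new AtomicInteger(0)
  private val failure = new AtomicReference[Throwable](null)

  def beforeSend(): Unit = pending.incrementAndGet()

  // Invoked from the driver's future callback.
  def onComplete(error: Throwable): Unit = {
    if (error != null) failure.compareAndSet(null, error)
    pending.decrementAndGet()
  }

  // Called from the checkpoint hook and from close(): wait for in-flight
  // queries, then surface any recorded async failure instead of swallowing it.
  def flushAndCheck(): Unit = {
    while (pending.get() > 0) Thread.sleep(10)
    val e = failure.get()
    if (e != null) throw new RuntimeException("async write failed", e)
  }
}
{code}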
[jira] [Created] (FLINK-4498) Better Cassandra sink documentation
Elias Levy created FLINK-4498: - Summary: Better Cassandra sink documentation Key: FLINK-4498 URL: https://issues.apache.org/jira/browse/FLINK-4498 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.1.0 Reporter: Elias Levy The Cassandra sink documentation is somewhat muddled and could be improved. For instance, the fact that it only supports tuples and POJOs that use DataStax Mapper annotations is only mentioned in passing, and it is not clear that the reference to tuples only applies to Flink Java tuples and not Scala tuples. The documentation also does not mention that setQuery() is only necessary for tuple streams. It would be good to have an example of a POJO stream with the DataStax annotations. The explanation of the write ahead log could use some cleaning up to clarify when it is appropriate to use it, ideally with an example. Maybe this would be best as a blog post expanding on the type of non-deterministic streams it applies to. It would also be useful to mention that tuple elements will be mapped to Cassandra columns using the DataStax Java driver's default encoders, which are somewhat limited (e.g. to write to a blob column the type in the tuple must be a java.nio.ByteBuffer and not just a byte[]). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
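The kind of annotated-POJO example the issue asks for might look like the following sketch, assuming the DataStax object-mapper annotations and an existing keyspace/table (in Scala, annotations on constructor parameters need the {{@field}} meta-annotation to land on the underlying fields):
{code}
import scala.annotation.meta.field
import com.datastax.driver.mapping.annotations.{Column, PartitionKey, Table}

@Table(keyspace = "example", name = "readings")
class Reading(
    @(PartitionKey @field) var id: String,
    @(Column @field)(name = "value") var value: java.lang.Double) {
  def this() = this(null, null)  // the mapper requires a no-arg constructor
}
{code}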
[jira] [Created] (FLINK-4497) Add support for Scala tuples and case classes to Cassandra sink
Elias Levy created FLINK-4497: - Summary: Add support for Scala tuples and case classes to Cassandra sink Key: FLINK-4497 URL: https://issues.apache.org/jira/browse/FLINK-4497 Project: Flink Issue Type: Improvement Components: Cassandra Connector Affects Versions: 1.1.0 Reporter: Elias Levy The new Cassandra sink only supports streams of Flink Java tuples and Java POJOs that have been annotated for use by Datastax Mapper. The sink should be extended to support Scala types and case classes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4326) Flink start-up scripts should optionally start services on the foreground
Elias Levy created FLINK-4326: - Summary: Flink start-up scripts should optionally start services on the foreground Key: FLINK-4326 URL: https://issues.apache.org/jira/browse/FLINK-4326 Project: Flink Issue Type: Improvement Components: Startup Shell Scripts Affects Versions: 1.0.3 Reporter: Elias Levy This has previously been mentioned in the mailing list, but has not been addressed. Flink start-up scripts start the job and task managers in the background. This makes it difficult to integrate Flink with most process supervisory tools and init systems, including Docker. One can get around this by hacking the scripts or manually starting the right classes via Java, but it is a brittle solution. In addition to starting the daemons in the foreground, the start-up scripts should use exec instead of running the commands, so as to avoid forks. Many supervisory tools assume the PID of the process to be monitored is that of the process they first execute, and fork chains make it difficult for the supervisor to figure out what process to monitor. Specifically, jobmanager.sh and taskmanager.sh should exec flink-daemon.sh, and flink-daemon.sh should exec java. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4050) FlinkKafkaProducer API Refactor
Elias Levy created FLINK-4050: - Summary: FlinkKafkaProducer API Refactor Key: FLINK-4050 URL: https://issues.apache.org/jira/browse/FLINK-4050 Project: Flink Issue Type: Improvement Components: Kafka Connector Affects Versions: 1.0.3 Reporter: Elias Levy The FlinkKafkaProducer API seems more difficult to use than it should be. The API requires you pass it a SerializationSchema or a KeyedSerializationSchema, but the Kafka producer already has a serialization API. Requiring a serializer in the Flink API precludes the use of the Kafka serializers. For instance, they preclude the use of the Confluent KafkaAvroSerializer class that makes use of the Confluent Schema Registry. Ideally, the serializer would be optional, so as to allow the Kafka producer serializers to handle the task. In addition, the KeyedSerializationSchema conflates message key extraction with key serialization. If the serializer were optional, to allow the Kafka producer serializers to take over, you'd still need to extract a key from the message. And given that the key may not be part of the message you want to write to Kafka, an upstream step may have to package the key with the message to make both available to the sink, for instance in a tuple. That means you also need to define a method to extract the message to write to Kafka from the element passed into the sink by Flink. In summary, there should be separation of extraction of the key and message from the element passed into the sink from serialization, and the serialization step should be optional. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
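A hypothetical sketch of the separation argued for above; this is not an existing Flink API, and the trait name is invented. Extraction of key, value, and topic is decoupled from serialization, which becomes optional so Kafka's own serializers (e.g. Confluent's KafkaAvroSerializer) can do the encoding:
{code}
// Hypothetical interface; the element type T is whatever the upstream operator
// emits (e.g. a tuple packaging the key together with the message).
trait KafkaRecordExtractor[T, K, V] extends Serializable {
  def key(element: T): K          // may return null for unkeyed writes
  def value(element: T): V
  def topic(element: T): String   // target topic, possibly chosen per element
}
{code}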
[jira] [Created] (FLINK-4027) FlinkKafkaProducer09 sink can lose messages
Elias Levy created FLINK-4027: - Summary: FlinkKafkaProducer09 sink can lose messages Key: FLINK-4027 URL: https://issues.apache.org/jira/browse/FLINK-4027 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.0.3 Reporter: Elias Levy Priority: Critical The FlinkKafkaProducer09 sink appears to not offer at-least-once guarantees. The producer publishes messages asynchronously. A callback can record publishing errors, which will be raised when detected. But as far as I can tell, there is no barrier to wait for async errors from the sink when checkpointing, or to track the event time of acked messages to inform the checkpointing process. If a checkpoint occurs while there are pending publish requests, and the requests return a failure after the checkpoint occurred, those messages will be lost, as the checkpoint will consider them processed by the sink. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3984) Event time of stream transformations is undocumented
Elias Levy created FLINK-3984: - Summary: Event time of stream transformations is undocumented Key: FLINK-3984 URL: https://issues.apache.org/jira/browse/FLINK-3984 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.0.3 Reporter: Elias Levy The Event Time, Windowing, and DataStream Transformation documentation sections fail to state what event time, if any, the output of transformations has on a stream that is configured to use event time and that has timestamp assigners. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3710) ScalaDocs for org.apache.flink.streaming.scala are missing from the web site
Elias Levy created FLINK-3710: - Summary: ScalaDocs for org.apache.flink.streaming.scala are missing from the web site Key: FLINK-3710 URL: https://issues.apache.org/jira/browse/FLINK-3710 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.0.1 Reporter: Elias Levy The ScalaDocs only include docs for org.apache.flink.scala and sub-packages. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3692) Develop a Kafka state backend
Elias Levy created FLINK-3692: - Summary: Develop a Kafka state backend Key: FLINK-3692 URL: https://issues.apache.org/jira/browse/FLINK-3692 Project: Flink Issue Type: New Feature Components: Core Reporter: Elias Levy Flink clusters usually consume from a Kafka cluster. It would simplify operations if Flink could store its state checkpoints in Kafka. This should be possible by using different topics to write to, partitioning appropriately, and using compacted topics. It would avoid the need to run an HDFS cluster just to store Flink checkpoints. For inspiration you may want to take a look at how Samza checkpoints a task's local state to a Kafka topic, and how the newer Kafka consumers checkpoint their offsets to Kafka. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
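A rough sketch of the idea, in the spirit of Samza's local-state checkpointing: write each task's snapshot to a compacted topic keyed by an operator/task identifier. The topic name, key scheme, broker address, and serialization here are all illustrative:
{code}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaStateSketch {
  private val props = new Properties()
  props.put("bootstrap.servers", "broker:9092")  // illustrative
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")

  private val producer = new KafkaProducer[String, Array[Byte]](props)

  // Compaction on "flink-state" keeps only the latest snapshot per task key.
  def checkpoint(taskId: String, snapshot: Array[Byte]): Unit =
    producer.send(new ProducerRecord("flink-state", taskId, snapshot)).get()  // block until acked
}
{code}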