[jira] [Commented] (FLINK-14111) Flink should be robust to a non-leader Zookeeper host going down
[ https://issues.apache.org/jira/browse/FLINK-14111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16931901#comment-16931901 ]

Elias Levy commented on FLINK-14111:
------------------------------------

Probably related to FLINK-10052.

> Flink should be robust to a non-leader Zookeeper host going down
> -----------------------------------------------------------------
>
>                 Key: FLINK-14111
>                 URL: https://issues.apache.org/jira/browse/FLINK-14111
>             Project: Flink
>          Issue Type: Wish
>          Components: Runtime / Coordination
>    Affects Versions: 1.7.2, 1.8.0, 1.8.1, 1.9.0
>         Environment: Linux
> JVM 8
> Flink {{1.7.2}}, {{1.8.1}}, {{1.9.0}}
> {{Zookeeper version 3.4.5}}
>            Reporter: Aaron Levin
>            Priority: Major
>
> I noticed that if a non-leader Zookeeper node goes down and there is still
> quorum in the Zookeeper cluster, my Flink application will restart anyway. I
> believe it should be possible for Flink applications not to require a restart
> in this scenario.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16931611#comment-16931611 ]

Elias Levy commented on FLINK-12122:
------------------------------------

Till, that is a welcome development. I am surprised this issue has languished since the 1.5 days. It makes it very difficult to run certain jobs in standalone clusters that are over-allocated to handle failover in case of TM node failure. The uneven allocation of tasks results in Kafka consumer lag for a subset of partitions under many workloads. When upgrading old jobs to 1.9 we've had to modify our clusters to exactly match parallelism and number of slots, and to use other mechanisms to handle failover.

> Spread out tasks evenly across all available registered TaskManagers
> ---------------------------------------------------------------------
>
>                 Key: FLINK-12122
>                 URL: https://issues.apache.org/jira/browse/FLINK-12122
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>    Affects Versions: 1.6.4, 1.7.2, 1.8.0
>            Reporter: Till Rohrmann
>            Priority: Major
>         Attachments: image-2019-05-21-12-28-29-538.png, image-2019-05-21-13-02-50-251.png
>
> With Flip-6, we changed the default behaviour of how slots are assigned to
> {{TaskManagers}}. Instead of evenly spreading them out over all registered
> {{TaskManagers}}, we randomly pick slots from {{TaskManagers}} with a
> tendency to first fill up a TM before using another one. This is a regression
> wrt the pre-Flip-6 code.
> I suggest changing the behaviour so that we try to evenly distribute slots
> across all available {{TaskManagers}} by considering how many of their slots
> are already allocated.

--
This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16930672#comment-16930672 ]

Elias Levy commented on FLINK-12122:
------------------------------------

What is the status of this issue? It has a serious negative effect on our production clusters now that we have upgraded some old jobs to 1.9.0.

> Spread out tasks evenly across all available registered TaskManagers
> ---------------------------------------------------------------------
>
>                 Key: FLINK-12122
>                 URL: https://issues.apache.org/jira/browse/FLINK-12122
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Coordination
>    Affects Versions: 1.6.4, 1.7.2, 1.8.0
>            Reporter: Till Rohrmann
>            Priority: Major
>         Attachments: image-2019-05-21-12-28-29-538.png, image-2019-05-21-13-02-50-251.png
>
> With Flip-6, we changed the default behaviour of how slots are assigned to
> {{TaskManagers}}. Instead of evenly spreading them out over all registered
> {{TaskManagers}}, we randomly pick slots from {{TaskManagers}} with a
> tendency to first fill up a TM before using another one. This is a regression
> wrt the pre-Flip-6 code.
> I suggest changing the behaviour so that we try to evenly distribute slots
> across all available {{TaskManagers}} by considering how many of their slots
> are already allocated.

--
This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (FLINK-14002) FlinkKafkaProducer constructor that takes KafkaSerializationSchema shouldnt take default topic
[ https://issues.apache.org/jira/browse/FLINK-14002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16930650#comment-16930650 ]

Elias Levy commented on FLINK-14002:
------------------------------------

I am for that. Alternatively, {{KafkaSerializationSchema}} and {{KeyedSerializationSchema}} could be merged, such that {{serialize}} has a default implementation that calls {{getTargetTopic}}, {{serializeKey}}, and {{serializeValue}}, but can be overridden for more advanced applications. (A sketch of such a merged interface follows the quoted issue below.)

> FlinkKafkaProducer constructor that takes KafkaSerializationSchema shouldnt
> take default topic
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-14002
>                 URL: https://issues.apache.org/jira/browse/FLINK-14002
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>            Reporter: Gyula Fora
>            Priority: Major
>
> When the KafkaSerializationSchema is used, the user always has to provide the
> topic when they create the ProducerRecord.
> The defaultTopic specified in the constructor (and enforced not to be null)
> will always be ignored, which is very misleading.
> We should deprecate these constructors and create new ones without a
> defaultTopic.

--
This message was sent by Atlassian Jira (v8.3.2#803003)
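A minimal sketch of what such a merged interface could look like — the interface name and the choice of {{ProducerRecord}} constructor below are illustrative assumptions, not the actual Flink API:

{code:java}
import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical merge of KafkaSerializationSchema and KeyedSerializationSchema:
// simple implementations supply the three high-level methods, while advanced
// ones override serialize() to set partitions, headers, etc.
public interface MergedKafkaSerializationSchema<T> extends Serializable {

    String getTargetTopic(T element);

    byte[] serializeKey(T element);

    byte[] serializeValue(T element);

    default ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp) {
        return new ProducerRecord<>(
                getTargetTopic(element),
                null, // partition: let Kafka choose based on the key
                timestamp,
                serializeKey(element),
                serializeValue(element));
    }
}
{code}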
[jira] [Commented] (FLINK-5312) Implement Standalone Setup v2.0
[ https://issues.apache.org/jira/browse/FLINK-5312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16929598#comment-16929598 ]

Elias Levy commented on FLINK-5312:
-----------------------------------

Curious about the reasoning behind the Won't Do resolution. Is the expectation that anyone who requires such functionality fall back on Kubernetes or some other resource manager?

> Implement Standalone Setup v2.0
> -------------------------------
>
>                 Key: FLINK-5312
>                 URL: https://issues.apache.org/jira/browse/FLINK-5312
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / Coordination
>            Reporter: Manu Zhang
>            Priority: Major
>
> Copied from
> [FLIP-6|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077]:
> A future version of the Standalone Setup could be thought of as implementing
> something like a “lightweight Yarn” architecture:
> * All nodes run a simple “NodeManager” process that spawns processes for the
> TaskManagers and JobManagers, that way offering proper isolation of jobs
> against each other.
> * The LocalDispatcher will not spawn the JobManager internally but on a
> lightweight “node manager”

--
This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (FLINK-14002) FlinkKafkaProducer constructor that takes KafkaSerializationSchema shouldnt take default topic
[ https://issues.apache.org/jira/browse/FLINK-14002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16929556#comment-16929556 ]

Elias Levy commented on FLINK-14002:
------------------------------------

Came here to open just this ticket. {{KeyedSerializationSchema}} was deprecated, but its replacement, {{KafkaSerializationSchema}}, was not very well thought out. A {{KafkaSerializationSchema}} that implements {{KafkaContextAware}} can return null from {{getTargetTopic}}, and that will result in the sink using the default topic to look up the partition information, but the {{KafkaSerializationSchema}} still needs to fill in the topic in the {{ProducerRecord}} or it will result in an exception. Having a serializer that is lower-level and can create a {{ProducerRecord}} is nice, but IMHO we should not have deprecated the simpler higher-level serializer.

> FlinkKafkaProducer constructor that takes KafkaSerializationSchema shouldnt
> take default topic
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-14002
>                 URL: https://issues.apache.org/jira/browse/FLINK-14002
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>            Reporter: Gyula Fora
>            Priority: Major
>
> When the KafkaSerializationSchema is used, the user always has to provide the
> topic when they create the ProducerRecord.
> The defaultTopic specified in the constructor (and enforced not to be null)
> will always be ignored, which is very misleading.
> We should deprecate these constructors and create new ones without a
> defaultTopic.

--
This message was sent by Atlassian Jira (v8.3.2#803003)
[jira] [Commented] (FLINK-14057) Add Remove Other Timers to TimerService
[ https://issues.apache.org/jira/browse/FLINK-14057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16928676#comment-16928676 ]

Elias Levy commented on FLINK-14057:
------------------------------------

Could also add {{replaceProcessingTimeTimer}} and {{replaceEventTimeTimer}}. (A sketch of the bookkeeping this would eliminate follows the quoted issue below.)

> Add Remove Other Timers to TimerService
> ---------------------------------------
>
>                 Key: FLINK-14057
>                 URL: https://issues.apache.org/jira/browse/FLINK-14057
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Jesse Anderson
>            Priority: Major
>
> The TimerService has the ability to add timers with
> registerProcessingTimeTimer. This method can be called many times with
> different timer times.
> If you want to add a new timer and delete other timers, you have to keep
> track of all previous timer times and call deleteProcessingTimeTimer for each
> one. This approach forces you to keep track of all previous (unexpired) timers
> for a key.
> Instead, I suggest overloading registerProcessingTimeTimer with a second
> boolean argument that will remove all previous timers and set the new timer.
> Note: although I'm using registerProcessingTimeTimer, this applies to
> registerEventTimeTimer as well.

--
This message was sent by Atlassian Jira (v8.3.2#803003)
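For context, the bookkeeping that a replace-style call would eliminate looks roughly like this today — a sketch assuming a keyed process function that wants at most one outstanding timer per key; the class and state names are illustrative:

{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SingleTimerFunction extends KeyedProcessFunction<String, String, String> {

    // Track the single unexpired timer per key so it can be deleted later.
    private transient ValueState<Long> pendingTimer;

    @Override
    public void open(Configuration parameters) {
        pendingTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("pending-timer", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // Delete the previous timer, if any, before registering the new one.
        Long previous = pendingTimer.value();
        if (previous != null) {
            ctx.timerService().deleteProcessingTimeTimer(previous);
        }
        long newTimer = ctx.timerService().currentProcessingTime() + 60_000L;
        ctx.timerService().registerProcessingTimeTimer(newTimer);
        pendingTimer.update(newTimer);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        pendingTimer.clear();
        out.collect("timer fired at " + timestamp);
    }
}
{code}

A {{replaceProcessingTimeTimer}} (or the proposed boolean overload) would collapse the delete-and-track logic above into a single registration call.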
[jira] [Commented] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections
[ https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16890276#comment-16890276 ]

Elias Levy commented on FLINK-10052:
------------------------------------

The whole point of ZK is that it doesn't suffer from split brain. It is a strongly consistent system.

> Tolerate temporarily suspended ZooKeeper connections
> ----------------------------------------------------
>
>                 Key: FLINK-10052
>                 URL: https://issues.apache.org/jira/browse/FLINK-10052
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.4.2, 1.5.2, 1.6.0, 1.8.1
>            Reporter: Till Rohrmann
>            Assignee: Dominik Wosiński
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> This issue results from FLINK-10011 which uncovered a problem with Flink's HA
> recovery and proposed the following solution to harden Flink:
> The {{ZooKeeperLeaderElectionService}} uses the {{LeaderLatch}} Curator
> recipe for leader election. The leader latch revokes leadership in case of a
> suspended ZooKeeper connection. This can be premature in case that the system
> can reconnect to ZooKeeper before its session expires. The effect of the lost
> leadership is that all jobs will be canceled and directly restarted after
> regaining the leadership.
> Instead of directly revoking the leadership upon a SUSPENDED ZooKeeper
> connection, it would be better to wait until the ZooKeeper connection is
> LOST. That way we would allow the system to reconnect and not lose the
> leadership. This could be achievable by using Curator's {{LeaderSelector}}
> instead of the {{LeaderLatch}}.

--
This message was sent by Atlassian JIRA (v7.6.14#76016)
[jira] [Commented] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections
[ https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16882252#comment-16882252 ]

Elias Levy commented on FLINK-10052:
------------------------------------

Dominik, any progress?

> Tolerate temporarily suspended ZooKeeper connections
> ----------------------------------------------------
>
>                 Key: FLINK-10052
>                 URL: https://issues.apache.org/jira/browse/FLINK-10052
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.4.2, 1.5.2, 1.6.0
>            Reporter: Till Rohrmann
>            Assignee: Dominik Wosiński
>            Priority: Major
>
> This issue results from FLINK-10011 which uncovered a problem with Flink's HA
> recovery and proposed the following solution to harden Flink:
> The {{ZooKeeperLeaderElectionService}} uses the {{LeaderLatch}} Curator
> recipe for leader election. The leader latch revokes leadership in case of a
> suspended ZooKeeper connection. This can be premature in case that the system
> can reconnect to ZooKeeper before its session expires. The effect of the lost
> leadership is that all jobs will be canceled and directly restarted after
> regaining the leadership.
> Instead of directly revoking the leadership upon a SUSPENDED ZooKeeper
> connection, it would be better to wait until the ZooKeeper connection is
> LOST. That way we would allow the system to reconnect and not lose the
> leadership. This could be achievable by using Curator's {{LeaderSelector}}
> instead of the {{LeaderLatch}}.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-13189) Fix the impact of zookeeper network disconnect temporarily on flink long running jobs
[ https://issues.apache.org/jira/browse/FLINK-13189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16882251#comment-16882251 ]

Elias Levy commented on FLINK-13189:
------------------------------------

This is a duplicate of FLINK-10052.

> Fix the impact of zookeeper network disconnect temporarily on flink long
> running jobs
> --------------------------------------------------------------------------
>
>                 Key: FLINK-13189
>                 URL: https://issues.apache.org/jira/browse/FLINK-13189
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.8.1
>            Reporter: lamber-ken
>            Assignee: lamber-ken
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.9.0
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> *Issue detail info*
> We deploy Flink streaming jobs on a Hadoop cluster in per-job mode and use
> ZooKeeper as the HighAvailabilityService, but we found that Flink jobs will
> restart because the network between the JobManager and ZooKeeper was
> disconnected temporarily.
> So we analyzed this problem deeply. The Flink JobManager uses Curator's
> {{LeaderLatch}} to maintain leadership. When the network disconnects, the
> {{LeaderLatch}} will change leadership to false directly. We think it's too
> brutal that many Flink long-running jobs will restart because of a network
> shake.
>
> *Fix this issue*
> From the Curator official website, we found that this issue was fixed in
> curator-3.x.x, but we can't just change the Flink Curator version (2.12.0)
> to 3.x.x because of ZooKeeper compatibility. Curator 2.x.x supports
> ZooKeeper 3.4.x and 3.5.0, while Curator 3.x.x is only compatible with
> ZooKeeper 3.5.x. Based on the above considerations, we updated
> {{LeaderLatch}} in the flink-shaded-curator module.
>
> *Other*
> Any suggestions are welcome, thanks.
>
> *Useful links*
> [https://curator.apache.org/zk-compatibility.html]
> [https://cwiki.apache.org/confluence/display/CURATOR/Releases]
> [http://curator.apache.org/curator-recipes/leader-latch.html]

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-12104) Flink Kafka fails with Incompatible KafkaProducer version / NoSuchFieldException sequenceNumbers
[ https://issues.apache.org/jira/browse/FLINK-12104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16808986#comment-16808986 ]

Elias Levy commented on FLINK-12104:
------------------------------------

The {{flink-connector-kafka-0.11}} connector depends on the Kafka 0.11.0.2 client, which does have that field. The field was only removed in 1.0.0. Are you overriding the Kafka client dependency? If so, that is your problem. If you want to use a newer Kafka client, use the universal Kafka connector ({{flink-connector-kafka_2.11}}), which tracks the latest version of the Kafka client.

> Flink Kafka fails with Incompatible KafkaProducer version /
> NoSuchFieldException sequenceNumbers
> -------------------------------------------------------------
>
>                 Key: FLINK-12104
>                 URL: https://issues.apache.org/jira/browse/FLINK-12104
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.7.2
>            Reporter: Tim
>            Priority: Major
>
> FlinkKafkaProducer (in flink-connector-kafka-0.11) tries to access a field
> named `sequenceNumbers` from the KafkaProducer's TransactionManager. You can
> find this line on the [master branch here|https://github.com/apache/flink/blob/d6be68670e661091d94a3c65a2704d52fc0e827c/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java#L197].
>
> {code:java}
> Object transactionManager = getValue(kafkaProducer, "transactionManager");
> ...
> Object sequenceNumbers = getValue(transactionManager, "sequenceNumbers");
> {code}
>
> However, the Kafka TransactionManager no longer has a "sequenceNumbers"
> field. This was changed back on 9/14/2017 (KAFKA-5494) in an effort to
> support multiple in-flight requests while still guaranteeing idempotence. See
> [commit diff here|https://github.com/apache/kafka/commit/5d2422258cb975a137a42a4e08f03573c49a387e#diff-f4ef1afd8792cd2a2e9069cd7ddea630].
> Subsequently, when Flink tries to "recoverAndCommit" (see
> FlinkKafkaProducer011) it fails with a "NoSuchFieldException:
> sequenceNumbers", followed by an "Incompatible KafkaProducer version".
> Given that the KafkaProducer used is so old (this change was made almost two
> years ago) are there any plans of upgrading? Or are there some known
> compatibility issues that prevent the Flink/Kafka connector from doing so?

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-12024) Bump universal Kafka connector to Kafka dependency to 2.2.0
Elias Levy created FLINK-12024:
----------------------------------

             Summary: Bump universal Kafka connector to Kafka dependency to 2.2.0
                 Key: FLINK-12024
                 URL: https://issues.apache.org/jira/browse/FLINK-12024
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
    Affects Versions: 1.7.2
            Reporter: Elias Levy


Update the Kafka client dependency to version 2.2.0.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-11794) Allow compression of row format files created by StreamingFileSink
Elias Levy created FLINK-11794:
----------------------------------

             Summary: Allow compression of row format files created by StreamingFileSink
                 Key: FLINK-11794
                 URL: https://issues.apache.org/jira/browse/FLINK-11794
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / FileSystem
    Affects Versions: 1.7.2
            Reporter: Elias Levy


Currently, there is no mechanism to compress files created using a StreamingFileSink. This is highly desirable when the output is a text-based row format such as JSON.

Possible alternatives are the introduction of a callback that gets passed the local file before it is uploaded to the DFS, so that it could be compressed; or a factory method could be used that returns an OutputStream, such as GZIPOutputStream, that compresses a passed-in output stream that could then be used by the Encoder.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
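As an illustration of the second alternative, a minimal sketch of a gzip-compressing {{Encoder}} — illustrative only: it writes each record as its own gzip member (valid but wasteful), and it sidesteps the part-file lifecycle questions this ticket raises:

{code:java}
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;
import org.apache.flink.api.common.serialization.Encoder;

public class GzipLineEncoder implements Encoder<String> {

    @Override
    public void encode(String element, OutputStream stream) throws IOException {
        // Wrap the sink's stream for this record only. finish() writes the
        // gzip trailer without closing the underlying part file, and a
        // concatenation of gzip members is itself a valid gzip file.
        GZIPOutputStream gzip = new GZIPOutputStream(stream);
        gzip.write(element.getBytes(StandardCharsets.UTF_8));
        gzip.write('\n');
        gzip.finish();
    }
}
{code}

A factory-provided wrapping stream, as suggested in the issue, would avoid the per-record member overhead and produce a single compressed stream per part file.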
[jira] [Created] (FLINK-11520) Triggers should be provided the window state
Elias Levy created FLINK-11520:
----------------------------------

             Summary: Triggers should be provided the window state
                 Key: FLINK-11520
                 URL: https://issues.apache.org/jira/browse/FLINK-11520
             Project: Flink
          Issue Type: Improvement
            Reporter: Elias Levy


Some triggers may require access to the window state to perform their job. Consider a window computing a count using an aggregate function. It may be desired to fire the window when the count is 1 and then again at the end of the window. The early firing can provide feedback to external systems that a key has been observed, while waiting for the final count.

The same problem can be observed in org.apache.flink.streaming.api.windowing.triggers.CountTrigger, which must maintain an internal count instead of being able to make use of the window state.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
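For illustration, this is roughly what a {{CountTrigger}}-style trigger is forced to do today: keep a parallel count in its own partitioned state because the window's aggregated state is not accessible to it. A sketch matching the early-firing example above (class and state names are illustrative):

{code:java}
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EarlyAndFinalFireTrigger extends Trigger<Object, TimeWindow> {

    // Parallel count, kept only because the window's own state is unavailable.
    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("early-fire-count", new Sum(), Long.class);

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);
        // Fire early on the first element to signal that the key was observed.
        return count.get() == 1L ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countDesc).clear();
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    private static class Sum implements ReduceFunction<Long> {
        @Override
        public Long reduce(Long a, Long b) {
            return a + b;
        }
    }
}
{code}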
[jira] [Created] (FLINK-11517) Inefficient window state access when using RocksDB state backend
Elias Levy created FLINK-11517:
----------------------------------

             Summary: Inefficient window state access when using RocksDB state backend
                 Key: FLINK-11517
                 URL: https://issues.apache.org/jira/browse/FLINK-11517
             Project: Flink
          Issue Type: Bug
            Reporter: Elias Levy


When using an aggregate function on a window with a process function and the RocksDB state backend, state access is inefficient.

The WindowOperator calls windowState.add to merge the new element using the aggregate function. The add method of RocksDBAggregatingState will read the state, deserialize the state, call the aggregate function, serialize the state, and write it out.

If the trigger decides the window must be fired, as windowState.add does not return the state, the WindowOperator must call windowState.get to fetch it and pass it to the window process function, resulting in another read and deserialization.

Finally, while the state is not passed in to the trigger, in some cases the trigger may have a need to access the state. That is our case. As the state is not passed to the trigger, we must read and deserialize the state one more time from within the trigger.

Thus, state must be read and deserialized three times to process a single element. If the state is large, this can be quite costly.

Ideally windowState.add would return the state, so that the WindowOperator could pass it to the process function without having to read it again. Additionally, the state should be made available to the trigger to enable more use cases without having to go through the state descriptor again.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
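A simplified paraphrase of the hot path described above (not the actual Flink source; with the RocksDB backend each numbered step is a full read and deserialization of the window state):

{code:java}
import org.apache.flink.api.common.state.AppendingState;

public class WindowStateAccessSketch {

    static <IN, ACC> void processElement(AppendingState<IN, ACC> windowState,
                                         IN element,
                                         boolean triggerFires) throws Exception {
        windowState.add(element);             // (1) read, deserialize, aggregate, serialize, write
        // (2) a trigger that needs the accumulated value must read and
        //     deserialize the same state again through its own descriptor
        if (triggerFires) {
            ACC contents = windowState.get(); // (3) read and deserialize once more
            emit(contents);
        }
    }

    static void emit(Object contents) {
        // hand off to the window process function
    }
}
{code}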
[jira] [Commented] (FLINK-11435) Different jobs same consumer group are treated as separate groups
[ https://issues.apache.org/jira/browse/FLINK-11435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16754156#comment-16754156 ]

Elias Levy commented on FLINK-11435:
------------------------------------

This is expected behavior. Flink manages Kafka offsets itself. Offsets are only reported to Kafka via the consumer group id to allow tracking progress with tools in the Kafka ecosystem. (A sketch follows the quoted issue below.)

> Different jobs same consumer group are treated as separate groups
> -------------------------------------------------------------------
>
>                 Key: FLINK-11435
>                 URL: https://issues.apache.org/jira/browse/FLINK-11435
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.7.1
>         Environment: kafka consumer
>            Reporter: Avi Levi
>            Priority: Major
>
> Deploying two jobs with the same consumer group id, they are still treated as
> different consumer groups. This behavior does not comply with Kafka
> expectations. The same consumer group ids should be treated as the same
> group. This would enable deploying more jobs (especially if they are
> stateless) on demand, and it is also how normal consumer groups behave.
> To reproduce: deploy the same job twice - both jobs consume the same message
> although they share the same consumer group id.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
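A small sketch of the behavior described above — topic, group id, and broker address are placeholders; {{setCommitOffsetsOnCheckpoints}} is the switch that controls whether offsets are reported back to Kafka at all:

{code:java}
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class SameGroupIdExample {

    public static FlinkKafkaConsumer<String> buildConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");
        // Two Flink jobs using this same group.id will each still read every
        // message: Flink assigns partitions itself and tracks offsets in
        // checkpointed state rather than through Kafka's group coordinator.
        props.setProperty("group.id", "my-group");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        // Offsets are committed back to Kafka only so external tools can
        // observe progress; they play no role in partition assignment.
        consumer.setCommitOffsetsOnCheckpoints(true);
        return consumer;
    }
}
{code}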
[jira] [Commented] (FLINK-10460) DataDog reporter JsonMappingException
[ https://issues.apache.org/jira/browse/FLINK-10460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16753231#comment-16753231 ]

Elias Levy commented on FLINK-10460:
------------------------------------

[~lining] as you can tell from the backtrace, that is not a user metric. Rather, it appears to be a Kafka metric gathered in KafkaMetricWrapper.

> DataDog reporter JsonMappingException
> -------------------------------------
>
>                 Key: FLINK-10460
>                 URL: https://issues.apache.org/jira/browse/FLINK-10460
>             Project: Flink
>          Issue Type: Improvement
>          Components: Metrics
>    Affects Versions: 1.4.2
>            Reporter: Elias Levy
>            Priority: Minor
>         Attachments: image-2019-01-24-16-00-56-280.png
>
> Observed the following error in the TM logs this morning:
> {code:java}
> WARN org.apache.flink.metrics.datadog.DatadogHttpReporter - Failed reporting metrics to Datadog.
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException: (was java.util.ConcurrentModificationException) (through reference chain: org.apache.flink.metrics.datadog.DSeries["series"]->java.util.ArrayList[88]->org.apache.flink.metrics.datadog.DGauge["points"])
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:379)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:339)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:342)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:686)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:157)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContents(IndexedListSerializer.java:119)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:79)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:18)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:672)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:678)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:157)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:130)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3631)
>     at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2998)
>     at org.apache.flink.metrics.datadog.DatadogHttpClient.serialize(DatadogHttpClient.java:90)
>     at org.apache.flink.metrics.datadog.DatadogHttpClient.send(DatadogHttpClient.java:79)
>     at org.apache.flink.metrics.datadog.DatadogHttpReporter.report(DatadogHttpReporter.java:143)
>     at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:417)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>     at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>     at java.lang.Thread.run(Unknown Source)
> Caused by: java.util.ConcurrentModificationException
>     at java.util.LinkedHashMap$LinkedHashIterator.nextNode(Unknown Source)
>     at java.util.LinkedHashMap$LinkedKeyIterator.next(Unknown Source)
>     at java.util.AbstractCollection.addAll(Unknown Source)
>     at java.util.HashSet.<init>(Unknown Source)
>     at org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:65)
>     at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:298)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetri
[jira] [Commented] (FLINK-11249) FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
[ https://issues.apache.org/jira/browse/FLINK-11249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16749907#comment-16749907 ]

Elias Levy commented on FLINK-11249:
------------------------------------

I believe that the open transactions are maintained. The transaction data is recorded in the transaction log, which is an internal Kafka topic replicated three ways. When a broker is restarted, another broker's transaction coordinator becomes the leader for the transaction log partitions that were managed by the restarted broker. The new transaction coordinator leader will read the transaction log partitions, rebuild the in-memory transaction state, and service the publishers.

> FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
> ----------------------------------------------------------------
>
>                 Key: FLINK-11249
>                 URL: https://issues.apache.org/jira/browse/FLINK-11249
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Kafka Connector
>    Affects Versions: 1.7.0, 1.7.1
>            Reporter: Piotr Nowojski
>            Assignee: vinoyang
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.7.2, 1.8.0
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> As reported by a user on the mailing list "How to migrate Kafka Producer ?"
> (on 18th December 2018), {{FlinkKafkaProducer011}} can not be migrated to
> {{FlinkKafkaProducer}} and the same problem can occur in future Kafka
> producer versions/refactorings.
> The issue is that the {{ListState<NextTransactionalIdHint>
> FlinkKafkaProducer#nextTransactionalIdHintState}} field is serialized using
> Java serializers and this is causing problems/collisions on
> {{FlinkKafkaProducer011.NextTransactionalIdHint}} vs
> {{FlinkKafkaProducer.NextTransactionalIdHint}}.
> To fix that we probably need to release new versions of those classes, that
> will rewrite/upgrade this state field to a new one that doesn't rely on
> Java serialization. After this, we could drop the support for the old field,
> and that in turn will allow users to upgrade from the 0.11 connector to the
> universal one.
> One bright side is that technically speaking our {{FlinkKafkaProducer011}}
> has the same compatibility matrix as the universal one (it's also forward &
> backward compatible with the same Kafka versions), so for the time being
> users can stick to {{FlinkKafkaProducer011}}.
> FYI [~tzulitai] [~yanghua]

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10493) Macro generated CaseClassSerializer considered harmful
[ https://issues.apache.org/jira/browse/FLINK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16675634#comment-16675634 ]

Elias Levy commented on FLINK-10493:
------------------------------------

[~tzulitai] correct. It is not evident that the Scala serializers generated by the macros are anonymous classes. One only finds out when a job upgrade fails and one starts digging through the code to find the source of the error. Specifically, the section of the documentation that discusses [Type Information in the Scala API|https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/types_serialization.html#type-information-in-the-scala-api] fails to mention this issue.

> Macro generated CaseClassSerializer considered harmful
> --------------------------------------------------------
>
>                 Key: FLINK-10493
>                 URL: https://issues.apache.org/jira/browse/FLINK-10493
>             Project: Flink
>          Issue Type: Bug
>          Components: Scala API, State Backends, Checkpointing, Type Serialization System
>    Affects Versions: 1.4.0, 1.4.1, 1.4.2, 1.5.1, 1.5.2, 1.5.3, 1.5.4, 1.6.0, 1.6.1
>            Reporter: Elias Levy
>            Priority: Major
>
> The Flink Scala API uses implicits and macros to generate {{TypeInformation}}
> and {{TypeSerializer}} objects for types. In the case of Scala tuple and
> case classes, the macro generates an [anonymous {{CaseClassSerializer}}
> class|https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala#L148-L161].
> The Scala compiler will generate a name for the anonymous class that depends
> on the relative position in the code of the macro invocation to other
> anonymous classes. If the code is changed such that the anonymous class's
> relative position changes, even if the overall logic of the code or the type
> in question does not change, the name of the serializer class will change.
> That will result in errors, such as the one below, if the job is restored
> from a savepoint, as the serializer to read the data in the savepoint will no
> longer be found, as its name will have changed.
> At the very least, there should be a prominent warning in the documentation
> about this issue. Minor code changes can result in jobs that can't restore
> previous state. Ideally, the use of anonymous classes should be deprecated
> if possible.
> {noformat}
> WARN org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil - Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
>     at org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
>     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.InvalidClassException: failed to read class descriptor
>     at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>     at java.io.ObjectInputStream.readClassDesc(Unknown Source)
>     at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>     at java.io.ObjectInputStream.readObject0(Unknown Source)
>     at java.io.ObjectInputSt
[jira] [Commented] (FLINK-10520) Job save points REST API fails unless parameters are specified
[ https://issues.apache.org/jira/browse/FLINK-10520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16675404#comment-16675404 ]

Elias Levy commented on FLINK-10520:
------------------------------------

[~huide] as I said, "the system is configured with a default savepoint location". As for {{cancel-job}}, the JSON schema documentation does not mark the key as mandatory.

> Job save points REST API fails unless parameters are specified
> ----------------------------------------------------------------
>
>                 Key: FLINK-10520
>                 URL: https://issues.apache.org/jira/browse/FLINK-10520
>             Project: Flink
>          Issue Type: Bug
>          Components: REST
>    Affects Versions: 1.6.1
>            Reporter: Elias Levy
>            Assignee: Chesnay Schepler
>            Priority: Minor
>
> The new REST API POST endpoint, {{/jobs/:jobid/savepoints}}, returns an error
> unless the request includes a body with all parameters ({{target-directory}}
> and {{cancel-job}}), even though the
> [documentation|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.html]
> suggests these are optional.
> If a POST request with no data is made, the response is a 400 status code
> with the error message "Bad request received."
> If the POST request submits an empty JSON object ( {} ), the response is a
> 400 status code with the error message "Request did not match expected format
> SavepointTriggerRequestBody." The same is true if only the
> {{target-directory}} or {{cancel-job}} parameters are included.
> As the system is configured with a default savepoint location, there
> shouldn't be a need to include the parameter in the request.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-10617) Restoring job fails because of slot allocation timeout
[ https://issues.apache.org/jira/browse/FLINK-10617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668873#comment-16668873 ]

Elias Levy edited comment on FLINK-10617 at 10/30/18 3:28 PM:
--------------------------------------------------------------

I've retested with 1.6.2 and have confirmed the issue still exists, even though FLINK-9932 is fixed.

Back trace in the JM:

{noformat}
15:21:57,999 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job File Retros (dc0bc5ae8831ab9d0bb7b3535dfbf6c7) switched from state RUNNING to FAILING.
Could not allocate all requires slots within timeout of 30 ms. Slots required: 480, slots allocated: 223
org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$3(ExecutionGraph.java:984)
java.util.concurrent.CompletableFuture.uniExceptionally(Unknown Source)
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(Unknown Source)
java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:535)
java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
akka.dispatch.OnComplete.internal(Future.scala:258)
akka.dispatch.OnComplete.internal(Future.scala:256)
akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{noformat}

was (Author: elevy):
I've retested with 1.6.2 and have confirmed the issue still exists, even though FLINK-9932 is fixed.

> Restoring job fails because of slot allocation timeout
> --------------------------------------------------------
>
>                 Key: FLINK-10617
>                 URL: https://issues.apache.org/jira/browse/FLINK-10617
>             Project: Flink
>          Issue Type: Bug
>          Components: ResourceManager, TaskManager
>    Affects Versions: 1.6.1, 1.6.2
>            Reporter: Elias Levy
>            Priority: Critical
>
> The following may be related to FLINK-9932, but I am unsure. If you believe
> it is, go ahead and close this issue as a duplicate.
> While trying to test local state recovery on a job with large state, the job
> failed to be restored because slot allocation timed out.
> The job is running on a standalone cluster with 12 nodes and 96 task slots (8
> per node). The job has parallelism of 96, so it consumes all of the slots,
> and has ~200 GB of state in RocksDB.
> To test local state recovery I decided to kill one of the TMs. The TM
> immediately restarted and re-registered with the JM. I confirmed the JM
> showed 96 registered task slots.
> {noformat}
> 21:35:44,616 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Resolved ResourceManager address, beginning registration
> 21:35:44,616 INFO org.apache.f
[jira] [Updated] (FLINK-10617) Restoring job fails because of slot allocation timeout
[ https://issues.apache.org/jira/browse/FLINK-10617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Elias Levy updated FLINK-10617:
-------------------------------
    Affects Version/s: 1.6.2

> Restoring job fails because of slot allocation timeout
> --------------------------------------------------------
>
>                 Key: FLINK-10617
>                 URL: https://issues.apache.org/jira/browse/FLINK-10617
>             Project: Flink
>          Issue Type: Bug
>          Components: ResourceManager, TaskManager
>    Affects Versions: 1.6.1, 1.6.2
>            Reporter: Elias Levy
>            Priority: Major
>
> The following may be related to FLINK-9932, but I am unsure. If you believe
> it is, go ahead and close this issue as a duplicate.
> While trying to test local state recovery on a job with large state, the job
> failed to be restored because slot allocation timed out.
> The job is running on a standalone cluster with 12 nodes and 96 task slots (8
> per node). The job has parallelism of 96, so it consumes all of the slots,
> and has ~200 GB of state in RocksDB.
> To test local state recovery I decided to kill one of the TMs. The TM
> immediately restarted and re-registered with the JM. I confirmed the JM
> showed 96 registered task slots.
> {noformat}
> 21:35:44,616 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Resolved ResourceManager address, beginning registration
> 21:35:44,616 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Registration at ResourceManager attempt 1 (timeout=100ms)
> 21:35:44,640 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Successful registration at resource manager akka.tcp://flink@172.31.18.172:6123/user/resourcemanager under registration id 302988dea6afbd613bb2f96429b65d18.
> 21:36:49,667 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request AllocationID{4274d96a59d370305520876f5b84fb9f} for job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 8e06aa64d5f8961809da38fe7f224cc1.
> 21:36:49,667 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for AllocationID{4274d96a59d370305520876f5b84fb9f}.
> 21:36:49,667 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring.
> 21:36:49,668 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
> 21:36:49,671 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Try to register at job manager akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id f85f6f9b-7713-4be3-a8f0-8443d91e5e6d.
> 21:36:49,681 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9} for job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 8e06aa64d5f8961809da38fe7f224cc1.
> 21:36:49,681 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9}.
> 21:36:49,681 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring.
> 21:36:49,681 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
> 21:36:49,681 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
> 21:36:49,683 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Try to register at job manager akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id f85f6f9b-7713-4be3-a8f0-8443d91e5e6d.
> 21:36:49,687 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Resolved JobManager address, beginning registration
> 21:36:49,687 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Resolved JobManager address, beginning registration
> 21:36:49,687 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request AllocationID{740caf20a5f7f767864122dc9a7444d9} for job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 8e06aa64d5f8961809da38fe7f224cc1.
> 21:36:49,688 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Registration at JobManager attempt 1 (timeout=100ms)
> 21:36:49,688 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for AllocationID{740caf20a5f7f767864122dc9a7444d9}.
> 21:36:49,688 INFO org
[jira] [Updated] (FLINK-10617) Restoring job fails because of slot allocation timeout
[ https://issues.apache.org/jira/browse/FLINK-10617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Elias Levy updated FLINK-10617:
-------------------------------
             Priority: Critical  (was: Major)

> Restoring job fails because of slot allocation timeout
> --------------------------------------------------------
>
>                 Key: FLINK-10617
>                 URL: https://issues.apache.org/jira/browse/FLINK-10617
>             Project: Flink
>          Issue Type: Bug
>          Components: ResourceManager, TaskManager
>    Affects Versions: 1.6.1, 1.6.2
>            Reporter: Elias Levy
>            Priority: Critical
>
> The following may be related to FLINK-9932, but I am unsure. If you believe
> it is, go ahead and close this issue as a duplicate.
> While trying to test local state recovery on a job with large state, the job
> failed to be restored because slot allocation timed out.
> The job is running on a standalone cluster with 12 nodes and 96 task slots (8
> per node). The job has parallelism of 96, so it consumes all of the slots,
> and has ~200 GB of state in RocksDB.
> To test local state recovery I decided to kill one of the TMs. The TM
> immediately restarted and re-registered with the JM. I confirmed the JM
> showed 96 registered task slots.
> {noformat}
> 21:35:44,616 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Resolved ResourceManager address, beginning registration
> 21:35:44,616 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Registration at ResourceManager attempt 1 (timeout=100ms)
> 21:35:44,640 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Successful registration at resource manager akka.tcp://flink@172.31.18.172:6123/user/resourcemanager under registration id 302988dea6afbd613bb2f96429b65d18.
> 21:36:49,667 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request AllocationID{4274d96a59d370305520876f5b84fb9f} for job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 8e06aa64d5f8961809da38fe7f224cc1.
> 21:36:49,667 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for AllocationID{4274d96a59d370305520876f5b84fb9f}.
> 21:36:49,667 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring.
> 21:36:49,668 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
> 21:36:49,671 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Try to register at job manager akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id f85f6f9b-7713-4be3-a8f0-8443d91e5e6d.
> 21:36:49,681 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9} for job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 8e06aa64d5f8961809da38fe7f224cc1.
> 21:36:49,681 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9}.
> 21:36:49,681 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring.
> 21:36:49,681 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
> 21:36:49,681 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
> 21:36:49,683 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Try to register at job manager akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id f85f6f9b-7713-4be3-a8f0-8443d91e5e6d.
> 21:36:49,687 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Resolved JobManager address, beginning registration
> 21:36:49,687 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Resolved JobManager address, beginning registration
> 21:36:49,687 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request AllocationID{740caf20a5f7f767864122dc9a7444d9} for job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 8e06aa64d5f8961809da38fe7f224cc1.
> 21:36:49,688 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Registration at JobManager attempt 1 (timeout=100ms)
> 21:36:49,688 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for AllocationID{740caf20a5f7f767864122dc9a7444d9}.
> 21:36:49,68
[jira] [Commented] (FLINK-10617) Restoring job fails because of slot allocation timeout
[ https://issues.apache.org/jira/browse/FLINK-10617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668873#comment-16668873 ]

Elias Levy commented on FLINK-10617:
------------------------------------

I've retested with 1.6.2 and have confirmed the issue still exists, even though FLINK-9932 is fixed.

> Restoring job fails because of slot allocation timeout
> --------------------------------------------------------
>
>                 Key: FLINK-10617
>                 URL: https://issues.apache.org/jira/browse/FLINK-10617
>             Project: Flink
>          Issue Type: Bug
>          Components: ResourceManager, TaskManager
>    Affects Versions: 1.6.1
>            Reporter: Elias Levy
>            Priority: Major
>
> The following may be related to FLINK-9932, but I am unsure. If you believe
> it is, go ahead and close this issue as a duplicate.
> While trying to test local state recovery on a job with large state, the job
> failed to be restored because slot allocation timed out.
> The job is running on a standalone cluster with 12 nodes and 96 task slots (8
> per node). The job has parallelism of 96, so it consumes all of the slots,
> and has ~200 GB of state in RocksDB.
> To test local state recovery I decided to kill one of the TMs. The TM
> immediately restarted and re-registered with the JM. I confirmed the JM
> showed 96 registered task slots.
> {noformat}
> 21:35:44,616 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Resolved ResourceManager address, beginning registration
> 21:35:44,616 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Registration at ResourceManager attempt 1 (timeout=100ms)
> 21:35:44,640 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Successful registration at resource manager akka.tcp://flink@172.31.18.172:6123/user/resourcemanager under registration id 302988dea6afbd613bb2f96429b65d18.
> 21:36:49,667 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request AllocationID{4274d96a59d370305520876f5b84fb9f} for job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 8e06aa64d5f8961809da38fe7f224cc1.
> 21:36:49,667 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for AllocationID{4274d96a59d370305520876f5b84fb9f}.
> 21:36:49,667 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring.
> 21:36:49,668 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
> 21:36:49,671 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Try to register at job manager akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id f85f6f9b-7713-4be3-a8f0-8443d91e5e6d.
> 21:36:49,681 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9} for job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 8e06aa64d5f8961809da38fe7f224cc1.
> 21:36:49,681 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9}.
> 21:36:49,681 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring.
> 21:36:49,681 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
> 21:36:49,681 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
> 21:36:49,683 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Try to register at job manager akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id f85f6f9b-7713-4be3-a8f0-8443d91e5e6d.
> 21:36:49,687 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Resolved JobManager address, beginning registration
> 21:36:49,687 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Resolved JobManager address, beginning registration
> 21:36:49,687 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request AllocationID{740caf20a5f7f767864122dc9a7444d9} for job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 8e06aa64d5f8961809da38fe7f224cc1.
> 21:36:49,688 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Registration at JobManager attempt 1 (timeout=100ms)
> 21:36:49,688 INFO org.apache.flink.runtime.taskexecu
[jira] [Commented] (FLINK-9061) Add entropy to s3 path for better scalability
[ https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16667437#comment-16667437 ]

Elias Levy commented on FLINK-9061:
-----------------------------------

Just a note to say that changes AWS made to S3 in July mean that it is much more difficult to hit the S3 performance limits that would require this feature, as S3 can now sustain up to 3,500 PUT/POST/DELETE requests per second per prefix. See [https://aws.amazon.com/about-aws/whats-new/2018/07/amazon-s3-announces-increased-request-rate-performance/].

> Add entropy to s3 path for better scalability
> ----------------------------------------------
>
>                 Key: FLINK-9061
>                 URL: https://issues.apache.org/jira/browse/FLINK-9061
>             Project: Flink
>          Issue Type: Improvement
>          Components: FileSystem, State Backends, Checkpointing
>    Affects Versions: 1.4.2, 1.5.0
>            Reporter: Jamie Grier
>            Assignee: Indrajit Roychoudhury
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.6.2, 1.7.0
>
> I think we need to modify the way we write checkpoints to S3 for high-scale
> jobs (those with many total tasks). The issue is that we are writing all the
> checkpoint data under a common key prefix. This is the worst case scenario
> for S3 performance since the key is used as a partition key.
>
> In the worst case checkpoints fail with a 500 status code coming back from S3
> and an internal error type of TooBusyException.
>
> One possible solution would be to add a hook in the Flink filesystem code
> that allows me to "rewrite" paths. For example say I have the checkpoint
> directory set to:
>
> s3://bucket/flink/checkpoints
>
> I would hook that and rewrite that path to:
>
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original
> path
>
> This would distribute the checkpoint write load around the S3 cluster evenly.
>
> For reference:
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>
> Any other people hit this issue? Any other ideas for solutions? This is a
> pretty serious problem for people trying to checkpoint to S3.
>
> -Jamie

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
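For reference, the entropy injection that Flink eventually shipped works along these lines — a sketch of the idea only, with an arbitrary random-hex choice; the real feature is configured via the {{s3.entropy.key}} and {{s3.entropy.length}} settings:

{code:java}
import java.util.concurrent.ThreadLocalRandom;

public class EntropyPathSketch {

    // Replace a marker in the configured checkpoint path with random
    // characters so writes spread across S3 partitions, e.g.
    // s3://bucket/_entropy_/flink/checkpoints -> s3://bucket/4b2f/flink/checkpoints
    static String injectEntropy(String path, String marker, int length) {
        String alphabet = "0123456789abcdef";
        StringBuilder entropy = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            entropy.append(alphabet.charAt(ThreadLocalRandom.current().nextInt(alphabet.length())));
        }
        return path.replace(marker, entropy.toString());
    }
}
{code}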
[jira] [Commented] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections
[ https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16660864#comment-16660864 ] Elias Levy commented on FLINK-10052: [~Wosinsan] any progress on this issue? > Tolerate temporarily suspended ZooKeeper connections > > > Key: FLINK-10052 > URL: https://issues.apache.org/jira/browse/FLINK-10052 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.4.2, 1.5.2, 1.6.0 >Reporter: Till Rohrmann >Assignee: Dominik Wosiński >Priority: Major > > This issue results from FLINK-10011 which uncovered a problem with Flink's HA > recovery and proposed the following solution to harden Flink: > The {{ZooKeeperLeaderElectionService}} uses the {{LeaderLatch}} Curator > recipe for leader election. The leader latch revokes leadership in case of a > suspended ZooKeeper connection. This can be premature in case that the system > can reconnect to ZooKeeper before its session expires. The effect of the lost > leadership is that all jobs will be canceled and directly restarted after > regaining the leadership. > Instead of directly revoking the leadership upon a SUSPENDED ZooKeeper > connection, it would be better to wait until the ZooKeeper connection is > LOST. That way we would allow the system to reconnect and not lose the > leadership. This could be achievable by using Curator's {{LeaderSelector}} > instead of the {{LeaderLatch}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10617) Restoring job fails because of slot allocation timeout
Elias Levy created FLINK-10617: -- Summary: Restoring job fails because of slot allocation timeout Key: FLINK-10617 URL: https://issues.apache.org/jira/browse/FLINK-10617 Project: Flink Issue Type: Bug Components: ResourceManager, TaskManager Affects Versions: 1.6.1 Reporter: Elias Levy The following may be related to FLINK-9932, but I am unsure. If you believe it is, go ahead and close this issue as a duplicate. While trying to test local state recovery on a job with large state, the job failed to be restored because slot allocation timed out. The job is running on a standalone cluster with 12 nodes and 96 task slots (8 per node). The job has parallelism of 96, so it consumes all of the slots, and has ~200 GB of state in RocksDB. To test local state recovery I decided to kill one of the TMs. The TM immediately restarted and re-registered with the JM. I confirmed the JM showed 96 registered task slots. {noformat} 21:35:44,616 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Resolved ResourceManager address, beginning registration 21:35:44,616 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Registration at ResourceManager attempt 1 (timeout=100ms) 21:35:44,640 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Successful registration at resource manager akka.tcp://flink@172.31.18.172:6123/user/resourcemanager under registration id 302988dea6afbd613bb2f96429b65d18. 21:36:49,667 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request AllocationID{4274d96a59d370305520876f5b84fb9f} for job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 8e06aa64d5f8961809da38fe7f224cc1. 21:36:49,667 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for AllocationID{4274d96a59d370305520876f5b84fb9f}. 21:36:49,667 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring. 21:36:49,668 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock. 21:36:49,671 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Try to register at job manager akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id f85f6f9b-7713-4be3-a8f0-8443d91e5e6d. 21:36:49,681 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9} for job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 8e06aa64d5f8961809da38fe7f224cc1. 21:36:49,681 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9}. 21:36:49,681 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring. 21:36:49,681 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock. 21:36:49,681 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock. 21:36:49,683 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Try to register at job manager akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id f85f6f9b-7713-4be3-a8f0-8443d91e5e6d.
21:36:49,687 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Resolved JobManager address, beginning registration 21:36:49,687 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Resolved JobManager address, beginning registration 21:36:49,687 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Receive slot request AllocationID{740caf20a5f7f767864122dc9a7444d9} for job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 8e06aa64d5f8961809da38fe7f224cc1. 21:36:49,688 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Registration at JobManager attempt 1 (timeout=100ms) 21:36:49,688 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Allocated slot for AllocationID{740caf20a5f7f767864122dc9a7444d9}. 21:36:49,688 INFO org.apache.flink.runtime.taskexecutor.JobLeaderService - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring. 21:36:49,688 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock. 21:36:49,688 INFO org.apache.flink.runtim
[jira] [Created] (FLINK-10520) Job save points REST API fails unless parameters are specified
Elias Levy created FLINK-10520: -- Summary: Job save points REST API fails unless parameters are specified Key: FLINK-10520 URL: https://issues.apache.org/jira/browse/FLINK-10520 Project: Flink Issue Type: Bug Components: REST Affects Versions: 1.6.1 Reporter: Elias Levy The new REST API POST endpoint, {{/jobs/:jobid/savepoints}}, returns an error unless the request includes a body with all parameters ({{target-directory}} and {{cancel-job}}), even though the [documentation|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.html] suggests these are optional. If a POST request with no data is made, the response is a 400 status code with the error message "Bad request received." If the POST request submits an empty JSON object ( {} ), the response is a 400 status code with the error message "Request did not match expected format SavepointTriggerRequestBody." The same is true if only the {{target-directory}} or {{cancel-job}} parameters are included. As the system is configured with a default savepoint location, there shouldn't be a need to include the parameter in the request. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
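For reference, a sketch of a trigger request that the 1.6.1 endpoint does accept, with both parameters supplied explicitly; the host, port, job id, and target directory are placeholders, and the sketch uses only the JDK HTTP client:
{code:java}
import java.io.OutputStreamWriter
import java.net.{HttpURLConnection, URL}

// Placeholder host/port/job id; adjust for a real cluster.
val url = new URL("http://jobmanager:8081/jobs/87c61e8ee64cdbd50f191d39610eb58f/savepoints")
val conn = url.openConnection().asInstanceOf[HttpURLConnection]
conn.setRequestMethod("POST")
conn.setRequestProperty("Content-Type", "application/json")
conn.setDoOutput(true)
val out = new OutputStreamWriter(conn.getOutputStream, "UTF-8")
// Both fields present: the only form the endpoint accepts today.
out.write("""{"target-directory": "s3://bucket/savepoints", "cancel-job": false}""")
out.close()
println(conn.getResponseCode) // a success response carries a trigger id in its body
{code}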
[jira] [Commented] (FLINK-10493) Macro generated CaseClassSerializer considered harmful
[ https://issues.apache.org/jira/browse/FLINK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16638834#comment-16638834 ] Elias Levy commented on FLINK-10493: This issue was noted by [~tzulitai] back in June 2017 [here|https://github.com/apache/flink/pull/4090#issuecomment-307109692]. > Macro generated CaseClassSerializer considered harmful > -- > > Key: FLINK-10493 > URL: https://issues.apache.org/jira/browse/FLINK-10493 > Project: Flink > Issue Type: Bug > Components: Scala API, State Backends, Checkpointing, Type > Serialization System >Affects Versions: 1.4.0, 1.4.1, 1.4.2, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.6.1, > 1.5.4 >Reporter: Elias Levy >Priority: Major > > The Flink Scala API uses implicits and macros to generate {{TypeInformation}} > and {{TypeSerializer}} objects for types. In the case of Scala tuple and > case classes, the macro generates an [anonymous {{CaseClassSerializer}} > class|https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala#L148-L161]. > > The Scala compiler will generate a name for the anonymous class that depends > on the relative position in the code of the macro invocation to other > anonymous classes. If the code is changed such that the anonymous class > relative position changes, even if the overall logic of the code or the type > in question do not change, the name of the serializer class will change. > That will result in errors, such as the one below, if the job is restored > from a savepoint, as the serializer to read the data in the savepoint will no > longer be found, as its name will have changed. > At the very least, there should be a prominent warning in the documentation > about this issue. Minor code changes can result in jobs that can't restore > previous state. Ideally, the use of anonymous classes should be deprecated > if possible. > {noformat} > WARN org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil > - Deserialization of serializer errored; replacing with null. > java.io.IOException: Unloadable class for type serializer.
> at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203) > at > org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207) > at > org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Unknown Source) > Caused by: java.io.InvalidClassException: failed to read class descriptor > at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) > at java.io.ObjectInputStream.readClassDesc(Unknown Source) > at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) > at java.io.ObjectInputStream.readObject0(Unknown Source) > at java.io.ObjectInputStream.defaultReadFields(Unknown Source) > at java.io.ObjectInputStream.readSerialData(Unknown Source) > at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) > at java.io.ObjectInputStream.readObject0(Unknown Source) > at java.io.ObjectInputStream.readObject(Unknown Source) > at > org.apache.flink.api.common.typeutils.
[jira] [Created] (FLINK-10493) Macro generated CaseClassSerializer considered harmful
Elias Levy created FLINK-10493: -- Summary: Macro generated CaseClassSerializer considered harmful Key: FLINK-10493 URL: https://issues.apache.org/jira/browse/FLINK-10493 Project: Flink Issue Type: Bug Components: Scala API, State Backends, Checkpointing, Type Serialization System Affects Versions: 1.5.4, 1.6.1, 1.6.0, 1.5.3, 1.5.2, 1.5.1, 1.4.2, 1.4.1, 1.4.0 Reporter: Elias Levy The Flink Scala API uses implicits and macros to generate {{TypeInformation}} and {{TypeSerializer}} objects for types. In the case of Scala tuple and case classes, the macro generates an [anonymous {{CaseClassSerializer}} class|https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala#L148-L161]. The Scala compiler will generate a name for the anonymous class that depends on the relative position in the code of the macro invocation to other anonymous classes. If the code is changed such that the anonymous class relative position changes, even if the overall logic of the code or the type in question do not change, the name of the serializer class will change. That will result in errors, such as the one below, if the job is restored from a savepoint, as the serializer to read the data in the savepoint will no longer be found, as its name will have changed. At the very least, there should be a prominent warning in the documentation about this issue. Minor code changes can result in jobs that can't restore previous state. Ideally, the use of anonymous classes should be deprecated if possible. {noformat} WARN org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil - Deserialization of serializer errored; replacing with null. java.io.IOException: Unloadable class for type serializer. 
at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203) at org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207) at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85) at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351) at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Unknown Source) Caused by: java.io.InvalidClassException: failed to read class descriptor at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) at java.io.ObjectInputStream.readClassDesc(Unknown Source) at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) at java.io.ObjectInputStream.readObject0(Unknown Source) at java.io.ObjectInputStream.defaultReadFields(Unknown Source) at java.io.ObjectInputStream.readSerialData(Unknown Source) at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) at java.io.ObjectInputStream.readObject0(Unknown Source) at java.io.ObjectInputStream.readObject(Unknown Source) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375) ... 14 more Caused by: java.lang.ClassNotFoundException: com.somewhere.TestJob$$anon$13$$anon$3 at java.net.URLClassLoader.findClass(Unknown Source) at java.lang.ClassLoader.loadClass(Unknown Source) at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(Fl
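A tiny illustration of the hazard described in FLINK-10493, assuming the Scala API's {{createTypeInformation}} macro; the generated class names in the comments are schematic:
{code:java}
import org.apache.flink.api.scala._

object TestJob {
  // Each call expands into anonymous TypeInformation/serializer classes whose
  // names depend on position in the file, e.g. TestJob$$anon$1 vs TestJob$$anon$2.
  val infoA = createTypeInformation[(String, Long)]
  val infoB = createTypeInformation[(Int, Double)]
  // Swapping these two lines, or adding another anonymous class above them,
  // shifts the $$anon$N numbering, so a savepoint written under the old names
  // can no longer locate its serializer classes on restore.
}
{code}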
[jira] [Closed] (FLINK-10483) Can't restore from a savepoint even with Allow Non Restored State enabled
[ https://issues.apache.org/jira/browse/FLINK-10483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elias Levy closed FLINK-10483. -- Resolution: Invalid > Can't restore from a savepoint even with Allow Non Restored State enabled > - > > Key: FLINK-10483 > URL: https://issues.apache.org/jira/browse/FLINK-10483 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing, Type Serialization System >Affects Versions: 1.4.2 >Reporter: Elias Levy >Priority: Major > > A trimmed streaming job fails a restore from a savepoint with an Unloadable > class for type serializer error, even though the case class in question has > been eliminated from the job and Allow Non Restored State is enabled. > We have a job running on a Flink 1.4.2 cluster with two Kafka input streams, > one of the streams is processed by an async function, and the output of the > async function and the other original stream are consumed by a > CoProcessOperator, that in turn emits Scala case class instances, that go into > a stateful ProcessFunction filter, and then into a sink. I.e. > {code:java} > source 1 -> async function --\ >|---> co process --> process > --> sink > source 2 --/ > {code} > I eliminated most of the DAG, leaving only the source 1 --> async function > portion of it. This removed the case class in question from the processing > graph. When I try to restore from the savepoint, even if Allow Non Restored > State is selected, the job fails to restore with the error "Deserialization > of serializer errored". > This is the error being generated: > {noformat} > WARN org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil > - Deserialization of serializer errored; replacing with null. > java.io.IOException: Unloadable class for type serializer.
> at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203) > at > org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207) > at > org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Unknown Source) > Caused by: java.io.InvalidClassException: failed to read class descriptor > at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) > at java.io.ObjectInputStream.readClassDesc(Unknown Source) > at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) > at java.io.ObjectInputStream.readObject0(Unknown Source) > at java.io.ObjectInputStream.defaultReadFields(Unknown Source) > at java.io.ObjectInputStream.readSerialData(Unknown Source) > at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) > at java.io.ObjectInputStream.readObject0(Unknown Source) > at java.io.ObjectInputStream.readObject(Unknown Source) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375) > ... 14 more > Caused by: java.lang.ClassNotFoundException: > com.somewhere.TestJob$$anon$13$$anon$3 > at java.net.URLClassLoader.findClass(Unknown Source) > at java.l
[jira] [Created] (FLINK-10483) Can't restore from a savepoint even with Allow Non Restored State enabled
Elias Levy created FLINK-10483: -- Summary: Can't restore from a savepoint even with Allow Non Restored State enabled Key: FLINK-10483 URL: https://issues.apache.org/jira/browse/FLINK-10483 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing, Type Serialization System Affects Versions: 1.4.2 Reporter: Elias Levy A trimmed streaming job fails a restore from a savepoint with an Unloadable class for type serializer error, even though the case class in question has been eliminated from the job and Allow Non Restored State is enabled. We have a job running on a Flink 1.4.2 cluster with two Kafka input streams, one of the streams is processed by an async function, and the output of the async function and the other original stream are consumed by a CoProcessOperator, that in turn emits Scala case class instances, that go into a stateful ProcessFunction filter, and then into a sink. I.e. {code:java} source 1 -> async function --\ |---> co process --> process --> sink source 2 --/ {code} I eliminated most of the DAG, leaving only the source 1 --> async function portion of it. This removed the case class in question from the processing graph. When I try to restore from the savepoint, even if Allow Non Restored State is selected, the job fails to restore with the error "Deserialization of serializer errored". This is the error being generated: {noformat} WARN org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil - Deserialization of serializer errored; replacing with null. java.io.IOException: Unloadable class for type serializer. at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203) at org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207) at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85) at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351) at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Unknown Source) Caused by: java.io.InvalidClassException: failed to read class descriptor at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) at java.io.ObjectInputStream.readClassDesc(Unknown Source) at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) at
java.io.ObjectInputStream.readObject0(Unknown Source) at java.io.ObjectInputStream.defaultReadFields(Unknown Source) at java.io.ObjectInputStream.readSerialData(Unknown Source) at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) at java.io.ObjectInputStream.readObject0(Unknown Source) at java.io.ObjectInputStream.readObject(Unknown Source) at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375) ... 14 more Caused by: java.lang.ClassNotFoundException: com.somewhere.TestJob$$anon$13$$anon$3 at java.net.URLClassLoader.findClass(Unknown Source) at java.lang.ClassLoader.loadClass(Unknown Source) at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128) at java.lang.ClassLoader.loadClass(Unknown Source) at java.lang.Class.forName0(Native Method) at j
[jira] [Created] (FLINK-10460) DataDog reporter JsonMappingException
Elias Levy created FLINK-10460: -- Summary: DataDog reporter JsonMappingException Key: FLINK-10460 URL: https://issues.apache.org/jira/browse/FLINK-10460 Project: Flink Issue Type: Improvement Components: Metrics Affects Versions: 1.4.2 Reporter: Elias Levy Observed the following error in the TM logs this morning: {code:java} WARN org.apache.flink.metrics.datadog.DatadogHttpReporter - Failed reporting metrics to Datadog. org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException: (was java.util.ConcurrentModificationException) (through reference chain: org.apache.flink.metrics.datadog.DSeries["series"]-> java.util.ArrayList[88]->org.apache.flink.metrics.datadog.DGauge["points"]) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:379) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:339) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:342) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:686) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:157) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContents(IndexedListSerializer.java:119) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:79) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:18) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:672) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:678) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:157) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:130) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3631) at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2998) at org.apache.flink.metrics.datadog.DatadogHttpClient.serialize(DatadogHttpClient.java:90) at org.apache.flink.metrics.datadog.DatadogHttpClient.send(DatadogHttpClient.java:79) at org.apache.flink.metrics.datadog.DatadogHttpReporter.report(DatadogHttpReporter.java:143) at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:417) at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at java.util.concurrent.FutureTask.runAndReset(Unknown Source) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.lang.Thread.run(Unknown Source) Caused by: java.util.ConcurrentModificationException at 
java.util.LinkedHashMap$LinkedHashIterator.nextNode(Unknown Source) at java.util.LinkedHashMap$LinkedKeyIterator.next(Unknown Source) at java.util.AbstractCollection.addAll(Unknown Source) at java.util.HashSet.<init>(Unknown Source) at org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:65) at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:298) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:906) at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35) at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26) at org.apache.flink.metrics.datadog.DGauge.getMetricValue(DGauge.java:42) at org.apache.flink.metrics.
[jira] [Created] (FLINK-10390) DataDog metric reporter leak warning
Elias Levy created FLINK-10390: -- Summary: DataDog metric reporter leak warning Key: FLINK-10390 URL: https://issues.apache.org/jira/browse/FLINK-10390 Project: Flink Issue Type: Improvement Components: Metrics Affects Versions: 1.6.1 Reporter: Elias Levy After upgrading to 1.6.1 from 1.4.2 we started observing in the log warnings associated with the DataDog metrics reporter: {quote}Sep 21, 2018 9:43:20 PM org.apache.flink.shaded.okhttp3.internal.platform.Platform log WARNING: A connection to https://app.datadoghq.com/ was leaked. Did you forget to close a response body? To see where this was allocated, set the OkHttpClient logger level to FINE: Logger.getLogger(OkHttpClient.class.getName()).setLevel(Level.FINE); {quote} The metric reporter's okhttp dependency version (3.7.0) has not changed, so that does not appear to be the source of the warning. I believe the issue is the change made in [FLINK-8553|https://github.com/apache/flink/commit/ae3d547afe7ec44d37b38222a3ea40d9181e#diff-fc396ba6772815fc05efc1310760cd4b]. The HTTP calls were made async. The previous code called {{client.newCall(r).execute().close()}}. The new call does nothing in the callback, even though the [Callback.onResponse documentation|https://square.github.io/okhttp/3.x/okhttp/okhttp3/Callback.html#onResponse-okhttp3.Call-okhttp3.Response-] states: bq. Called when the HTTP response was successfully returned by the remote server. The callback may proceed to read the response body with Response.body. The response is still live until its response body is closed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
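A minimal sketch of the fix the report implies, assuming the okhttp 3.x API that the reporter shades: close the response in the async callback, mirroring what the old synchronous {{client.newCall(r).execute().close()}} did. The endpoint and payload below are illustrative:
{code:java}
import java.io.IOException
import okhttp3.{Call, Callback, OkHttpClient, Request, RequestBody, Response}

val client = new OkHttpClient()
val request = new Request.Builder()
  .url("https://app.datadoghq.com/api/v1/series") // illustrative endpoint
  .post(RequestBody.create(null, "{}"))           // illustrative payload
  .build()

client.newCall(request).enqueue(new Callback {
  override def onFailure(call: Call, e: IOException): Unit = () // log in real code
  // Closing the response releases the pooled connection and silences the
  // "connection was leaked" warning.
  override def onResponse(call: Call, response: Response): Unit = response.close()
})
{code}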
[jira] [Created] (FLINK-10372) There is no API to configure the timer state backend
Elias Levy created FLINK-10372: -- Summary: There is no API to configure the timer state backend Key: FLINK-10372 URL: https://issues.apache.org/jira/browse/FLINK-10372 Project: Flink Issue Type: Improvement Components: DataStream API, State Backends, Checkpointing Affects Versions: 1.6.0 Reporter: Elias Levy Flink 1.6.0, via FLINK-9485, introduced the option to store timers in RocksDB instead of the heap. Alas, this can only be configured via the {{state.backend.rocksdb.timer-service.factory}} config file option. That means the choice of state backend to use for timers can't be made on a per-job basis on a shared cluster. There is a need for an API in {{RocksDBStateBackend}} to configure the backend per job. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
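For illustration, the cluster-wide knob that exists today plus a hypothetical per-job setter; the {{setTimerServiceFactory}} method below does not exist in Flink 1.6 and is only a sketch of the requested API:
{code:java}
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Today the choice is global, via flink-conf.yaml:
//   state.backend.rocksdb.timer-service.factory: ROCKSDB
val env = StreamExecutionEnvironment.getExecutionEnvironment
val backend = new RocksDBStateBackend("hdfs:///flink/checkpoints") // placeholder URI
// Hypothetical per-job API this issue asks for (not in Flink 1.6):
// backend.setTimerServiceFactory(TimerServiceFactory.ROCKSDB)
env.setStateBackend(backend)
{code}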
[jira] [Commented] (FLINK-10348) Solve data skew when consuming data from kafka
[ https://issues.apache.org/jira/browse/FLINK-10348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617899#comment-16617899 ] Elias Levy commented on FLINK-10348: [~wind_ljy] Re: 1. The problem is timestamp alignment. Settings like fetch sizes, max waits, etc. are simply mechanisms you can use to attempt to influence the rate of processing to better align the timestamps. Those mechanisms are at least one level removed from the actual issue. It is best to address the issue directly by attempting to align timestamps during consumption. Re: 2. Internally the Kafka consumer behaves like a multiple input operator, merging watermarks and messages from each partition, which it then forwards downstream. The Kafka consumer can also selectively forward messages from the partitions with the lowest watermark if they are available. > Solve data skew when consuming data from kafka > -- > > Key: FLINK-10348 > URL: https://issues.apache.org/jira/browse/FLINK-10348 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector >Affects Versions: 1.6.0 >Reporter: Jiayi Liao >Assignee: Jiayi Liao >Priority: Major > > By using KafkaConsumer, our strategy is to send fetch request to brokers with > a fixed fetch size. Assume x topic has n partition and there exists data skew > between partitions, now we need to consume data from x topic with earliest > offset, and we can get max fetch size data in every fetch request. The > problem is that when a task consumes data from both "big" partitions and > "small" partitions, the data in "big" partitions may be late elements because > "small" partitions are consumed faster. > *Solution: * > I think we can leverage two parameters to control this. > 1. data.skew.check // whether to check data skew > 2. data.skew.check.interval // the interval between checks > Every data.skew.check.interval, we will check the latest offset of every > specific partition, and calculate (latest offset - current offset), then get > partitions which need to slow down and redefine their fetch size. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-10348) Solve data skew when consuming data from kafka
[ https://issues.apache.org/jira/browse/FLINK-10348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16617899#comment-16617899 ] Elias Levy edited comment on FLINK-10348 at 9/17/18 5:48 PM: - [~wind_ljy] Re: 1. The problem is timestamp alignment. Settings like fetch sizes, max waits, etc. are simply mechanisms you can use to attempt to influence the rate of processing to better align the timestamps. Those mechanisms are at least one level removed from the actual issue. It is best to address the issue directly by attempting to align timestamps during consumption. Re: 2. Internally the Kafka consumer behaves like a multiple input operator, merging watermarks and messages from each partition, which it then forwards downstream. The Kafka consumer can also selectively forward messages from the partitions with the lowest watermark if they are available. was (Author: elevy): [~wind_ljy] Re: 1. The problem is timestamp alignment. Setting like fetch sizes, max waits, etc are simply mechanism you can use to attempt to influence the rate of processing the better align the timestamps. Those mechanism are at least one level removed from the actual issue. It is best to address the issue directly by attempting to align timestamp during consumption. Re: 2. Internally the Kafka consumer behaves like a multiple input operator, merging watermarks and messages from each partition, which it then forwards downstream. The Kafka consumer can also selectively forward messages from the partitions with the lowest waternark if they are available. > Solve data skew when consuming data from kafka > -- > > Key: FLINK-10348 > URL: https://issues.apache.org/jira/browse/FLINK-10348 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector >Affects Versions: 1.6.0 >Reporter: Jiayi Liao >Assignee: Jiayi Liao >Priority: Major > > By using KafkaConsumer, our strategy is to send fetch request to brokers with > a fixed fetch size. Assume x topic has n partition and there exists data skew > between partitions, now we need to consume data from x topic with earliest > offset, and we can get max fetch size data in every fetch request. The > problem is that when a task consumes data from both "big" partitions and > "small" partitions, the data in "big" partitions may be late elements because > "small" partitions are consumed faster. > *Solution: * > I think we can leverage two parameters to control this. > 1. data.skew.check // whether to check data skew > 2. data.skew.check.interval // the interval between checks > Every data.skew.check.interval, we will check the latest offset of every > specific partition, and calculate (latest offset - current offset), then get > partitions which need to slow down and redefine their fetch size. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10348) Solve data skew when consuming data from kafka
[ https://issues.apache.org/jira/browse/FLINK-10348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16616933#comment-16616933 ] Elias Levy commented on FLINK-10348: I would suggest the strategy employed by Kafka Streams, which performs a best-effort attempt to align streams by selectively fetching from the stream with the lowest watermark if there are messages available. Rather than implementing something like this within the Kafka connector sources, which are independent tasks in Flink, I would suggest implementing it within multiple input operators. The operator can selectively process messages from the input stream with the lowest watermark if they are available. Backpressure can then take care of slowing down the higher volume input if necessary. > Solve data skew when consuming data from kafka > -- > > Key: FLINK-10348 > URL: https://issues.apache.org/jira/browse/FLINK-10348 > Project: Flink > Issue Type: New Feature > Components: Kafka Connector >Affects Versions: 1.6.0 >Reporter: Jiayi Liao >Assignee: Jiayi Liao >Priority: Major > > By using KafkaConsumer, our strategy is to send fetch request to brokers with > a fixed fetch size. Assume x topic has n partition and there exists data skew > between partitions, now we need to consume data from x topic with earliest > offset, and we can get max fetch size data in every fetch request. The > problem is that when a task consumes data from both "big" partitions and > "small" partitions, the data in "big" partitions may be late elements because > "small" partitions are consumed faster. > *Solution: * > I think we can leverage two parameters to control this. > 1. data.skew.check // whether to check data skew > 2. data.skew.check.interval // the interval between checks > Every data.skew.check.interval, we will check the latest offset of every > specific partition, and calculate (latest offset - current offset), then get > partitions which need to slow down and redefine their fetch size. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
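A toy sketch of the selection rule described in the comments above; the types are illustrative and are not Flink or Kafka APIs:
{code:java}
import scala.collection.mutable

// Each input tracks its current watermark and its buffered records.
final case class In[T](id: Int, var watermark: Long, buffer: mutable.Queue[T])

// Best-effort alignment: among inputs with records available, always consume
// from the one whose watermark is furthest behind; backpressure does the rest.
def selectNext[T](inputs: Seq[In[T]]): Option[T] =
  inputs.filter(_.buffer.nonEmpty).sortBy(_.watermark).headOption.map(_.buffer.dequeue())
{code}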
[jira] [Commented] (FLINK-10184) HA Failover broken due to JobGraphs not being removed from Zookeeper on cancel
[ https://issues.apache.org/jira/browse/FLINK-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16586177#comment-16586177 ] Elias Levy commented on FLINK-10184: [~Jamalarm] this seems like a duplicate of FLINK-10011. Can you confirm? > HA Failover broken due to JobGraphs not being removed from Zookeeper on cancel > -- > > Key: FLINK-10184 > URL: https://issues.apache.org/jira/browse/FLINK-10184 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.5.2 >Reporter: Thomas Wozniakowski >Priority: Blocker > > We have encountered a blocking issue when upgrading our cluster to 1.5.2. > It appears that, when jobs are cancelled manually (in our case with a > savepoint), the JobGraphs are NOT removed from the Zookeeper {{jobgraphs}} > node. > This means that, if you start a job, cancel it, restart it, cancel it, etc. > You will end up with many job graphs stored in zookeeper, but none of the > corresponding blobs in the Flink HA directory. > When a HA failover occurs, the newly elected leader retrieves all of those > old JobGraph objects from Zookeeper, then goes looking for the corresponding > blobs in the HA directory. The blobs are not there so the JobManager explodes > and the process dies. > At this point the cluster has to be fully stopped, the zookeeper jobgraphs > cleared out by hand, and all the jobmanagers restarted. > I can see the following line in the JobManager logs: > {quote} > 2018-08-20 16:17:20,776 INFO > org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore - > Removed job graph 4e9a5a9d70ca99dbd394c35f8dfeda65 from ZooKeeper. > {quote} > But looking in Zookeeper the {{4e9a5a9d70ca99dbd394c35f8dfeda65}} job is > still very much there. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10133) finished job's jobgraph never been cleaned up in zookeeper for standalone clusters (HA mode with multiple masters)
[ https://issues.apache.org/jira/browse/FLINK-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16578792#comment-16578792 ] Elias Levy commented on FLINK-10133: [~Frefreak] this is likely the same issue as FLINK-10011. If so, mark this one as a duplicate. > finished job's jobgraph never been cleaned up in zookeeper for standalone > clusters (HA mode with multiple masters) > -- > > Key: FLINK-10133 > URL: https://issues.apache.org/jira/browse/FLINK-10133 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.5.0, 1.5.2, 1.6.0 >Reporter: Xiangyu Zhu >Priority: Major > > Hi, > We have 3 servers in our test environment, noted as node1-3. Setup is as > following: > * hadoop hdfs: node1 as namenode, node2,3 as datanode > * zookeeper: node1-3 as a quorum (but also tried node1 alone) > * flink: node1,2 as masters, node2,3 as slaves > As my understanding when a job finished the corresponding job's blob data is > expected to be deleted from hdfs path and node under zookeeper's path `/\{zk > path root}/\{cluster-id}/jobgraphs/\{job id}` should be deleted after that. > However we observe that whenever we submitted a job and it finished (via > `bin/flink run WordCount.jar`), the blob data is gone whereas job id node > under zookeeper is still there, with a uuid style lock node inside it. From > the debug node in zookeeper we observed something like "cannot be deleted > because non empty". Because of this, as long as a job is finished and the > jobgraph node persists, if restart the clusters or kill one manager (to test > HA mode), it tries to recover a finished job and couldn't find blob data > under hdfs, and the whole cluster is down. > If we use only node1 as master and node2,3 as slaves, the jobgraphs node can > be deleted successfully. If the jobgraphs is clean, killing one job manager > makes another stand-by JM raised as leader, so it is only this jobgraphs > issue preventing HA from working. > I'm not sure if there's something wrong with our configs because this happens > every time for finished job (we only tested with wordcount.jar though). I'm > aware of FLINK-10011 and FLINK-10029, but unlike FLINK-10011 this happens > every time, rendering HA mode un-useable for us. > Any idea what might cause this? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10118) Queryable state MapState entry query
Elias Levy created FLINK-10118: -- Summary: Queryable state MapState entry query Key: FLINK-10118 URL: https://issues.apache.org/jira/browse/FLINK-10118 Project: Flink Issue Type: Improvement Components: Queryable State Affects Versions: 1.6.0 Reporter: Elias Levy Queryable state allows querying of keyed MapState, but such a query returns all MapState entries for the given key. In some cases, such MapState may include a substantial number of entries (in the millions), while the user may only be interested in one entry. I propose we allow queries for MapState to provide one or more map entry keys, in addition to the state key, and to only return entries for the given map keys. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
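For context, a sketch of a query against the 1.6 client, which returns the whole map for the state key, followed by the kind of map-key-scoped variant the issue proposes; the extra parameter in the commented call is hypothetical:
{code:java}
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.queryablestate.client.QueryableStateClient

// Placeholder proxy host and default proxy port.
val client = new QueryableStateClient("proxy-host", 9069)
val descriptor = new MapStateDescriptor[String, java.lang.Long](
  "counts", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)

// Today: every entry of the MapState for this state key comes back.
val whole = client.getKvState(
  JobID.fromHexString("87c61e8ee64cdbd50f191d39610eb58f"), // placeholder job id
  "counts", "some-key", BasicTypeInfo.STRING_TYPE_INFO, descriptor)

// Hypothetical variant proposed above (does not exist): restrict the result
// to the given map entry keys.
// client.getKvState(jobId, "counts", "some-key", keyTypeInfo, descriptor,
//                   java.util.Arrays.asList("entry-1", "entry-2"))
{code}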
[jira] [Created] (FLINK-10117) REST API for Queryable State
Elias Levy created FLINK-10117: -- Summary: REST API for Queryable State Key: FLINK-10117 URL: https://issues.apache.org/jira/browse/FLINK-10117 Project: Flink Issue Type: Improvement Components: Queryable State, REST Affects Versions: 1.6.0 Reporter: Elias Levy At the moment, queryable state requires a JVM based client that can make use of the Java queryable state client API in flink-queryable-state-client artifact. In addition, the client requires a state descriptor matching the queried state, which tightly couples the Flink job and query state clients. I propose that queryable state become accessible via a REST API. FLINK-7040 mentions this possibility, but does not specify work towards that goal. I suggest that to enable queryable state over REST, users define JSON serializers via the state descriptors. This would allow queryable state clients to be developed in any language, not require them to use a Flink client library, and permit them to be loosely coupled with the job, as they could generically parse the returned JSON. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
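A purely hypothetical sketch of the proposal's shape; the {{setJsonSerializer}} hook below does not exist in Flink and is only meant to show how a descriptor could carry a JSON rendering for the REST layer to serve loosely coupled, non-JVM clients:
{code:java}
import org.apache.flink.api.common.state.ValueStateDescriptor

case class Device(id: String, lastSeen: Long)

val descriptor = new ValueStateDescriptor[Device]("devices", classOf[Device])
// Hypothetical hook: a JSON rendering the REST front end could apply to
// query results, so clients can parse responses generically.
// descriptor.setJsonSerializer((d: Device) =>
//   s"""{"id":"${d.id}","lastSeen":${d.lastSeen}}""")
{code}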
[jira] [Created] (FLINK-10098) Programmatically select timer storage backend
Elias Levy created FLINK-10098: -- Summary: Programmatically select timer storage backend Key: FLINK-10098 URL: https://issues.apache.org/jira/browse/FLINK-10098 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing, Streaming, TaskManager Affects Versions: 1.6.0, 1.7.0 Reporter: Elias Levy FLINK-9486 introduced timer storage on the RocksDB storage backend. Right now it is only possible to configure RocksDB as the storage for timers by setting the {{state.backend.rocksdb.timer-service.factory}} value in the configuration file for Flink. As the state storage backend can be programmatically selected by jobs via {{env.setStateBackend(...)}}, the timer backend should also be configurable programmatically. Different jobs should be able to store their timers in different storage backends. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10067) Add Kafka 1.0.0/1.1.0 connectors
[ https://issues.apache.org/jira/browse/FLINK-10067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16570371#comment-16570371 ] Elias Levy commented on FLINK-10067: Kafka clients are compatible with lower version servers and Kafka 2.0.0 has been released. Should this issue add a 2.0.0 connector instead? > Add Kafka 1.0.0/1.1.0 connectors > > > Key: FLINK-10067 > URL: https://issues.apache.org/jira/browse/FLINK-10067 > Project: Flink > Issue Type: Improvement > Components: Kafka Connector >Reporter: Aljoscha Krettek >Priority: Major > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8545) Implement upsert stream table source
[ https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16569040#comment-16569040 ] Elias Levy commented on FLINK-8545: --- [~hequn8128] are you still working on this feature? > Implement upsert stream table source > - > > Key: FLINK-8545 > URL: https://issues.apache.org/jira/browse/FLINK-8545 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Hequn Cheng >Assignee: Hequn Cheng >Priority: Major > > As more and more users are eager for ingesting data with upsert mode in flink > sql/table-api, it is valuable to enable table source with upsert mode. I will > provide a design doc later and we can have more discussions. Any suggestions > are warmly welcomed ! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9600) Add DataStream transformation variants that pass timestamp to the user function
[ https://issues.apache.org/jira/browse/FLINK-9600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16568955#comment-16568955 ] Elias Levy commented on FLINK-9600: --- [~aljoscha] I am aware of {{ProcessFunction}}, but I consider it an escape hatch when you can't perform what you want within the higher level DSL. The improvement I am suggesting is within the higher level DSL. E.g. it is a lot nicer to write:
{code:java}
dataStream.filter( (x, ts) => { isDayTime(ts) } )
{code}
than
{code:java}
class ProcessFilter[T] extends ProcessFunction[T, T] {
  override def processElement(value: T, ctx: Context, out: Collector[T]): Unit = {
    if (isDayTime(ctx.timestamp)) out.collect(value)
  }
}

dataStream.process(new ProcessFilter[T])
{code}
> Add DataStream transformation variants that pass timestamp to the user > function > --- > > Key: FLINK-9600 > URL: https://issues.apache.org/jira/browse/FLINK-9600 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 1.5.0 >Reporter: Elias Levy >Priority: Minor > > It is often necessary to access the timestamp assigned to records within user > functions. At the moment this is only possible from {{RichFunction}}. > Implementing a {{RichFunction}} just to access the timestamp is burdensome, > so most jobs carry a duplicate of the timestamp within the record. > It would be useful if {{DataStream}} provided transformation methods that > accepted user functions that could be passed the record's timestamp as an > additional argument, similar to how there are two variants of {{flatMap}}, > one with an extra parameter that gives the user function access to the output > {{Collector}}. > Along similar lines, it may be useful to have variants that pass the record's > key as an additional parameter. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9720) Introduce ResourceTag class for tag support in scheduling
[ https://issues.apache.org/jira/browse/FLINK-9720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16568928#comment-16568928 ] Elias Levy commented on FLINK-9720: --- This is shown as fixed in 1.6.0, but nothing has been committed. > Introduce ResourceTag class for tag support in scheduling > - > > Key: FLINK-9720 > URL: https://issues.apache.org/jira/browse/FLINK-9720 > Project: Flink > Issue Type: New Feature > Components: Scheduler >Affects Versions: 1.5.0 >Reporter: Renjie Liu >Assignee: Renjie Liu >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9662) Task manager isolation for jobs
[ https://issues.apache.org/jira/browse/FLINK-9662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16568927#comment-16568927 ] Elias Levy commented on FLINK-9662: --- This is shown as fixed in 1.6.0, but AFAIK nothing has been committed to show that being the case. > Task manager isolation for jobs > --- > > Key: FLINK-9662 > URL: https://issues.apache.org/jira/browse/FLINK-9662 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.6.0 >Reporter: Renjie Liu >Assignee: Renjie Liu >Priority: Major > Fix For: 1.6.0 > > Attachments: job isolation sequence.jpg > > > Disable task manager sharing for different jobs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections
[ https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16568832#comment-16568832 ] Elias Levy edited comment on FLINK-10052 at 8/3/18 9:46 PM: [~till.rohrmann] as I mentioned in FLINK-10011, it may not be necessary to replace the {{LeaderLatch}} Curator recipe to avoid losing leadership during temporary connection failures. The Curator error handling [documentation|https://curator.apache.org/errors.html] talks about a {{SessionConnectionStateErrorPolicy}} that treats {{SUSPENDED}} and {{LOST}} connection states differently. And this [test|https://github.com/apache/curator/blob/d502dde1c4601b2abc6d831d764561a73316bf00/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java#L72-L146] shows how leadership is not lost with a {{LeaderLatch}} and that policy. The [code|https://github.com/apache/curator/blob/ed3082ecfebc332ba96da7a5bda4508a1985db6e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java#L625-L631] implementing the policy. And [this shows|https://github.com/apache/curator/blob/5920c744508afd678a20309313e1b8d78baac0c4/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java#L298-L314] that Curator will inject a session expiration even while it is in {{SUSPENDED}} state, so that a disconnected client won't continue to think it is leader past its session expiration. So it is possible that all we need to do is call {{connectionStateErrorPolicy(new SessionConnectionStateErrorPolicy())}} in the {{CuratorFrameworkFactory}}. was (Author: elevy): [~till.rohrmann] as I mentioned in FLINK-10011, it may not be necessary to replace the {{LeaderLatch}} Curator recipe to avoid losing leadership during temporary connection failures. The Curator error handling [documentation|https://curator.apache.org/errors.html] talks about a {{SessionConnectionStateErrorPolicy}} that treats {{SUSPENDED }}and {{LOST}} connection states differently. And this [test|https://github.com/apache/curator/blob/d502dde1c4601b2abc6d831d764561a73316bf00/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java#L72-L146] shows how leadership is not lost with a {{LeaderLatch}} and that policy. The [code|https://github.com/apache/curator/blob/ed3082ecfebc332ba96da7a5bda4508a1985db6e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java#L625-L631] implementing the policy. And [this shows|https://github.com/apache/curator/blob/5920c744508afd678a20309313e1b8d78baac0c4/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java#L298-L314] that Curator will inject a session expiration even while it is in {{SUSPENDED }}state, so that a disconnected client won't continue to think it is leader past its session expiration. So it is possible that all we need to do is call {{connectionStateErrorPolicy(new SessionConnectionStateErrorPolicy())}} in the {{CuratorFrameworkFactory}}.
> Tolerate temporarily suspended ZooKeeper connections > > > Key: FLINK-10052 > URL: https://issues.apache.org/jira/browse/FLINK-10052 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.4.2, 1.5.2, 1.6.0 >Reporter: Till Rohrmann >Priority: Major > > This issue results from FLINK-10011 which uncovered a problem with Flink's HA > recovery and proposed the following solution to harden Flink: > The {{ZooKeeperLeaderElectionService}} uses the {{LeaderLatch}} Curator > recipe for leader election. The leader latch revokes leadership in case of a > suspended ZooKeeper connection. This can be premature in case that the system > can reconnect to ZooKeeper before its session expires. The effect of the lost > leadership is that all jobs will be canceled and directly restarted after > regaining the leadership. > Instead of directly revoking the leadership upon a SUSPENDED ZooKeeper > connection, it would be better to wait until the ZooKeeper connection is > LOST. That way we would allow the system to reconnect and not lose the > leadership. This could be achievable by using Curator's {{LeaderSelector}} > instead of the {{LeaderLatch}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
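A minimal sketch of the one-line change suggested in the comment above, assuming Curator 3.x+, where the builder exposes {{connectionStateErrorPolicy}}; the connect string and retry policy are placeholders:
{code:java}
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.state.SessionConnectionStateErrorPolicy
import org.apache.curator.retry.ExponentialBackoffRetry

// With this policy only LOST (session expiry) counts as an error, so a
// SUSPENDED connection no longer revokes leadership and a quick reconnect
// keeps the current leader in place.
val client = CuratorFrameworkFactory.builder()
  .connectString("zk-1:2181,zk-2:2181,zk-3:2181") // placeholder quorum
  .retryPolicy(new ExponentialBackoffRetry(1000, 3))
  .connectionStateErrorPolicy(new SessionConnectionStateErrorPolicy())
  .build()
client.start()
{code}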
[jira] [Commented] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections
[ https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16568832#comment-16568832 ] Elias Levy commented on FLINK-10052: [~till.rohrmann] as I mentioned in FLINK-10011, it may not be necessary to replace the {{LeaderLatch}} Curator recipe to avoid losing leadership during temporary connection failures. The Curator error handling [documentation|https://curator.apache.org/errors.html] describes a {{SessionConnectionStateErrorPolicy}} that treats the {{SUSPENDED}} and {{LOST}} connection states differently. This [test|https://github.com/apache/curator/blob/d502dde1c4601b2abc6d831d764561a73316bf00/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java#L72-L146] shows that leadership is not lost with a {{LeaderLatch}} and that policy. Here is the [code|https://github.com/apache/curator/blob/ed3082ecfebc332ba96da7a5bda4508a1985db6e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java#L625-L631] implementing the policy. And [this shows|https://github.com/apache/curator/blob/5920c744508afd678a20309313e1b8d78baac0c4/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java#L298-L314] that Curator will inject a session expiration even while it is in the {{SUSPENDED}} state, so that a disconnected client won't continue to think it is leader past its session expiration. So it is possible that all we need to do is call {{connectionStateErrorPolicy(new SessionConnectionStateErrorPolicy())}} in the {{CuratorFrameworkFactory}}. > Tolerate temporarily suspended ZooKeeper connections > > > Key: FLINK-10052 > URL: https://issues.apache.org/jira/browse/FLINK-10052 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.4.2, 1.5.2, 1.6.0 >Reporter: Till Rohrmann >Priority: Major > > This issue results from FLINK-10011 which uncovered a problem with Flink's HA > recovery and proposed the following solution to harden Flink: > The {{ZooKeeperLeaderElectionService}} uses the {{LeaderLatch}} Curator > recipe for leader election. The leader latch revokes leadership in case of a > suspended ZooKeeper connection. This can be premature in case that the system > can reconnect to ZooKeeper before its session expires. The effect of the lost > leadership is that all jobs will be canceled and directly restarted after > regaining the leadership. > Instead of directly revoking the leadership upon a SUSPENDED ZooKeeper > connection, it would be better to wait until the ZooKeeper connection is > LOST. That way we would allow the system to reconnect and not lose the > leadership. This could be achievable by using Curator's {{LeaderSelector}} > instead of the {{LeaderLatch}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
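For illustration, a minimal sketch of the configuration suggested in the comment above, written against the Curator API in Scala. It assumes a Curator version that provides {{SessionConnectionStateErrorPolicy}} (3.x or later); the connect string, retry policy, and session timeout are made-up values, not anything from this issue:

{code:scala}
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.state.SessionConnectionStateErrorPolicy
import org.apache.curator.retry.ExponentialBackoffRetry

object SuspendedTolerantClient {
  def main(args: Array[String]): Unit = {
    val client = CuratorFrameworkFactory.builder()
      .connectString("zk-1:2181,zk-2:2181,zk-3:2181") // assumed ensemble
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .sessionTimeoutMs(60000)
      // Only treat the connection as broken on LOST, not on SUSPENDED, so a
      // LeaderLatch built on this client keeps leadership across a temporary
      // disconnect that resolves before the session expires.
      .connectionStateErrorPolicy(new SessionConnectionStateErrorPolicy())
      .build()
    client.start()
  }
}
{code}

Note that Flink shades its ZooKeeper and Curator dependencies (see the shaded class names in the logs quoted below), so inside Flink the same call would go through the shaded packages rather than {{org.apache.curator}} directly.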
[jira] [Created] (FLINK-10037) Document details event time behavior in a single location
Elias Levy created FLINK-10037: -- Summary: Document details event time behavior in a single location Key: FLINK-10037 URL: https://issues.apache.org/jira/browse/FLINK-10037 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.5.2 Reporter: Elias Levy Assignee: Elias Levy A description of event time and watermarks, how they are generated, assigned, and handled, is spread across many pages in the documentation. It would be useful to have it all in a single place, and to include missing information, such as how Flink assigns timestamps to new records generated by operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10030) zookeeper jobgraphs job info cannot be removed when the job is cancelled with zk ha mode
[ https://issues.apache.org/jira/browse/FLINK-10030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566973#comment-16566973 ] Elias Levy commented on FLINK-10030: This is possibly related to FLINK-10011. > zookeeper jobgraphs job info cannot be removed when the job is cancelled with > zk ha mode > > > Key: FLINK-10030 > URL: https://issues.apache.org/jira/browse/FLINK-10030 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.5.0 >Reporter: qiang.li >Priority: Major > > Flink 1.5 with ZK HA mode: when a job is cancelled and you then restart the > cluster, the JobManager will fail because the blob data is missing. I find > that the information about the job in the ZK jobgraphs node cannot be removed > because the standby JobManager locks the node. I think the standby JobManager > should not watch the jobgraphs node. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16566032#comment-16566032 ] Elias Levy commented on FLINK-10011: [~azagrebin] I don't think they are the same issue. The issue I am observing is that the new JM leader after a failover can't delete a job graph in ZK when it is canceled, because the previous JM leader still has the job graph locked in ZK via the child ephemeral node. This is the state in ZK: [zk: localhost:2181(CONNECTED) 5] ls /flink/cluster_1/jobgraphs [d77948df92813a68ea6dfd6783f40e7e, 2a4eff355aef849c5ca37dbac04f2ff1] Job 2a4eff355aef849c5ca37dbac04f2ff1 was running before failover and we canceled it after failover. The job is no longer running, but it is still in ZK. In the logs we see that JM 1 (10.210.22.167), the one that became leader after failover, thinks it deleted the 2a4eff355aef849c5ca37dbac04f2ff1 job from ZK when it was canceled: July 30th 2018, 15:32:27.231 Trying to cancel job with ID 2a4eff355aef849c5ca37dbac04f2ff1. July 30th 2018, 15:32:27.232 Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state RESTARTING to CANCELED. July 30th 2018, 15:32:27.232 Stopping checkpoint coordinator for job 2a4eff355aef849c5ca37dbac04f2ff1 July 30th 2018, 15:32:27.239 Removed job graph 2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper. July 30th 2018, 15:32:27.245 Removing /flink/cluster_1/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper July 30th 2018, 15:32:27.251 Removing /checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper Looking at the ZK logs I find the problem: July 30th 2018, 15:32:27.241 Got user-level KeeperException when processing sessionid:0x201d2330001 type:delete cxid:0x434c zxid:0x60009dd94 txntype:-1 reqpath:n/a Error Path:/flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1 Error:KeeperErrorCode = Directory not empty for /flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1 Looking in ZK, we see: [zk: localhost:2181(CONNECTED) 0] ls /flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1 [d833418c-891a-4b5e-b983-080be803275c] From the comments in ZooKeeperStateHandleStore.java I gather that this child node is used as a deletion lock. Looking at the contents of this ephemeral lock node: [zk: localhost:2181(CONNECTED) 16] get /flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1/d833418c-891a-4b5e-b983-080be803275c 10.210.42.62 cZxid = 0x60002ffa7 ctime = Tue Jun 12 20:01:26 UTC 2018 mZxid = 0x60002ffa7 mtime = Tue Jun 12 20:01:26 UTC 2018 pZxid = 0x60002ffa7 cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x3003f4a0003 dataLength = 12 numChildren = 0 and compared to the ephemeral lock node of the currently running job: [zk: localhost:2181(CONNECTED) 17] get /flink/cluster_1/jobgraphs/d77948df92813a68ea6dfd6783f40e7e/596a4add-9f5c-4113-99ec-9c942fe91172 10.210.22.167 cZxid = 0x60009df4b ctime = Mon Jul 30 23:01:04 UTC 2018 mZxid = 0x60009df4b mtime = Mon Jul 30 23:01:04 UTC 2018 pZxid = 0x60009df4b cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x201d2330001 dataLength = 13 numChildren = 0 Assuming the content of the nodes represents the owner, it seems the job graph for the old canceled job, 2a4eff355aef849c5ca37dbac04f2ff1, is locked by the previous JM leader, JM 2 (10.210.42.62), while the running job is locked by the current JM leader, JM 1 (10.210.22.167). Somehow the previous leader, JM 2, did not give up the lock when leadership failed over to JM 1. 
Note that JM 2 never lost its ZK session, as it recovered it when it connected to another ZK node. So some code in the JobManager needed to explicitly release the lock on the job graph during failover and failed to do so. [~till.rohrmann] and [~uce] I think you wrote the ZK HA code. Any thoughts? > Old job resurrected during HA failover > -- > > Key: FLINK-10011 > URL: https://issues.apache.org/jira/browse/FLINK-10011 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.4.2 >Reporter: Elias Levy >Priority: Blocker > > For the second time we've observed Flink resurrect an old job during > JobManager high-availability fail over. > h4. Configuration > * AWS environment > * Flink 1.4.2 standalone cluster in HA mode > * 2 JMs, 3 TMs > * 3 node ZK ensemble > * 1 job consuming to/from Kafka > * Checkpoints in S3 using the Presto file system adaptor > h4. Timeline > * 15:18:10 JM 2 completes checkpoint 69256. > * 15:19:10 JM 2 completes checkpoint 69257. > * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a > SocketTimeoutException > * 15:19:57 ZK 1 closes connection to JM 2 (leader) > * 15:19:57 ZK 2 (leader) reports a network erro
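To make the lock layout above easier to audit, here is a hypothetical helper that walks the job graph znodes and prints each ephemeral lock child together with its contents, which, per the CLI session above, appear to record the owner's address. The connect string and the HA root path are assumptions taken from that session:

{code:scala}
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import scala.collection.JavaConverters._

object JobGraphLockInspector {
  def main(args: Array[String]): Unit = {
    val client = CuratorFrameworkFactory.newClient(
      "localhost:2181", new ExponentialBackoffRetry(1000, 3))
    client.start()
    val root = "/flink/cluster_1/jobgraphs" // assumed HA root from the session above
    // For each job graph znode, list the ephemeral lock children and who holds them.
    for (job <- client.getChildren.forPath(root).asScala) {
      println(s"job graph: $job")
      for (lock <- client.getChildren.forPath(s"$root/$job").asScala) {
        val owner = new String(client.getData.forPath(s"$root/$job/$lock"))
        println(s"  lock $lock held by $owner")
      }
    }
    client.close()
  }
}
{code}

Run against the state described above, this would show the canceled job's lock still held by the old leader's address, matching the {{Directory not empty}} error in the ZK logs.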
[jira] [Updated] (FLINK-9575) Potential race condition when removing JobGraph in HA
[ https://issues.apache.org/jira/browse/FLINK-9575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elias Levy updated FLINK-9575: -- Description: When we are removing the _JobGraph_ from _JobManager_ for example after invoking _cancel()_, the following code is executed : {noformat} val futureOption = currentJobs.get(jobID) match { case Some((eg, _)) => val result = if (removeJobFromStateBackend) { val futureOption = Some(future { try { // ...otherwise, we can have lingering resources when there is a concurrent shutdown // and the ZooKeeper client is closed. Not removing the job immediately allow the // shutdown to release all resources. submittedJobGraphs.removeJobGraph(jobID) } catch { case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t) } }(context.dispatcher)) try { archive ! decorateMessage( ArchiveExecutionGraph( jobID, ArchivedExecutionGraph.createFrom(eg))) } catch { case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t) } futureOption } else { None } currentJobs.remove(jobID) result case None => None } // remove all job-related BLOBs from local and HA store libraryCacheManager.unregisterJob(jobID) blobServer.cleanupJob(jobID, removeJobFromStateBackend) jobManagerMetricGroup.removeJob(jobID) futureOption }{noformat} This causes the asynchronous removal of the job and synchronous removal of blob files connected with this jar. This means as far as I understand that there is a potential problem that we can fail to remove job graph from _submittedJobGraphs._ If the JobManager fails and we elect the new leader it can try to recover such job, but it will fail with an exception since the assigned blob was already removed. was: When we are removing the _JobGraph_ from _JobManager_ for example after invoking _cancel()_, the following code is executed : {noformat} val futureOption = currentJobs.get(jobID) match { case Some((eg, _)) => val result = if (removeJobFromStateBackend) { val futureOption = Some(future { try { // ...otherwise, we can have lingering resources when there is a concurrent shutdown // and the ZooKeeper client is closed. Not removing the job immediately allow the // shutdown to release all resources. submittedJobGraphs.removeJobGraph(jobID) } catch { case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t) } }(context.dispatcher)) try { archive ! decorateMessage( ArchiveExecutionGraph( jobID, ArchivedExecutionGraph.createFrom(eg))) } catch { case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t) } futureOption } else { None } currentJobs.remove(jobID) result case None => None } // remove all job-related BLOBs from local and HA store libraryCacheManager.unregisterJob(jobID) blobServer.cleanupJob(jobID, removeJobFromStateBackend) jobManagerMetricGroup.removeJob(jobID) futureOption } val futureOption = currentJobs.get(jobID) match { case Some((eg, _)) => val result = if (removeJobFromStateBackend) { val futureOption = Some(future { try { // ...otherwise, we can have lingering resources when there is a concurrent shutdown // and the ZooKeeper client is closed. Not removing the job immediately allow the // shutdown to release all resources. submittedJobGraphs.removeJobGraph(jobID) } catch { case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t) } }(context.dispatcher)) try { archive ! 
decorateMessage( ArchiveExecutionGraph( jobID, ArchivedExecutionGraph.createFrom(eg))) } catch { case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t) } futureOption } else { None } currentJobs.remove(jobID) result case None => None } // remove all job-related BLOBs from local and HA store libraryCacheManager.unregisterJob(jobID) blobServer.cleanupJob(jobID, removeJobFromStateBackend) jobManagerMetricGroup.removeJob(jobID) futureOption }{noformat} This causes the asynchronous removal of the job and synchronous removal of blob files connected with this jar. This means as far as I understand that there is a potential problem that we can fail to remove job graph from _submittedJobGraphs._ If the JobManager fails and we elect the new leader it can try to recover such job, but it will fail with an exception since the assigned blob was already removed. > Potential race condition when removing JobGraph in HA > - > > Key: FLINK-9575 > URL: https://issues.apache.org/jira/browse/FLINK-9575 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0 >Reporter: Dominik Wosiński >Assignee: Dominik Wosiński >Priority: Critical > Labels: pull-request-available > Fix For: 1.5.2, 1.6.0 > > > When we are removing the _JobGraph_ from _JobManager_ for example after > invoking _cancel()_, the follow
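To make the hazard described above concrete, here is a minimal sketch, with stubbed types and not the actual Flink fix, of one way to order the two removals so the BLOB cleanup cannot race ahead of the job graph removal:

{code:scala}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Stubbed stand-ins for the services in the snippet above (hypothetical).
trait SubmittedJobGraphStore { def removeJobGraph(jobID: String): Unit }
trait BlobService { def cleanupJob(jobID: String, cleanupHA: Boolean): Unit }

class JobCleanup(store: SubmittedJobGraphStore, blobs: BlobService)
                (implicit ec: ExecutionContext) {
  // Remove the job graph from ZooKeeper asynchronously, but delay the BLOB
  // cleanup until that removal has completed, so a failed removal never
  // leaves a recoverable job graph pointing at already-deleted BLOBs.
  def removeJob(jobID: String): Unit = {
    val removal = Future(store.removeJobGraph(jobID))
    Await.result(removal, 30.seconds) // surface failures before cleaning up
    blobs.cleanupJob(jobID, cleanupHA = true)
  }
}
{code}

Blocking like this trades the concurrent-shutdown concern mentioned in the code comment above for the guarantee that a recovered job always finds its BLOBs; other orderings are possible.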
[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16564573#comment-16564573 ] Elias Levy commented on FLINK-10011: [~trohrm...@apache.org] what do you think? > Old job resurrected during HA failover > -- > > Key: FLINK-10011 > URL: https://issues.apache.org/jira/browse/FLINK-10011 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.4.2 >Reporter: Elias Levy >Priority: Blocker > > For the second time we've observed Flink resurrect an old job during > JobManager high-availability fail over. > h4. Configuration > * AWS environment > * Flink 1.4.2 standalone cluster in HA mode > * 2 JMs, 3 TMs > * 3 node ZK ensemble > * 1 job consuming to/from Kafka > * Checkpoints in S3 using the Presto file system adaptor > h4. Timeline > * 15:18:10 JM 2 completes checkpoint 69256. > * 15:19:10 JM 2 completes checkpoint 69257. > * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a > SocketTimeoutException > * 15:19:57 ZK 1 closes connection to JM 2 (leader) > * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK > 1 > * 15:19:57 JM 2 reports it can't read data from ZK > ** {{Unable to read additional data from server sessionid 0x3003f4a0003, > likely server has closed socket, closing socket connection and attempting > reconnect)}} > ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}} > * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED > ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader > from ZooKeeper.}} > ** {{ZooKeeper connection SUSPENDED. }}{{Changes to the submitted job graphs > are not monitored (temporarily).}} > ** {{Connection to ZooKeeper suspended. The contender > akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in > the leader election}}{{ }} > ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader > from ZooKeeper.}} > * 15:19:57 JM 2 gives up leadership > ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked > leadership.}} > * 15:19:57 JM 2 changes job > {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED > ** {{Stopping checkpoint coordinator for job > {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}} > * 15:19:57 TMs start disassociating with JM 2, but JM 2 discards the messages > because there is no leader > ** {{Discard message > LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception: > TaskManager akka://flink/user/taskmanager is disassociating)) because there > is currently no valid leader id known.}} > * 15:19:57 JM 2 connects to ZK 2 and renews its session > ** {{Opening socket connection to server > ip-10-210-43-221.ec2.internal/10.210.43.221:2181}} > ** {{Socket connection established to > ip-10-210-43-221.ec2.internal/10.210.43.221:2181, initiating session}} > ** {{Connection to ZooKeeper was reconnected. Leader retrieval can be > restarted.}} > ** {{Session establishment complete on server > ip-10-210-43-221.ec2.internal/10.210.43.221:2181, sessionid = > 0x3003f4a0003, negotiated timeout = 4}} > ** {{Connection to ZooKeeper was reconnected. Leader election can be > restarted.}} > ** {{ZooKeeper connection RECONNECTED. 
Changes to the submitted job graphs > are monitored again.}} > ** {{State change: RECONNECTED}} > * 15:19:57: JM 1 reports JM 1 has been granted leadership: > ** {{JobManager akka.tcp://flink@flink-jm-1:6123/user/jobmanager was granted > leadership with leader session ID > Some(ae0a1a17-eccc-40b4-985d-93bc59f5b936).}} > * 15:19:57 JM 2 reports the job has been suspended > ** {{org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter > Shutting down.}} > ** {{Job 2a4eff355aef849c5ca37dbac04f2ff1 has been suspended.}} > * 15:19:57 JM 2 reports it has lost leadership: > ** {{Associated JobManager > Actor[akka://flink/user/jobmanager#33755521|#33755521] lost leader status}} > ** {{Received leader address but not running in leader ActorSystem. > Cancelling registration.}} > * 15:19:57 TMs register with JM 1 > * 15:20:07 JM 1 attempts to recover jobs and finds there are two jobs: > ** {{Attempting to recover all jobs.}} > ** {{There are 2 jobs to recover. Starting the job recovery.}} > ** {{Attempting to recover job > {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.}} > ** {{Attempting to recover job > {color:#d04437}61bca496065cd05e4263070a5e923a05{color}.}} > * 15:20:08 – 15:32:27 ZK 2 reports a large number of errors of the form: > ** {{Got user-level KeeperException when processing > sessionid:0x201d2330001 type:create cxid:0x4211 zxid:0
[jira] [Comment Edited] (FLINK-10011) Old job resurrected during HA failover
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16564569#comment-16564569 ] Elias Levy edited comment on FLINK-10011 at 8/1/18 12:48 AM: - It appears that it may not be necessary to replace the {{LeaderLatch}} Curator recipe to avoid losing leadership during temporary connection failures. The Curator error handling [documentation|https://curator.apache.org/errors.html] describes a {{SessionConnectionStateErrorPolicy}} that treats the {{SUSPENDED}} and {{LOST}} connection states differently. This [test|https://github.com/apache/curator/blob/d502dde1c4601b2abc6d831d764561a73316bf00/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java#L72-L146] shows that leadership is not lost with a {{LeaderLatch}} and that policy. Here is the [code|https://github.com/apache/curator/blob/ed3082ecfebc332ba96da7a5bda4508a1985db6e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java#L625-L631] implementing the policy. And [this shows|https://github.com/apache/curator/blob/5920c744508afd678a20309313e1b8d78baac0c4/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java#L298-L314] that Curator will inject a session expiration even while it is in the {{SUSPENDED}} state, so that a disconnected client won't continue to think it is leader past its session expiration. was (Author: elevy): It appears that it may not be necessary to replace the {{LeaderLatch}} Curator recipe to avoid losing leadership during temporary connection failures. The Curator error handling [documentation|https://curator.apache.org/errors.html] describes a {{SessionConnectionStateErrorPolicy}} that treats the {{SUSPENDED}} and {{LOST}} connection states differently. This [test|https://github.com/apache/curator/blob/d502dde1c4601b2abc6d831d764561a73316bf00/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java#L72-L146] shows that leadership is not lost with a {{LeaderLatch}} and that policy. Here is the [code|https://github.com/apache/curator/blob/ed3082ecfebc332ba96da7a5bda4508a1985db6e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java#L625-L631] implementing the policy. And [this shows|https://github.com/apache/curator/blob/5920c744508afd678a20309313e1b8d78baac0c4/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java#L298-L314] that Curator will inject a session expiration even while it is in the {{SUSPENDED}} state, so that a disconnected client won't continue to think it is leader past its session expiration. > Old job resurrected during HA failover > -- > > Key: FLINK-10011 > URL: https://issues.apache.org/jira/browse/FLINK-10011 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.4.2 >Reporter: Elias Levy >Priority: Blocker > > For the second time we've observed Flink resurrect an old job during > JobManager high-availability fail over. > h4. Configuration > * AWS environment > * Flink 1.4.2 standalone cluster in HA mode > * 2 JMs, 3 TMs > * 3 node ZK ensemble > * 1 job consuming to/from Kafka > * Checkpoints in S3 using the Presto file system adaptor > h4. Timeline > * 15:18:10 JM 2 completes checkpoint 69256. > * 15:19:10 JM 2 completes checkpoint 69257. 
> * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a > SocketTimeoutException > * 15:19:57 ZK 1 closes connection to JM 2 (leader) > * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK > 1 > * 15:19:57 JM 2 reports it can't read data from ZK > ** {{Unable to read additional data from server sessionid 0x3003f4a0003, > likely server has closed socket, closing socket connection and attempting > reconnect)}} > ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}} > * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED > ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader > from ZooKeeper.}} > ** {{ZooKeeper connection SUSPENDED. }}{{Changes to the submitted job graphs > are not monitored (temporarily).}} > ** {{Connection to ZooKeeper suspended. The contender > akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in > the leader election}}{{ }} > ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader > from ZooKeeper.}} > * 15:19:57 JM 2 gives up leadership > ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked > leadership.}} > * 15:19:57 JM 2 changes job > {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED > ** {{Stopping chec
[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover
[ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16564569#comment-16564569 ] Elias Levy commented on FLINK-10011: It appears that it may not be necessary to replace the {{LeaderLatch}} Curator recipe to avoid losing leadership during temporary connection failures. The Curator error handling [documentation|https://curator.apache.org/errors.html] describes a {{SessionConnectionStateErrorPolicy}} that treats the {{SUSPENDED}} and {{LOST}} connection states differently. This [test|https://github.com/apache/curator/blob/d502dde1c4601b2abc6d831d764561a73316bf00/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java#L72-L146] shows that leadership is not lost with a {{LeaderLatch}} and that policy. Here is the [code|https://github.com/apache/curator/blob/ed3082ecfebc332ba96da7a5bda4508a1985db6e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java#L625-L631] implementing the policy. And [this shows|https://github.com/apache/curator/blob/5920c744508afd678a20309313e1b8d78baac0c4/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java#L298-L314] that Curator will inject a session expiration even while it is in the {{SUSPENDED}} state, so that a disconnected client won't continue to think it is leader past its session expiration. > Old job resurrected during HA failover > -- > > Key: FLINK-10011 > URL: https://issues.apache.org/jira/browse/FLINK-10011 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.4.2 >Reporter: Elias Levy >Priority: Blocker > > For the second time we've observed Flink resurrect an old job during > JobManager high-availability fail over. > h4. Configuration > * AWS environment > * Flink 1.4.2 standalone cluster in HA mode > * 2 JMs, 3 TMs > * 3 node ZK ensemble > * 1 job consuming to/from Kafka > * Checkpoints in S3 using the Presto file system adaptor > h4. Timeline > * 15:18:10 JM 2 completes checkpoint 69256. > * 15:19:10 JM 2 completes checkpoint 69257. > * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a > SocketTimeoutException > * 15:19:57 ZK 1 closes connection to JM 2 (leader) > * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK > 1 > * 15:19:57 JM 2 reports it can't read data from ZK > ** {{Unable to read additional data from server sessionid 0x3003f4a0003, > likely server has closed socket, closing socket connection and attempting > reconnect)}} > ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}} > * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED > ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader > from ZooKeeper.}} > ** {{ZooKeeper connection SUSPENDED. }}{{Changes to the submitted job graphs > are not monitored (temporarily).}} > ** {{Connection to ZooKeeper suspended. The contender > akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in > the leader election}}{{ }} > ** {{Connection to ZooKeeper suspended. 
Can no longer retrieve the leader > from ZooKeeper.}} > * 15:19:57 JM 2 gives up leadership > ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked > leadership.}} > * 15:19:57 JM 2 changes job > {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED > ** {{Stopping checkpoint coordinator for job > {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}} > * 15:19:57 TMs start disassociating with JM 2, but JM 2 discards the messages > because there is no leader > ** {{Discard message > LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception: > TaskManager akka://flink/user/taskmanager is disassociating)) because there > is currently no valid leader id known.}} > * 15:19:57 JM 2 connects to ZK 2 and renews its session > ** {{Opening socket connection to server > ip-10-210-43-221.ec2.internal/10.210.43.221:2181}} > ** {{Socket connection established to > ip-10-210-43-221.ec2.internal/10.210.43.221:2181, initiating session}} > ** {{Connection to ZooKeeper was reconnected. Leader retrieval can be > restarted.}} > ** {{Session establishment complete on server > ip-10-210-43-221.ec2.internal/10.210.43.221:2181, sessionid = > 0x3003f4a0003, negotiated timeout = 4}} > ** {{Connection to ZooKeeper was reconnected. Leader election can be > restarted.}} > ** {{ZooKeeper connection RECONNECTED. Changes to the submitted job graphs > are monitored again.}} > ** {{State change: RECONNECTED}} > * 15:19:57: JM 1 reports JM 1 has been granted leadership: > ** {{JobManager akka.tcp://flin
[jira] [Created] (FLINK-10011) Old job resurrected during HA failover
Elias Levy created FLINK-10011: -- Summary: Old job resurrected during HA failover Key: FLINK-10011 URL: https://issues.apache.org/jira/browse/FLINK-10011 Project: Flink Issue Type: Bug Components: JobManager Affects Versions: 1.4.2 Reporter: Elias Levy For the second time we've observed Flink resurrect an old job during JobManager high-availability fail over. h4. Configuration * AWS environment * Flink 1.4.2 standalone cluster in HA mode * 2 JMs, 3 TMs * 3 node ZK ensemble * 1 job consuming to/from Kafka * Checkpoints in S3 using the Presto file system adaptor h4. Timeline * 15:18:10 JM 2 completes checkpoint 69256. * 15:19:10 JM 2 completes checkpoint 69257. * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a SocketTimeoutException * 15:19:57 ZK 1 closes connection to JM 2 (leader) * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK 1 * 15:19:57 JM 2 reports it can't read data from ZK ** {{Unable to read additional data from server sessionid 0x3003f4a0003, likely server has closed socket, closing socket connection and attempting reconnect)}} ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}} * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.}} ** {{ZooKeeper connection SUSPENDED. }}{{Changes to the submitted job graphs are not monitored (temporarily).}} ** {{Connection to ZooKeeper suspended. The contender akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in the leader election}}{{ }} ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.}} * 15:19:57 JM 2 gives up leadership ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked leadership.}} * 15:19:57 JM 2 changes job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED ** {{Stopping checkpoint coordinator for job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}} * 15:19:57 TMs start disassociating with JM 2, but JM 2 discards the messages because there is no leader ** {{Discard message LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception: TaskManager akka://flink/user/taskmanager is disassociating)) because there is currently no valid leader id known.}} * 15:19:57 JM 2 connects to ZK 2 and renews its session ** {{Opening socket connection to server ip-10-210-43-221.ec2.internal/10.210.43.221:2181}} ** {{Socket connection established to ip-10-210-43-221.ec2.internal/10.210.43.221:2181, initiating session}} ** {{Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.}} ** {{Session establishment complete on server ip-10-210-43-221.ec2.internal/10.210.43.221:2181, sessionid = 0x3003f4a0003, negotiated timeout = 4}} ** {{Connection to ZooKeeper was reconnected. Leader election can be restarted.}} ** {{ZooKeeper connection RECONNECTED. 
Changes to the submitted job graphs are monitored again.}} ** {{State change: RECONNECTED}} * 15:19:57: JM 1 reports JM 1 has been granted leadership: ** {{JobManager akka.tcp://flink@flink-jm-1:6123/user/jobmanager was granted leadership with leader session ID Some(ae0a1a17-eccc-40b4-985d-93bc59f5b936).}} * 15:19:57 JM 2 reports the job has been suspended ** {{org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter Shutting down.}} ** {{Job 2a4eff355aef849c5ca37dbac04f2ff1 has been suspended.}} * 15:19:57 JM 2 reports it has lost leadership: ** {{Associated JobManager Actor[akka://flink/user/jobmanager#33755521|#33755521] lost leader status}} ** {{Received leader address but not running in leader ActorSystem. Cancelling registration.}} * 15:19:57 TMs register with JM 1 * 15:20:07 JM 1 attempts to recover jobs and finds there are two jobs: ** {{Attempting to recover all jobs.}} ** {{There are 2 jobs to recover. Starting the job recovery.}} ** {{Attempting to recover job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.}} ** {{Attempting to recover job {color:#d04437}61bca496065cd05e4263070a5e923a05{color}.}} * 15:20:08 – 15:32:27 ZK 2 reports a large number of errors of the form: ** {{Got user-level KeeperException when processing sessionid:0x201d2330001 type:create cxid:0x4211 zxid:0x60009dc70 txntype:-1 reqpath:n/a Error Path:/flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1 Error:KeeperErrorCode = NodeExists for /flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1}} ** {{Got user-level KeeperException when processing sessionid:0x201d2330001 type:create cxid:0x4230 zxid:0x60009dc78 txntype:-1 reqpath:n/a Error Path:/flink/cluster_a/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1/0
[jira] [Commented] (FLINK-6243) Continuous Joins: True Sliding Window Joins
[ https://issues.apache.org/jira/browse/FLINK-6243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16561216#comment-16561216 ] Elias Levy commented on FLINK-6243: --- Rereading my initial description of the issue, I see that I made no mention of our specific upsert requirements, so I think you are right that FLINK-8478 does satisfy this issue as described, and that it may be best if I open a new issue for the upsert and (a)/(a,b) join semantics I'd like. > Continuous Joins: True Sliding Window Joins > > > Key: FLINK-6243 > URL: https://issues.apache.org/jira/browse/FLINK-6243 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Affects Versions: 1.1.4 >Reporter: Elias Levy >Priority: Major > > Flink defines sliding window joins as the join of elements of two streams > that share a window of time, where the windows are defined by advancing them > forward some amount of time that is less than the window time span. More > generally, such windows are just overlapping hopping windows. > Other systems, such as Kafka Streams, support a different notion of sliding > window joins. In these systems, two elements of a stream are joined if the > absolute time difference between them is less than or equal to the time window > length. > This alternate notion of sliding window joins has some advantages in some > applications over the current implementation. > Elements to be joined may both fall within multiple overlapping sliding > windows, leading them to be joined multiple times, when we only wish them to > be joined once. > The implementation need not instantiate window objects to keep track of > stream elements, which becomes problematic in the current implementation if > the window size is very large and the slide is very small. > It allows for asymmetric time joins. E.g. join if elements from stream A are > no more than X time behind and Y time ahead of an element from stream B. > It is currently possible to implement a join with these semantics using > {{CoProcessFunction}}, but the capability should be a first class feature, > such as it is in Kafka Streams. > To perform the join, elements of each stream must be buffered for at least > the window time length. To allow for large window sizes and high volume of > elements, the state, possibly optionally, should be buffered such that it can > spill to disk (e.g. by using RocksDB). > The same stream may be joined multiple times in a complex topology. As an > optimization, it may be wise to reuse any element buffer among colocated join > operators. Otherwise, there may be write amplification and increased state that > must be snapshotted. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6243) Continuous Joins: True Sliding Window Joins
[ https://issues.apache.org/jira/browse/FLINK-6243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16561215#comment-16561215 ] Elias Levy commented on FLINK-6243: --- Stephan, thanks for bringing FLINK-8478 to my attention. Alas, while it gets closer to meeting our join requirements, it does not quite fulfill them. Our joins require the semantics of joining two upsert tables, i.e. only joining the latest value by key. The DataStream Interval Join being implemented does not support those semantics, as it will buffer and join all elements for a key that fall within the interval. It seems the upsert semantics could be implemented by changing the state from a {{MapState}} buffering multiple events per key to a {{ValueState}} keeping only the latest event by event time. We also need these joins to be outer joins, but I see that there is already a subtask to implement those (FLINK-8483). Finally, we also need to implement a join between two streams where one stream is keyed by a subset of the other stream's composite key (e.g. the left stream is keyed by {{col1}} and the right stream by ({{col1}}, {{col2}})), also with upsert semantics. This could be implemented by keying both streams by {{col1}}, keeping a {{ValueState}} for the left stream buffering the latest event, and using a {{MapState}} on the right stream keyed by {{col2}} buffering the latest event per ({{col1}}, {{col2}}) tuple. Maybe something like: {code:scala}
leftStream
  .keyBy(_.col1)
  .upsertJoin(rightStream.keyBy(_.col1).subKey(_.col2))
  .between(...)
  .process(...)
{code} Looking at the implementation, I also worry that the cleanup timers are not being coalesced, which may result in high overhead processing the cleanup timers for high throughput streams. > Continuous Joins: True Sliding Window Joins > > > Key: FLINK-6243 > URL: https://issues.apache.org/jira/browse/FLINK-6243 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Affects Versions: 1.1.4 >Reporter: Elias Levy >Priority: Major > > Flink defines sliding window joins as the join of elements of two streams > that share a window of time, where the windows are defined by advancing them > forward some amount of time that is less than the window time span. More > generally, such windows are just overlapping hopping windows. > Other systems, such as Kafka Streams, support a different notion of sliding > window joins. In these systems, two elements of a stream are joined if the > absolute time difference between them is less than or equal to the time window > length. > This alternate notion of sliding window joins has some advantages in some > applications over the current implementation. > Elements to be joined may both fall within multiple overlapping sliding > windows, leading them to be joined multiple times, when we only wish them to > be joined once. > The implementation need not instantiate window objects to keep track of > stream elements, which becomes problematic in the current implementation if > the window size is very large and the slide is very small. > It allows for asymmetric time joins. E.g. join if elements from stream A are > no more than X time behind and Y time ahead of an element from stream B. > It is currently possible to implement a join with these semantics using > {{CoProcessFunction}}, but the capability should be a first class feature, > such as it is in Kafka Streams. > To perform the join, elements of each stream must be buffered for at least > the window time length. 
To allow for large window sizes and high volume of > elements, the state, possibly optionally, should be buffered such that it can > spill to disk (e.g. by using RocksDB). > The same stream may be joined multiple times in a complex topology. As an > optimization, it may be wise to reuse any element buffer among colocated join > operators. Otherwise, there may be write amplification and increased state that > must be snapshotted. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
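A rough sketch of the upsert semantics proposed in the comment above, using a {{CoProcessFunction}} with {{ValueState}} so only the latest element per key is retained and joined. All type and field names are made up, and outer-join emission and state cleanup timers are omitted for brevity:

{code:scala}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

case class LeftRecord(col1: String, ts: Long, payload: String)   // hypothetical
case class RightRecord(col1: String, ts: Long, payload: String)  // hypothetical

class UpsertJoinFunction
    extends CoProcessFunction[LeftRecord, RightRecord, (LeftRecord, RightRecord)] {
  private var latestLeft: ValueState[LeftRecord] = _
  private var latestRight: ValueState[RightRecord] = _

  override def open(parameters: Configuration): Unit = {
    latestLeft = getRuntimeContext.getState(
      new ValueStateDescriptor[LeftRecord]("latest-left", classOf[LeftRecord]))
    latestRight = getRuntimeContext.getState(
      new ValueStateDescriptor[RightRecord]("latest-right", classOf[RightRecord]))
  }

  override def processElement1(
      l: LeftRecord,
      ctx: CoProcessFunction[LeftRecord, RightRecord, (LeftRecord, RightRecord)]#Context,
      out: Collector[(LeftRecord, RightRecord)]): Unit = {
    // Upsert: replace rather than buffer, so only the latest value joins.
    if (latestLeft.value == null || l.ts >= latestLeft.value.ts) latestLeft.update(l)
    val r = latestRight.value
    if (r != null) out.collect((latestLeft.value, r))
  }

  override def processElement2(
      r: RightRecord,
      ctx: CoProcessFunction[LeftRecord, RightRecord, (LeftRecord, RightRecord)]#Context,
      out: Collector[(LeftRecord, RightRecord)]): Unit = {
    if (latestRight.value == null || r.ts >= latestRight.value.ts) latestRight.update(r)
    val l = latestLeft.value
    if (l != null) out.collect((l, latestRight.value))
  }
}
{code}

It would be wired up as {{leftStream.keyBy(_.col1).connect(rightStream.keyBy(_.col1)).process(new UpsertJoinFunction)}}; the (a)/(a,b) variant described above would swap the right side's {{ValueState}} for a {{MapState}} keyed by {{col2}}.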
[jira] [Commented] (FLINK-9953) Active Kubernetes integration
[ https://issues.apache.org/jira/browse/FLINK-9953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555933#comment-16555933 ] Elias Levy commented on FLINK-9953: --- This is a duplicate of FLINK-9495. > Active Kubernetes integration > - > > Key: FLINK-9953 > URL: https://issues.apache.org/jira/browse/FLINK-9953 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination, ResourceManager >Reporter: Till Rohrmann >Priority: Major > Fix For: 1.7.0 > > > This is the umbrella issue tracking Flink's active Kubernetes integration. > Active means in this context that the {{ResourceManager}} can talk to > Kubernetes to launch new pods similar to Flink's Yarn and Mesos integration. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6239) Sharing of State Across Operators
[ https://issues.apache.org/jira/browse/FLINK-6239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533142#comment-16533142 ] Elias Levy commented on FLINK-6239: --- For our particular use case, having a single operator that is the owner and can write the state, and others that can read it, is sufficient. The idea was essentially to implement a materialized table join (similar to a Kafka Streams KTable). We need to join against the table multiple times, but the table (state) would be of significant size, and duplicating it is something we would rather not do. That said, I think we may be able to get around this issue by creating a generic wrapping container, unioning all the streams, and feeding them to a single operator that performs all the work, instead of having multiple operators and multiple streams. Not as clean, but probably workable. > Sharing of State Across Operators > - > > Key: FLINK-6239 > URL: https://issues.apache.org/jira/browse/FLINK-6239 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Affects Versions: 1.1.4 >Reporter: Elias Levy >Priority: Major > > Currently state cannot be shared across operators. On a keyed stream, the > state is implicitly keyed by the operator id, in addition to the stream key. > This can make it more difficult and inefficient to implement complex > topologies, where multiple operators may need to access the same state. It > would be valuable to be able to access keyed value and map state across > operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
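A rough sketch of the workaround described in the comment above, with made-up names: wrap both streams in a common envelope, union them, and key the result so that a single operator owns the shared table state:

{code:scala}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

sealed trait Envelope { def key: String }                        // hypothetical wrapper
case class TableUpdate(key: String, value: String) extends Envelope
case class Lookup(key: String, request: String) extends Envelope

class SharedTableFunction extends ProcessFunction[Envelope, String] {
  // Keyed state owned by this one operator; both logical streams reach it.
  private var table: ValueState[String] = _

  override def open(parameters: Configuration): Unit =
    table = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("table", classOf[String]))

  override def processElement(
      e: Envelope,
      ctx: ProcessFunction[Envelope, String]#Context,
      out: Collector[String]): Unit = e match {
    case TableUpdate(_, v) => table.update(v)  // the writing "owner" path
    case Lookup(_, req) =>                     // the reading path
      Option(table.value).foreach(v => out.collect(s"$req -> $v"))
  }
}
{code}

With {{updates.map(u => u: Envelope).union(queries.map(q => q: Envelope)).keyBy(_.key).process(new SharedTableFunction)}} the table is stored once, rather than once per consuming operator.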
[jira] [Closed] (FLINK-9731) Kafka source subtask begins to consume from earliest offset
[ https://issues.apache.org/jira/browse/FLINK-9731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elias Levy closed FLINK-9731. - Resolution: Invalid > Kafka source subtask begins to consume from earliest offset > --- > > Key: FLINK-9731 > URL: https://issues.apache.org/jira/browse/FLINK-9731 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.2 >Reporter: Elias Levy >Priority: Critical > > On Jun 30th 2018, at 9:35 am UTC, the Kafka source in subtask 7 in a Flink > job instance began consuming records from the earliest offsets available in > Kafka for the partitions assigned to it. Other subtasks did not exhibit this > behavior and continued operating normally. > Prior to the event the job exhibited no Kafka lag. The job showed no > failed checkpoints and the job did not restore or restart. Flink logs only > showed the following message: > {noformat} > June 30th 2018, 02:35:01.711 Fetch offset 2340400514 is out of range for > partition topic-124, resetting offset > {noformat} > The job is configured with checkpoints at 1 minute intervals. The Kafka > connector consumer is configured to start from group offsets if it is not > started from a savepoint via `setStartFromGroupOffsets()`, and the Kafka > consumer is configured to fall back to the earliest offsets if no group > offsets are committed by setting `auto.offset.reset` to `earliest` in the > Kafka consumer config. > Right before the event a Kafka broker (kafka-broker-b5-int) lost leadership > of its partitions for around 30 seconds as a result of losing its connection > to ZooKeeper. > > {noformat} > [2018-06-30 09:34:54,799] INFO Unable to read additional data from server > sessionid 0x161305b7bd81a09, likely server has closed socket, closing socket > connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:54,899] INFO zookeeper state changed (Disconnected) > (org.I0Itec.zkclient.ZkClient) > [2018-06-30 09:34:55,384] ERROR [ReplicaFetcherThread-3-1002]: Error for > partition [cloud_ioc_events,32] to broker > 1002:org.apache.kafka.common.errors.NotLeaderForPartitionException: This > server is not the leader for that topic-partition. 
> (kafka.server.ReplicaFetcherThread) > {noformat} > The broker reconnected to ZK after a few tries: > {noformat} > [2018-06-30 09:34:55,462] INFO Opening socket connection to server > 10.210.48.187/10.210.48.187:2181 (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,462] INFO zookeeper state changed (AuthFailed) > (org.I0Itec.zkclient.ZkClient) > [2018-06-30 09:34:55,463] INFO Socket connection established to > 10.210.48.187/10.210.48.187:2181, initiating session > (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,464] WARN Unable to reconnect to ZooKeeper service, > session 0x161305b7bd81a09 has expired (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,465] INFO zookeeper state changed (Expired) > (org.I0Itec.zkclient.ZkClient) > [2018-06-30 09:34:55,465] INFO Initiating client connection, > connectString=10.210.48.187:2181,10.210.43.200:2181,10.210.16.102:2181/kafka > sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5c33f1a9 > (org.apache.zookeeper.ZooKeeper) > [2018-06-30 09:34:55,465] INFO Unable to reconnect to ZooKeeper service, > session 0x161305b7bd81a09 has expired, closing socket connection > (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,466] INFO EventThread shut down for session: > 0x161305b7bd81a09 (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,467] INFO zookeeper state changed (AuthFailed) > (org.I0Itec.zkclient.ZkClient) > [2018-06-30 09:34:55,468] INFO Opening socket connection to server > 10.210.43.200/10.210.43.200:2181 (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,468] INFO Socket connection established to > 10.210.43.200/10.210.43.200:2181, initiating session > (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,471] INFO Session establishment complete on server > 10.210.43.200/10.210.43.200:2181, sessionid = 0x163934fa09d1baa, negotiated > timeout = 6000 (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,471] INFO zookeeper state changed (SyncConnected) > (org.I0Itec.zkclient.ZkClient) > [2018-06-30 09:34:55,472] INFO re-registering broker info in ZK for broker > 2005 (kafka.server.KafkaHealthcheck$SessionExpireListener) > [2018-06-30 09:34:55,472] INFO Creating /brokers/ids/2005 (is it secure? > false) (kafka.utils.ZKCheckedEphemeral) > [2018-06-30 09:34:55,476] INFO Result of znode creation is: OK > (kafka.utils.ZKCheckedEphemeral) > [2018-06-30 09:34:55,476] INFO Registered broker 2005 at path > /brokers/ids/2005 with addresses: > EndPoint(kafka-broker-b5-int,9092,ListenerName(PLAINTEXT),PLAINT
[jira] [Commented] (FLINK-9731) Kafka source subtask begins to consume from earliest offset
[ https://issues.apache.org/jira/browse/FLINK-9731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532040#comment-16532040 ] Elias Levy commented on FLINK-9731: --- Closing as I suspect the error is on the Kafka side. Logs indicate Kafka truncated the partition in the process of the broker catching up with a replica and regaining leadership of the partition. But that would imply that somehow Kafka allowed Flink to read an uncommitted message, as we are publishing with acks=all, and the topic has min.insync.replicas=2, which breaks the consistency guarantees of Kafka. The truncation led to the Flink fetch being considered out-of-range, causing the auto.offset.reset logic in the Kafka consumer to kick in, leading to consumption from the earliest offset. > Kafka source subtask begins to consume from earliest offset > --- > > Key: FLINK-9731 > URL: https://issues.apache.org/jira/browse/FLINK-9731 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.2 >Reporter: Elias Levy >Priority: Critical > > On Jun 30th 2018, at 9:35 am UTC, the Kafka source in subtask 7 in a Flink > job instance began consuming records from the earliest offsets available in > Kafka for the partitions assigned to it. Other subtasks did not exhibit this > behavior and continued operating normally. > Prior to the event the job exhibited no Kafka lag. The job showed no > failed checkpoints and the job did not restore or restart. Flink logs only > showed the following message: > {noformat} > June 30th 2018, 02:35:01.711 Fetch offset 2340400514 is out of range for > partition topic-124, resetting offset > {noformat} > The job is configured with checkpoints at 1 minute intervals. The Kafka > connector consumer is configured to start from group offsets if it is not > started from a savepoint via `setStartFromGroupOffsets()`, and the Kafka > consumer is configured to fall back to the earliest offsets if no group > offsets are committed by setting `auto.offset.reset` to `earliest` in the > Kafka consumer config. > Right before the event a Kafka broker (kafka-broker-b5-int) lost leadership > of its partitions for around 30 seconds as a result of losing its connection > to ZooKeeper. > > {noformat} > [2018-06-30 09:34:54,799] INFO Unable to read additional data from server > sessionid 0x161305b7bd81a09, likely server has closed socket, closing socket > connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:54,899] INFO zookeeper state changed (Disconnected) > (org.I0Itec.zkclient.ZkClient) > [2018-06-30 09:34:55,384] ERROR [ReplicaFetcherThread-3-1002]: Error for > partition [cloud_ioc_events,32] to broker > 1002:org.apache.kafka.common.errors.NotLeaderForPartitionException: This > server is not the leader for that topic-partition. 
> (kafka.server.ReplicaFetcherThread) > {noformat} > The broker reconnected to ZK after a few tries: > {noformat} > [2018-06-30 09:34:55,462] INFO Opening socket connection to server > 10.210.48.187/10.210.48.187:2181 (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,462] INFO zookeeper state changed (AuthFailed) > (org.I0Itec.zkclient.ZkClient) > [2018-06-30 09:34:55,463] INFO Socket connection established to > 10.210.48.187/10.210.48.187:2181, initiating session > (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,464] WARN Unable to reconnect to ZooKeeper service, > session 0x161305b7bd81a09 has expired (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,465] INFO zookeeper state changed (Expired) > (org.I0Itec.zkclient.ZkClient) > [2018-06-30 09:34:55,465] INFO Initiating client connection, > connectString=10.210.48.187:2181,10.210.43.200:2181,10.210.16.102:2181/kafka > sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5c33f1a9 > (org.apache.zookeeper.ZooKeeper) > [2018-06-30 09:34:55,465] INFO Unable to reconnect to ZooKeeper service, > session 0x161305b7bd81a09 has expired, closing socket connection > (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,466] INFO EventThread shut down for session: > 0x161305b7bd81a09 (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,467] INFO zookeeper state changed (AuthFailed) > (org.I0Itec.zkclient.ZkClient) > [2018-06-30 09:34:55,468] INFO Opening socket connection to server > 10.210.43.200/10.210.43.200:2181 (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,468] INFO Socket connection established to > 10.210.43.200/10.210.43.200:2181, initiating session > (org.apache.zookeeper.ClientCnxn) > [2018-06-30 09:34:55,471] INFO Session establishment complete on server > 10.210.43.200/10.210.43.200:2181, sessionid = 0x163934fa09d1baa, negotiated > timeout = 6000 (org.apache.zookeeper.C
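For reference, a sketch of the consumer setup described in this issue; the topic name, group id, and bootstrap servers are assumptions. The {{auto.offset.reset}} setting below applies both when no group offsets exist and when a fetch is out of range, which is how the consumer ended up at the earliest offset here:

{code:scala}
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object ConsumerSetup {
  def build(): FlinkKafkaConsumer010[String] = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka-broker-b5-int:9092") // assumed
    props.setProperty("group.id", "my-flink-job")                      // assumed
    // Falls back to the earliest offset when no committed group offset
    // exists, and also when a fetch offset is out of range.
    props.setProperty("auto.offset.reset", "earliest")
    val consumer =
      new FlinkKafkaConsumer010[String]("topic", new SimpleStringSchema(), props)
    consumer.setStartFromGroupOffsets() // start from committed offsets if present
    consumer
  }
}
{code}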
[jira] [Updated] (FLINK-9731) Kafka source subtask begins to consume from earliest offset
[ https://issues.apache.org/jira/browse/FLINK-9731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elias Levy updated FLINK-9731: -- Description: On Jun 30th 2018, at 9:35 am UTC, the Kafka source in subtask 7 in a Flink job instance began consuming records from the earliest offsets available in Kafka for the partitions assigned to it. Other subtasks did not exhibit this behavior and continued operating normally. Prior to the event the job exhibited no Kafka lag. The job showed no failed checkpoints and the job did not restore or restart. Flink logs only showed the following message: {noformat} June 30th 2018, 02:35:01.711 Fetch offset 2340400514 is out of range for partition topic-124, resetting offset {noformat} The job is configured with checkpoints at 1 minute intervals. The Kafka connector consumer is configured to start from group offsets if it is not started from a savepoint via `setStartFromGroupOffsets()`, and the Kafka consumer is configured to fall back to the earliest offsets if no group offsets are committed by setting `auto.offset.reset` to `earliest` in the Kafka consumer config. Right before the event a Kafka broker (kafka-broker-b5-int) lost leadership of its partitions for around 30 seconds as a result of losing its connection to ZooKeeper. {noformat} [2018-06-30 09:34:54,799] INFO Unable to read additional data from server sessionid 0x161305b7bd81a09, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:54,899] INFO zookeeper state changed (Disconnected) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,384] ERROR [ReplicaFetcherThread-3-1002]: Error for partition [cloud_ioc_events,32] to broker 1002:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. 
(kafka.server.ReplicaFetcherThread) {noformat} The broker reconnected to ZK after a few tries: {noformat} [2018-06-30 09:34:55,462] INFO Opening socket connection to server 10.210.48.187/10.210.48.187:2181 (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,462] INFO zookeeper state changed (AuthFailed) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,463] INFO Socket connection established to 10.210.48.187/10.210.48.187:2181, initiating session (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,464] WARN Unable to reconnect to ZooKeeper service, session 0x161305b7bd81a09 has expired (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,465] INFO zookeeper state changed (Expired) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,465] INFO Initiating client connection, connectString=10.210.48.187:2181,10.210.43.200:2181,10.210.16.102:2181/kafka sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5c33f1a9 (org.apache.zookeeper.ZooKeeper) [2018-06-30 09:34:55,465] INFO Unable to reconnect to ZooKeeper service, session 0x161305b7bd81a09 has expired, closing socket connection (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,466] INFO EventThread shut down for session: 0x161305b7bd81a09 (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,467] INFO zookeeper state changed (AuthFailed) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,468] INFO Opening socket connection to server 10.210.43.200/10.210.43.200:2181 (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,468] INFO Socket connection established to 10.210.43.200/10.210.43.200:2181, initiating session (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,471] INFO Session establishment complete on server 10.210.43.200/10.210.43.200:2181, sessionid = 0x163934fa09d1baa, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,471] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,472] INFO re-registering broker info in ZK for broker 2005 (kafka.server.KafkaHealthcheck$SessionExpireListener) [2018-06-30 09:34:55,472] INFO Creating /brokers/ids/2005 (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2018-06-30 09:34:55,476] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2018-06-30 09:34:55,476] INFO Registered broker 2005 at path /brokers/ids/2005 with addresses: EndPoint(kafka-broker-b5-int,9092,ListenerName(PLAINTEXT),PLAINTEXT),EndPoint(kafka-broker-b5,19092,ListenerName(PUBLIC),SASL_PLAINTEXT) (kafka.utils.ZkUtils) [2018-06-30 09:34:55,476] INFO done re-registering broker (kafka.server.KafkaHealthcheck$SessionExpireListener) [2018-06-30 09:34:55,476] INFO Subscribing to /brokers/topics path to watch for new topics (kafka.server.KafkaHealthcheck$SessionExpireListener) {noformat} By 9:35:02 partitions had returned to the broker. It appears this is the broker that the subtask was consuming from, as outgoing network traffic from it spiked after the broker recovered leadership of its
[jira] [Created] (FLINK-9731) Kafka source subtask begins to consume from earliest offset
Elias Levy created FLINK-9731: - Summary: Kafka source subtask begins to consume from earliest offset Key: FLINK-9731 URL: https://issues.apache.org/jira/browse/FLINK-9731 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.4.2 Reporter: Elias Levy On Jun 30th 2018, at 9:35 am UTC, the Kafka source in subtask 7 in a Flink job instance began consuming records from the earliest offsets available in Kafka for the partitions assigned to it. Other subtasks did not exhibit this behavior and continued operating normally. Prior to the event the job exhibited no Kafka lag. The job showed no failed checkpoints and the job did not restore or restart. Flink logs show no indication of anything amiss. There were no errors or Kafka-related messages in the Flink logs. The job is configured with checkpoints at 1 minute intervals. The Kafka connector consumer is configured to start from group offsets if it is not started from a savepoint via `setStartFromGroupOffsets()`, and the Kafka consumer is configured to fall back to the earliest offsets if no group offsets are committed by setting `auto.offset.reset` to `earliest` in the Kafka consumer config. Right before the event a Kafka broker (kafka-broker-b5-int) lost leadership of its partitions for around 30 seconds as a result of losing its connection to ZooKeeper. {noformat} [2018-06-30 09:34:54,799] INFO Unable to read additional data from server sessionid 0x161305b7bd81a09, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:54,899] INFO zookeeper state changed (Disconnected) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,384] ERROR [ReplicaFetcherThread-3-1002]: Error for partition [cloud_ioc_events,32] to broker 1002:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. 
(kafka.server.ReplicaFetcherThread) {noformat} The broker reconnected to ZK after a few tries: {noformat} [2018-06-30 09:34:55,462] INFO Opening socket connection to server 10.210.48.187/10.210.48.187:2181 (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,462] INFO zookeeper state changed (AuthFailed) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,463] INFO Socket connection established to 10.210.48.187/10.210.48.187:2181, initiating session (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,464] WARN Unable to reconnect to ZooKeeper service, session 0x161305b7bd81a09 has expired (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,465] INFO zookeeper state changed (Expired) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,465] INFO Initiating client connection, connectString=10.210.48.187:2181,10.210.43.200:2181,10.210.16.102:2181/kafka sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5c33f1a9 (org.apache.zookeeper.ZooKeeper) [2018-06-30 09:34:55,465] INFO Unable to reconnect to ZooKeeper service, session 0x161305b7bd81a09 has expired, closing socket connection (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,466] INFO EventThread shut down for session: 0x161305b7bd81a09 (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,467] INFO zookeeper state changed (AuthFailed) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,468] INFO Opening socket connection to server 10.210.43.200/10.210.43.200:2181 (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,468] INFO Socket connection established to 10.210.43.200/10.210.43.200:2181, initiating session (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,471] INFO Session establishment complete on server 10.210.43.200/10.210.43.200:2181, sessionid = 0x163934fa09d1baa, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn) [2018-06-30 09:34:55,471] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient) [2018-06-30 09:34:55,472] INFO re-registering broker info in ZK for broker 2005 (kafka.server.KafkaHealthcheck$SessionExpireListener) [2018-06-30 09:34:55,472] INFO Creating /brokers/ids/2005 (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2018-06-30 09:34:55,476] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral) [2018-06-30 09:34:55,476] INFO Registered broker 2005 at path /brokers/ids/2005 with addresses: EndPoint(kafka-broker-b5-int,9092,ListenerName(PLAINTEXT),PLAINTEXT),EndPoint(kafka-broker-b5,19092,ListenerName(PUBLIC),SASL_PLAINTEXT) (kafka.utils.ZkUtils) [2018-06-30 09:34:55,476] INFO done re-registering broker (kafka.server.KafkaHealthcheck$SessionExpireListener) [2018-06-30 09:34:55,476] INFO Subscribing to /brokers/topics path to watch for new topics (kafka.server.KafkaHealthcheck$SessionExpireListener) {noformat} By 9:35:02 partitions had returned to the broker. It appears this
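For reference, a minimal sketch of the source setup described above; the topic name, broker address, and group id are placeholders:
{code:java}
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class KafkaOffsetJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L);  // checkpoints at 1 minute intervals, as in the report

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-broker-b5-int:9092");
        props.setProperty("group.id", "my-job");
        // Fall back to the earliest offsets when no group offsets are committed.
        props.setProperty("auto.offset.reset", "earliest");

        FlinkKafkaConsumer010<String> consumer =
            new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), props);
        consumer.setStartFromGroupOffsets();  // start from committed offsets unless restored from a savepoint

        env.addSource(consumer).print();
        env.execute("kafka-offset-repro");
    }
}
{code}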
[jira] [Commented] (FLINK-9662) Task manager isolation for jobs
[ https://issues.apache.org/jira/browse/FLINK-9662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16525331#comment-16525331 ] Elias Levy commented on FLINK-9662: --- I like the direction of this. And as I mentioned in the other issue, for the "any" case, the system could prioritize slots with the most matches, or a third type could be introduced: "all", "any", and "most". > Task manager isolation for jobs > --- > > Key: FLINK-9662 > URL: https://issues.apache.org/jira/browse/FLINK-9662 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination >Affects Versions: 1.5.0, 1.6.0 >Reporter: Renjie Liu >Assignee: Renjie Liu >Priority: Major > Fix For: 1.6.0 > > > Disable task manager sharing for different jobs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9682) Add setDescription to execution environment and display it in the UI
Elias Levy created FLINK-9682: - Summary: Add setDescription to execution environment and display it in the UI Key: FLINK-9682 URL: https://issues.apache.org/jira/browse/FLINK-9682 Project: Flink Issue Type: Improvement Components: DataStream API, Webfrontend Affects Versions: 1.5.0 Reporter: Elias Levy Currently you can provide a job name to {{execute}} in the execution environment. In an environment where many versions of a job may be executing, such as a development or test environment, identifying which running job is of a specific version via the UI can be difficult unless the version is embedded into the job name given to {{execute}}. But the job name is used for other purposes, such as for namespacing metrics. Thus, it is not ideal to modify the job name, as that could require modifying metric dashboards and monitors each time versions change. I propose a new method be added to the execution environment, {{setDescription}}, that would allow a user to pass in an arbitrary description that would be displayed in the dashboard, allowing users to distinguish jobs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
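For illustration only, a sketch of how the proposed method might be used; {{setDescription}} is hypothetical and does not exist in Flink:
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Hypothetical method proposed above: free-form text shown only in the dashboard.
env.setDescription("build 1.2.3, branch feature-x");
// The job name stays stable across versions, so metric names do not change.
env.execute("my-job");
{code}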
[jira] [Commented] (FLINK-9662) Task manager isolation for jobs
[ https://issues.apache.org/jira/browse/FLINK-9662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16524278#comment-16524278 ] Elias Levy commented on FLINK-9662: --- I took a look at the design doc, but I am not sufficiently familiar with the recent resource manager changes to comment on the details. > Task manager isolation for jobs > --- > > Key: FLINK-9662 > URL: https://issues.apache.org/jira/browse/FLINK-9662 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination >Affects Versions: 1.5.0 >Reporter: Renjie Liu >Assignee: Renjie Liu >Priority: Major > Fix For: 1.5.1 > > > Disable task manager sharing for different jobs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9600) Add DataStream transformation variants that pass timestamp to the user function
Elias Levy created FLINK-9600: - Summary: Add DataStream transformation variants that pass timestamp to the user function Key: FLINK-9600 URL: https://issues.apache.org/jira/browse/FLINK-9600 Project: Flink Issue Type: Improvement Components: Streaming Affects Versions: 1.5.0 Reporter: Elias Levy It is often necessary to access the timestamp assigned to records within user functions. At the moment this is only possible from {{RichFunction}}. Implementing a {{RichFunction}} just to access the timestamp is burdensome, so most jobs carry a duplicate of the timestamp within the record. It would be useful if {{DataStream}} provided transformation methods that accepted user functions that could be passed the record's timestamp as an additional argument, similar to how there are two variants of {{flatMap}}, one with an extra parameter that gives the user function access to the output {{Collector}}. Along similar lines, it may be useful to have variants that pass the record's key as an additional parameter. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
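For comparison, a minimal sketch of how the timestamp can be reached today by rewriting the operator as a {{ProcessFunction}} (types here are placeholders); the proposal would make the timestamp available directly in {{flatMap}}-style variants:
{code:java}
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class TimestampTagger extends ProcessFunction<String, String> {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        Long ts = ctx.timestamp();  // null if no timestamp was assigned to the record
        out.collect(ts + ":" + value);
    }
}
{code}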
[jira] [Commented] (FLINK-8886) Job isolation via scheduling in shared cluster
[ https://issues.apache.org/jira/browse/FLINK-8886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16503536#comment-16503536 ] Elias Levy commented on FLINK-8886: --- I am ok with having a separate issue for per job JVM isolation, but I am primarily interested in the per job TM isolation via scheduling to matching TMs. In practice that will give us JVM isolation without having to wait for anything else. > Job isolation via scheduling in shared cluster > -- > > Key: FLINK-8886 > URL: https://issues.apache.org/jira/browse/FLINK-8886 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, Local Runtime, Scheduler >Affects Versions: 1.5.0 >Reporter: Elias Levy >Assignee: Renjie Liu >Priority: Major > > Flink's TaskManager executes tasks from different jobs within the same JVM as > threads. We prefer to isolate different jobs on their own JVM. Thus, we > must use different TMs for different jobs. As currently the scheduler will > allocate task slots within a TM to tasks from different jobs, that means we > must stand up one cluster per job. This is wasteful, as it requires at least > two JobManagers per cluster for high-availability, and the JMs have low > utilization. > Additionally, different jobs may require different resources. Some jobs are > compute heavy. Some are IO heavy (lots of state in RocksDB). At the moment > the scheduler treats all TMs as equivalent, except possibly in their number > of available task slots. Thus, one is required to stand up multiple clusters > if there is a need for different types of TMs. > It would be useful if one could specify requirements on a job, such that they > are only scheduled on a subset of TMs. Properly configured, that would > permit isolation of jobs in a shared cluster and scheduling of jobs with > specific resource needs. > One possible implementation is to specify a set of tags on the TM config file > which the TMs use when registering with the JM, and another set of tags > configured within the job or supplied when submitting the job. The scheduler > could then match the tags in the job with the tags in the TMs. In a > restrictive mode the scheduler would assign a job task to a TM only if all > tags match. In a relaxed mode the scheduler could assign a job task to a TM > if there is a partial match, while giving preference to a more accurate match. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9495) Implement ResourceManager for Kubernetes
Elias Levy created FLINK-9495: - Summary: Implement ResourceManager for Kubernetes Key: FLINK-9495 URL: https://issues.apache.org/jira/browse/FLINK-9495 Project: Flink Issue Type: Improvement Components: ResourceManager Affects Versions: 1.5.0 Reporter: Elias Levy I noticed there is no issue for developing a Kubernetes specific ResourceManager under FLIP-6, so I am creating this issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9450) Job hangs if S3 access is denied during checkpoints
[ https://issues.apache.org/jira/browse/FLINK-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16492837#comment-16492837 ] Elias Levy commented on FLINK-9450: --- The logs show no errors. Alas, we are using the JRE, so these nodes don't have jps or jstack installed. Re: job failure on checkpoint failure. My understanding is that it is a 1.5 feature (env.getCheckpointConfig.setFailTasksOnCheckpointingErrors). We are running 1.4.2. The checkpoint and the job just hang. There is no failure, at least for the amount of time I've waited, which has been several minutes. I don't know if it will fail if I wait longer. The job continues after I clear the firewall rules. Looks like the checkpoint may be hanging on the synchronous portion of the async checkpoint. This is from a Flink job running on a cluster with a single TM: I run: {noformat} iptables -A OUTPUT -p tcp --dport 80 -j REJECT --reject-with tcp-reset iptables -A OUTPUT -p tcp --dport 443 -j REJECT --reject-with tcp-reset {noformat} The logs show what seems like a normal checkpoint attempt: {noformat} 2018-05-28 16:44:06.449070500 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend - DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous part) in thread Thread[Async calls on Source: Kafka Control Topic -> Updates (1/1),5,Flink Task Threads] took 0 ms. 2018-05-28 16:44:06.449103500 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend - DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous part) in thread Thread[Async calls on Source: Kafka Control Topic -> Updates (1/1),5,Flink Task Threads] took 0 ms. 2018-05-28 16:44:06.449388500 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend - DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous part) in thread Thread[Async calls on Source: Kafka Queries Topic -> Filter (1/1),5,Flink Task Threads] took 0 ms. 2018-05-28 16:44:06.449390500 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend - DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous part) in thread Thread[Suppress Events -> Assign Timestamp -> (Sink: Kafka Event Topic, Sink: Kafka Event Topic) (1/1),5,Flink Task Threads] took 0 ms. 2018-05-28 16:44:06.449400500 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend - DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous part) in thread Thread[Suppress Events -> Assign Timestamp -> (Sink: Kafka Event Topic, Sink: Kafka Event Topic) (1/1),5,Flink Task Threads] took 0 ms. 
2018-05-28 16:44:06.451695500 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend - DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous part) in thread Thread[Engine (1/1),5,Flink Task Threads] took 2 ms {noformat} After I disable the firewall rules: {noformat} iptables -D OUTPUT -p tcp --dport 80 -j REJECT --reject-with tcp-reset iptables -D OUTPUT -p tcp --dport 443 -j REJECT --reject-with tcp-reset {noformat} {noformat} 2018-05-28 16:47:19.741581500 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend- DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, asynchronous part) in thread Thread[pool-58-thread-1,5,Flink Task Threads] took 193290 ms. 2018-05-28 16:47:23.424434500 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Heap backend snapshot (File Stream Factory @ s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous part) in thread Thread[Suppress Events -> Assign Timestamp -> (Sink: Kafka Event Topic, Sink: Kafka Event Topic) (1/1),5,Flink Task Threads] took 0 ms. 2018-05-28 16:47:23.424876500 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend- DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous part) in thread Thread[Async calls on Source: Kafka Queries Topic -> Agent Queries Filter (1/1),5,Flink Task Threads] took 0 ms. 2018-05-28 16:47:23.426263500 INFO org.apache.flink.runtime.state.DefaultOperatorStateBackend- DefaultOperatorStateBackend snapshot (File Stream Factory @ s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous part) in thread Thread[Async calls on Source: Kafka Queries Topic -> Agent Queries Fi
[jira] [Created] (FLINK-9450) Job hangs if S3 access is denied during checkpoints
Elias Levy created FLINK-9450: - Summary: Job hangs if S3 access is denied during checkpoints Key: FLINK-9450 URL: https://issues.apache.org/jira/browse/FLINK-9450 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Affects Versions: 1.4.2 Reporter: Elias Levy We have a streaming job that consumes from and writes to Kafka. The job is configured to checkpoint to S3. If we deny access to S3 by using iptables on the TM host to deny all outgoing connections to ports 80 and 443, whether using DROP or REJECT, and whether using REJECT with --reject-with tcp-reset or --reject-with icmp-port-unreachable, the job soon stops publishing to Kafka. This happens whether or not the Kafka sources have {{setCommitOffsetsOnCheckpoints}} set to true or false. The system is configured to use Presto for the S3 file system. The job has a small amount of state, so it is configured to use {{FsStateBackend}} with asynchronous snapshots. If the iptables rules are removed, the job continues to function. I would expect the job to either fail or continue running if a checkpoint fails. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
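For reference, a minimal sketch of the checkpoint setup described above, with a placeholder bucket path (the Presto S3 filesystem is selected through the cluster's filesystem configuration, not in code):
{code:java}
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L);  // checkpoint every minute
        // FsStateBackend with asynchronous snapshots, writing to S3.
        env.setStateBackend(new FsStateBackend("s3://bucket/flink/job/checkpoints", true));
        // ... Kafka sources, transformations, Kafka sinks ...
        env.execute("checkpointed-job");
    }
}
{code}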
[jira] [Created] (FLINK-9440) Allow cancelation and reset of timers
Elias Levy created FLINK-9440: - Summary: Allow cancelation and reset of timers Key: FLINK-9440 URL: https://issues.apache.org/jira/browse/FLINK-9440 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 1.4.2 Reporter: Elias Levy Currently the {{TimerService}} allows one to register timers, but it is not possible to delete a timer or to reset a timer to a new value. If one wishes to reset a timer, one must also handle the previously inserted timer callbacks and ignore them. It would be useful if the API allowed one to remove and reset timers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
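A minimal sketch of the workaround described above, assuming a keyed stream and placeholder types: the latest registered deadline is kept in keyed state, and callbacks for superseded timers are ignored.
{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class ResettableTimer extends ProcessFunction<String, String> {
    private transient ValueState<Long> activeDeadline;

    @Override
    public void open(Configuration parameters) {
        activeDeadline = getRuntimeContext().getState(
            new ValueStateDescriptor<>("active-deadline", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // "Reset" the timer by registering a new one and remembering its deadline.
        long deadline = ctx.timerService().currentProcessingTime() + 60_000L;
        ctx.timerService().registerProcessingTimeTimer(deadline);
        activeDeadline.update(deadline);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        Long deadline = activeDeadline.value();
        if (deadline == null || timestamp != deadline) {
            return;  // callback from a superseded timer; ignore it
        }
        activeDeadline.clear();
        out.collect("timer fired");
    }
}
{code}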
[jira] [Updated] (FLINK-9403) Documentation continues to refer to removed methods
[ https://issues.apache.org/jira/browse/FLINK-9403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elias Levy updated FLINK-9403: -- Description: {{org.apache.flink.api.common.ExecutionConfig}} no longer has the {{enableTimestamps}}, {{disableTimestamps}}, and {{areTimestampsEnabled}} methods. They were removed in [this commit|https://github.com/apache/flink/commit/ceb64248daab04b01977ff02516696e4398d656e]. They are referenced in the Execution Parameters section of the Flink DataStream API Programming Guide page. was:{{org.apache.flink.api.common.ExecutionConfig}} no longer has the {{enableTimestamps}}, {{disableTimestamps}}, and {{areTimestampsEnabled}} methods. They were removed in [this commit|https://github.com/apache/flink/commit/ceb64248daab04b01977ff02516696e4398d656e]. > Documentation continues to refer to removed methods > --- > > Key: FLINK-9403 > URL: https://issues.apache.org/jira/browse/FLINK-9403 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.6.0 >Reporter: Elias Levy >Priority: Minor > > {{org.apache.flink.api.common.ExecutionConfig}} no longer has the > {{enableTimestamps}}, {{disableTimestamps}}, and {{areTimestampsEnabled}} > methods. They were removed in [this > commit|https://github.com/apache/flink/commit/ceb64248daab04b01977ff02516696e4398d656e]. > They are referenced in the Execution Parameters section of the Flink > DataStream API Programming Guide page. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9403) Documentation continues to refer to removed methods
Elias Levy created FLINK-9403: - Summary: Documentation continues to refer to removed methods Key: FLINK-9403 URL: https://issues.apache.org/jira/browse/FLINK-9403 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.6.0 Reporter: Elias Levy {{org.apache.flink.api.common.ExecutionConfig}} no longer has the {{enableTimestamps}}, {{disableTimestamps}}, and {{areTimestampsEnabled}} methods. They were removed in [this commit|https://github.com/apache/flink/commit/ceb64248daab04b01977ff02516696e4398d656e]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9272) DataDog API "counter" metric type is deprecated
[ https://issues.apache.org/jira/browse/FLINK-9272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16457827#comment-16457827 ] Elias Levy commented on FLINK-9272: --- I tried to find out before opening the issue, but I found no information, other than a notice in the docs saying {{counter}} was deprecated and to use {{count}}, and the fact that the API docs no longer list {{counter}}. > DataDog API "counter" metric type is deprecated > > > Key: FLINK-9272 > URL: https://issues.apache.org/jira/browse/FLINK-9272 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.2 >Reporter: Elias Levy >Priority: Major > > It appears to have been replaced by the "count" metric type. > https://docs.datadoghq.com/developers/metrics/ -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-9272) DataDog API "counter" metric type is deprecated
Elias Levy created FLINK-9272: - Summary: DataDog API "counter" metric type is deprecated Key: FLINK-9272 URL: https://issues.apache.org/jira/browse/FLINK-9272 Project: Flink Issue Type: Improvement Components: Metrics Affects Versions: 1.4.2 Reporter: Elias Levy It appears to have been replaced by the "count" metric type. https://docs.datadoghq.com/developers/metrics/ -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite
[ https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16428676#comment-16428676 ] Elias Levy commented on FLINK-6756: --- Just ran into this, any progress? > Provide RichAsyncFunction to Scala API suite > > > Key: FLINK-6756 > URL: https://issues.apache.org/jira/browse/FLINK-6756 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Andrea Spina >Assignee: Andrea Spina >Priority: Major > > I can't find any tracking info about the chance to have RichAsyncFunction in > the Scala API suite. I think it'd be nice to have this function in order to > access open/close methods and the RuntimeContext. > I was able to retrieve > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593 > only, so my question is if there are some blocking issues avoiding this > feature. [~till.rohrmann] > If it's possible and nobody already have done it, I can assign the issue to > myself in order to implement it. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8886) Job isolation via scheduling in shared cluster
[ https://issues.apache.org/jira/browse/FLINK-8886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16405753#comment-16405753 ] Elias Levy commented on FLINK-8886: --- Thanks. I am aware of YARN, but we do not want to manage YARN just to run Flink. If we wanted to go down the route of using a generic resource manager, we'd use Mesos, but we don't have a need to do so. As I alluded to, we already run multiple Flink clusters in standalone mode to gain isolation, but this is usually wasteful, as the JM is usually lightly loaded and you need at least two of them for high-availability. So it would be useful to share JMs while jobs are isolated to TMs, which is what I am proposing. As for the complexity of the proposal, I'd argue that it is relatively lightweight in the restrictive mode. Just permit the TM to register a set of tags placed in the config file, then have the scheduler only schedule jobs on TMs that have an exact match for the tags. > Job isolation via scheduling in shared cluster > -- > > Key: FLINK-8886 > URL: https://issues.apache.org/jira/browse/FLINK-8886 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination, Local Runtime, Scheduler >Affects Versions: 1.5.0 >Reporter: Elias Levy >Priority: Major > > Flink's TaskManager executes tasks from different jobs within the same JVM as > threads. We prefer to isolate different jobs on their own JVM. Thus, we must > use different TMs for different jobs. As currently the scheduler will > allocate task slots within a TM to tasks from different jobs, that means we > must stand up one cluster per job. This is wasteful, as it requires at least > two JobManagers per cluster for high-availability, and the JMs have low > utilization. > Additionally, different jobs may require different resources. Some jobs are > compute heavy. Some are IO heavy (lots of state in RocksDB). At the moment > the scheduler treats all TMs as equivalent, except possibly in their number > of available task slots. Thus, one is required to stand up multiple clusters > if there is a need for different types of TMs. > > It would be useful if one could specify requirements on a job, such that they > are only scheduled on a subset of TMs. Properly configured, that would > permit isolation of jobs in a shared cluster and scheduling of jobs with > specific resource needs. > > One possible implementation is to specify a set of tags on the TM config file > which the TMs use when registering with the JM, and another set of tags > configured within the job or supplied when submitting the job. The scheduler > could then match the tags in the job with the tags in the TMs. In a > restrictive mode the scheduler would assign a job task to a TM only if all > tags match. In a relaxed mode the scheduler could assign a job task to a TM > if there is a partial match, while giving preference to a more accurate match. > > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8886) Job isolation via scheduling in shared cluster
Elias Levy created FLINK-8886: - Summary: Job isolation via scheduling in shared cluster Key: FLINK-8886 URL: https://issues.apache.org/jira/browse/FLINK-8886 Project: Flink Issue Type: Improvement Components: Scheduler Affects Versions: 1.5.0 Reporter: Elias Levy Flink's TaskManager executes tasks from different jobs within the same JVM as threads. We prefer to isolate different jobs on their own JVM. Thus, we must use different TMs for different jobs. As currently the scheduler will allocate task slots within a TM to tasks from different jobs, that means we must stand up one cluster per job. This is wasteful, as it requires at least two JobManagers per cluster for high-availability, and the JMs have low utilization. Additionally, different jobs may require different resources. Some jobs are compute heavy. Some are IO heavy (lots of state in RocksDB). At the moment the scheduler treats all TMs as equivalent, except possibly in their number of available task slots. Thus, one is required to stand up multiple clusters if there is a need for different types of TMs. It would be useful if one could specify requirements on a job, such that they are only scheduled on a subset of TMs. Properly configured, that would permit isolation of jobs in a shared cluster and scheduling of jobs with specific resource needs. One possible implementation is to specify a set of tags on the TM config file which the TMs use when registering with the JM, and another set of tags configured within the job or supplied when submitting the job. The scheduler could then match the tags in the job with the tags in the TMs. In a restrictive mode the scheduler would assign a job task to a TM only if all tags match. In a relaxed mode the scheduler could assign a job task to a TM if there is a partial match, while giving preference to a more accurate match. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
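For illustration, the configuration such a scheme might use; both keys are hypothetical and do not exist in Flink:
{code}
# flink-conf.yaml on the TaskManager (hypothetical key)
taskmanager.tags: rocksdb,io-heavy

# tags supplied with the job at submission time (hypothetical key)
scheduler.tags: rocksdb,io-heavy
{code}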
[jira] [Created] (FLINK-8844) Export job jar file name or job version property via REST API
Elias Levy created FLINK-8844: - Summary: Export job jar file name or job version property via REST API Key: FLINK-8844 URL: https://issues.apache.org/jira/browse/FLINK-8844 Project: Flink Issue Type: Improvement Components: REST Affects Versions: 1.4.3 Reporter: Elias Levy To aid automated deployment of jobs, it would be useful if the REST API exposed either a running job's jar filename or a version property the job could set, similar to how it sets the job name. As it is now there is no standard mechanism to determine what version of a job is running in a cluster. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
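For illustration, a hypothetical extension of the job overview REST response; the {{jar-name}} and {{version}} fields do not exist in Flink's API (the {{jid}} and {{name}} fields do):
{noformat}
GET /jobs/:jobid
{
  "jid": "0000000000000000000000000000000a",
  "name": "my-job",
  "jar-name": "my-job-1.2.3.jar",
  "version": "1.2.3"
}
{noformat}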
[jira] [Commented] (FLINK-7641) Loss of JobManager in HA mode should not cause jobs to fail
[ https://issues.apache.org/jira/browse/FLINK-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16375179#comment-16375179 ] Elias Levy commented on FLINK-7641: --- I mean that if you have a standalone cluster in HA mode with multiple JMs, if the current master JM fails, any jobs executing in the cluster will be stopped and then restored by the new master JM. Ideally master JM failover should be largely invisible to running jobs. At most, they should be temporarily paused and continued, rather than stopped and restarted. > Loss of JobManager in HA mode should not cause jobs to fail > --- > > Key: FLINK-7641 > URL: https://issues.apache.org/jira/browse/FLINK-7641 > Project: Flink > Issue Type: Improvement > Components: JobManager >Affects Versions: 1.3.2 >Reporter: Elias Levy >Assignee: vinoyang >Priority: Major > > Currently if a standalone cluster of JobManagers is configured in > high-availability mode and the master JM is lost, the job executing in the > cluster will be restarted. This is less than ideal. It would be best if the > jobs could continue to execute without restarting while one of the spare JMs > becomes the new master, or in the worst case, the jobs are paused while the > JM election takes place. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8751) Canceling a job results in a InterruptedException in the TM
[ https://issues.apache.org/jira/browse/FLINK-8751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374525#comment-16374525 ] Elias Levy commented on FLINK-8751: --- Apologies. I meant TM. It was a long day. I've amended the issue. > Canceling a job results in a InterruptedException in the TM > --- > > Key: FLINK-8751 > URL: https://issues.apache.org/jira/browse/FLINK-8751 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.4.1 >Reporter: Elias Levy >Priority: Major > > Canceling a job results in the following exception reported by the TM: > {code:java} > ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Could > not shut down timer service java.lang.InterruptedException > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown > Source) at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown > Source) > at > org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Unknown Source){code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8751) Canceling a job results in a InterruptedException in the TM
[ https://issues.apache.org/jira/browse/FLINK-8751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Elias Levy updated FLINK-8751: -- Description: Canceling a job results in the following exception reported by the TM: {code:java} ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Could not shut down timer service java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown Source) at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Unknown Source){code} was: Canceling a job results in the following exception reported by the JM: {code:java} ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Could not shut down timer service java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown Source) at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Unknown Source){code} Component/s: (was: JobManager) TaskManager Summary: Canceling a job results in a InterruptedException in the TM (was: Canceling a job results in a InterruptedException in the JM) > Canceling a job results in a InterruptedException in the TM > --- > > Key: FLINK-8751 > URL: https://issues.apache.org/jira/browse/FLINK-8751 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.4.1 >Reporter: Elias Levy >Priority: Major > > Canceling a job results in the following exception reported by the TM: > {code:java} > ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Could > not shut down timer service java.lang.InterruptedException > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown > Source) at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown > Source) > at > org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Unknown Source){code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8752) ClassNotFoundException when using the user code class loader
[ https://issues.apache.org/jira/browse/FLINK-8752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373954#comment-16373954 ] Elias Levy commented on FLINK-8752: --- Some other information: Cluster is configured in HA mode with S3 as storage for checkpoints and using the Presto S3 jar. I've confirmed that the artifact downloaded by the blob cache in the JM matches the job's jar and that it contains the class in question. Using Java 8 update 152. > ClassNotFoundException when using the user code class loader > > > Key: FLINK-8752 > URL: https://issues.apache.org/jira/browse/FLINK-8752 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.4.1 >Reporter: Elias Levy >Priority: Major > > Attempting to submit a job results in the job failing while it is being > started in the JMs with a ClassNotFoundException error: > {code:java} > java.lang.ClassNotFoundException: com.foo.flink.common.util.TimeAssigner > at java.net.URLClassLoader.findClass(Unknown Source) > at java.lang.ClassLoader.loadClass(Unknown Source) > at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source) > at java.lang.ClassLoader.loadClass(Unknown Source) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Unknown Source) > at > org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73) > at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) > at java.io.ObjectInputStream.readClassDesc(Unknown Source) > at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) > at java.io.ObjectInputStream.readObject0(Unknown Source) > at java.io.ObjectInputStream.readObject(Unknown Source) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:393) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:380) > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:368) > at > org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58) > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.createPartitionStateHolders(AbstractFetcher.java:542) > at > org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.(AbstractFetcher.java:167) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.(Kafka09Fetcher.java:89) > at > org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.(Kafka010Fetcher.java:62) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:203) > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:564) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Unknown Source) > {code} > If I drop the job's jar into the lib folder in the JM and set the JM's > classloader.resolve-order to parent-first, the job starts successfully. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8752) ClassNotFoundException when using the user code class loader
Elias Levy created FLINK-8752: - Summary: ClassNotFoundException when using the user code class loader Key: FLINK-8752 URL: https://issues.apache.org/jira/browse/FLINK-8752 Project: Flink Issue Type: Bug Components: JobManager Affects Versions: 1.4.1 Reporter: Elias Levy Attempting to submit a job results in the job failing while it is being started in the JMs with a ClassNotFoundException error: {code:java} java.lang.ClassNotFoundException: com.foo.flink.common.util.TimeAssigner at java.net.URLClassLoader.findClass(Unknown Source) at java.lang.ClassLoader.loadClass(Unknown Source) at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source) at java.lang.ClassLoader.loadClass(Unknown Source) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Unknown Source) at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73) at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) at java.io.ObjectInputStream.readClassDesc(Unknown Source) at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) at java.io.ObjectInputStream.readObject0(Unknown Source) at java.io.ObjectInputStream.readObject(Unknown Source) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:393) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:380) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:368) at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.createPartitionStateHolders(AbstractFetcher.java:542) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.(AbstractFetcher.java:167) at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.(Kafka09Fetcher.java:89) at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.(Kafka010Fetcher.java:62) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:203) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:564) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Unknown Source) {code} If I drop the job's jar into the lib folder in the JM and set the JM's classloader.resolve-order to parent-first, the job starts successfully. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8751) Canceling a job results in a InterruptedException in the JM
Elias Levy created FLINK-8751: - Summary: Canceling a job results in a InterruptedException in the JM Key: FLINK-8751 URL: https://issues.apache.org/jira/browse/FLINK-8751 Project: Flink Issue Type: Bug Components: JobManager Affects Versions: 1.4.1 Reporter: Elias Levy Canceling a job results in the following exception reported by the JM: {code:java} ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Could not shut down timer service java.lang.InterruptedException at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown Source) at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Unknown Source){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7935) Metrics with user supplied scope variables
[ https://issues.apache.org/jira/browse/FLINK-7935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16322789#comment-16322789 ] Elias Levy commented on FLINK-7935: --- So it seems the DD reporter needs to switch from {{AbstractMetricGroup#getLogicalScope(CharacterFilter)}} to {{MetricGroup#getMetricIdentifier(String)}}. That would be sufficient for my immediate use case, as I am only looking to add a single user supplied scope/tag. That said, I can see {{AbstractMetricGroup#getLogicalScope(CharacterFilter)}} becoming cumbersome if a user wishes to use multiple key-values/tags. E.g. {code} getRuntimeContext() .getMetricGroup() .addGroup("messages") .addGroup("type", messageType) .addGroup("source", messageSource) .addGroup("priority", messagePriority) .counter("count") {code} would be named {{.messages.type.source.priority.count}} instead of just {{.messages.count}} with variables/tags {{type}}, {{source}}, and {{priority}}. > Metrics with user supplied scope variables > -- > > Key: FLINK-7935 > URL: https://issues.apache.org/jira/browse/FLINK-7935 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.3.2 >Reporter: Elias Levy > > We use DataDog for metrics. DD and Flink differ somewhat in how they track > metrics. > Flink names and scopes metrics together, at least by default. E.g. by default > the System scope for operator metrics is > {{.taskmanager}}. > The scope variables become part of the metric's full name. > In DD the metric would be named something generic, e.g. > {{taskmanager.job.operator}}, and they would be distinguished by their tag > values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}. > Flink allows you to configure the format string for system scopes, so it is > possible to set the operator scope format to {{taskmanager.job.operator}}. > We do this for all scopes: > {code} > metrics.scope.jm: jobmanager > metrics.scope.jm.job: jobmanager.job > metrics.scope.tm: taskmanager > metrics.scope.tm.job: taskmanager.job > metrics.scope.task: taskmanager.job.task > metrics.scope.operator: taskmanager.job.operator > {code} > This seems to work. The DataDog Flink metric's plugin submits all scope > variables as tags, even if they are not used within the scope format. And it > appears internally this does not lead to metrics conflicting with each other. > We would like to extend this to user defined metrics, but you can define > variables/scopes when adding a metric group or metric with the user API, so > that in DD we have a single metric with a tag with many different values, > rather than hundreds of metrics to just the one value we want to measure > across different event types. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-5627) Allow job specific externalized checkpoint dir
[ https://issues.apache.org/jira/browse/FLINK-5627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16321239#comment-16321239 ] Elias Levy commented on FLINK-5627: --- It would be nice to fix this issue. > Allow job specific externalized checkpoint dir > -- > > Key: FLINK-5627 > URL: https://issues.apache.org/jira/browse/FLINK-5627 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Affects Versions: 1.2.0, 1.3.0 >Reporter: Gyula Fora > > Currently the externalized checkpoint directory can only be configured on a > cluster level. This is not in sync with the way checkpoint directories are > generally configured on a job specific level. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7935) Metrics with user supplied scope variables
[ https://issues.apache.org/jira/browse/FLINK-7935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16314122#comment-16314122 ] Elias Levy commented on FLINK-7935: --- [~Zentol] thoughts? > Metrics with user supplied scope variables > -- > > Key: FLINK-7935 > URL: https://issues.apache.org/jira/browse/FLINK-7935 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.3.2 >Reporter: Elias Levy > > We use DataDog for metrics. DD and Flink differ somewhat in how they track > metrics. > Flink names and scopes metrics together, at least by default. E.g. by default > the System scope for operator metrics is > {{.taskmanager}}. > The scope variables become part of the metric's full name. > In DD the metric would be named something generic, e.g. > {{taskmanager.job.operator}}, and they would be distinguished by their tag > values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}. > Flink allows you to configure the format string for system scopes, so it is > possible to set the operator scope format to {{taskmanager.job.operator}}. > We do this for all scopes: > {code} > metrics.scope.jm: jobmanager > metrics.scope.jm.job: jobmanager.job > metrics.scope.tm: taskmanager > metrics.scope.tm.job: taskmanager.job > metrics.scope.task: taskmanager.job.task > metrics.scope.operator: taskmanager.job.operator > {code} > This seems to work. The DataDog Flink metric's plugin submits all scope > variables as tags, even if they are not used within the scope format. And it > appears internally this does not lead to metrics conflicting with each other. > We would like to extend this to user defined metrics, but you can define > variables/scopes when adding a metric group or metric with the user API, so > that in DD we have a single metric with a tag with many different values, > rather than hundreds of metrics to just the one value we want to measure > across different event types. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8358) Hostname used by DataDog metric reporter is not configurable
Elias Levy created FLINK-8358: - Summary: Hostname used by DataDog metric reporter is not configurable Key: FLINK-8358 URL: https://issues.apache.org/jira/browse/FLINK-8358 Project: Flink Issue Type: Bug Components: Metrics Affects Versions: 1.4.0 Reporter: Elias Levy The hostname used by the DataDog metric reporter to report metrics is not configurable. This can be problematic if the hostname that Flink uses is different from the hostname used by the system's DataDog agent. For instance, in our environment we use Chef, and using the DataDog Chef Handler, certain metadata such as host roles is associated with the hostname in the DataDog service. The hostname used to submit this metadata is the name we have given the host. But as Flink picks up the default name given by EC2 to the instance, metrics submitted by Flink to DataDog using that hostname are not associated with the tags derived from Chef. In the Job Manager we can avoid this issue by explicitly setting the config {{jobmanager.rpc.address}} to the hostname we desire. I attempted to do the same on the Task Manager by setting the {{taskmanager.hostname}} config, but DataDog does not seem to pick up that value. Digging through the code it seems the DD metric reporter gets the hostname from the {{TaskManagerMetricGroup}} host variable, which seems to be set from {{taskManagerLocation.getHostname}}. That in turn seems to be set by calling {{this.inetAddress.getCanonicalHostName()}}, which merely performs a reverse lookup on the IP address, and then calling {{NetUtils.getHostnameFromFQDN}} on the result. The latter is further problematic because its result is a non-fully-qualified hostname. More generally, there seems to be a need to specify the hostname of a JM or TM node that can be reused across Flink components. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
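A minimal reproduction of the lookup chain described above; the address is a placeholder, and the substring logic only approximates {{NetUtils.getHostnameFromFQDN}}:
{code:java}
import java.net.InetAddress;

public class HostnameCheck {
    public static void main(String[] args) throws Exception {
        InetAddress addr = InetAddress.getByName("10.0.0.5");  // placeholder TM address
        String fqdn = addr.getCanonicalHostName();             // reverse DNS lookup
        int dot = fqdn.indexOf('.');
        String host = dot > 0 ? fqdn.substring(0, dot) : fqdn; // keeps only the first label
        System.out.println(fqdn + " -> " + host);
    }
}
{code}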
[jira] [Commented] (FLINK-7935) Metrics with user supplied scope variables
[ https://issues.apache.org/jira/browse/FLINK-7935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16310189#comment-16310189 ] Elias Levy commented on FLINK-7935: --- It appears the Metric documentation has not been updated in the 1.5 snapshot, but if I understand the changes it means that for instance, if I have a job that processes a bounded but not predefined set of message types, and publish a metric per message type that counts the number of processed messages per type, I could do: {code} getRuntimeContext() .getMetricGroup() .addGroup("messages") .addGroup("type", messageType) .counter("count") {code} And the DataDog reporter would report a metric named {{messages.count}} with tags of {{type:messageType}}. If that is correct, then it may be sufficient. Does FLINK-7692 work as I described? > Metrics with user supplied scope variables > -- > > Key: FLINK-7935 > URL: https://issues.apache.org/jira/browse/FLINK-7935 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.3.2 >Reporter: Elias Levy > > We use DataDog for metrics. DD and Flink differ somewhat in how they track > metrics. > Flink names and scopes metrics together, at least by default. E.g. by default > the System scope for operator metrics is > {{.taskmanager}}. > The scope variables become part of the metric's full name. > In DD the metric would be named something generic, e.g. > {{taskmanager.job.operator}}, and they would be distinguished by their tag > values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}. > Flink allows you to configure the format string for system scopes, so it is > possible to set the operator scope format to {{taskmanager.job.operator}}. > We do this for all scopes: > {code} > metrics.scope.jm: jobmanager > metrics.scope.jm.job: jobmanager.job > metrics.scope.tm: taskmanager > metrics.scope.tm.job: taskmanager.job > metrics.scope.task: taskmanager.job.task > metrics.scope.operator: taskmanager.job.operator > {code} > This seems to work. The DataDog Flink metric's plugin submits all scope > variables as tags, even if they are not used within the scope format. And it > appears internally this does not lead to metrics conflicting with each other. > We would like to extend this to user defined metrics, but you can define > variables/scopes when adding a metric group or metric with the user API, so > that in DD we have a single metric with a tag with many different values, > rather than hundreds of metrics to just the one value we want to measure > across different event types. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8352) Flink UI Reports No Error on Job Submission Failures
Elias Levy created FLINK-8352: - Summary: Flink UI Reports No Error on Job Submission Failures Key: FLINK-8352 URL: https://issues.apache.org/jira/browse/FLINK-8352 Project: Flink Issue Type: Bug Components: Web Client Affects Versions: 1.4.0 Reporter: Elias Levy If you submit a job jar via the web UI and it raises an exception when started, the UI will report no error and will continue to show the animated image that makes it seem as if it is working. In addition, no error is printed in the logs, unless the level is increased to at least DEBUG: {noformat} @40005a4c399202b87ebc DEBUG org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler - Error while handling request. @40005a4c399202b8868c java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramInvocationException: The program caused an error: @40005a4c399202b88a74 at org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleJsonRequest$0(JarPlanHandler.java:68) @40005a4c399202b88e5c at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) @40005a4c399202b8e44c at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) @40005a4c399202b8e44c at java.util.concurrent.FutureTask.run(Unknown Source) @40005a4c399202b8e834 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source) @40005a4c399202b8e834 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) @40005a4c399202b8f3ec at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) @40005a4c399202b8f7d4 at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) @40005a4c399202b8f7d4 at java.lang.Thread.run(Unknown Source) @40005a4c399202b8fbbc Caused by: org.apache.flink.client.program.ProgramInvocationException: The program caused an error: @40005a4c399202b90b5c at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93) @40005a4c399202b90f44 at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334) @40005a4c399202b90f44 at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:76) @40005a4c399202b91afc at org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleJsonRequest$0(JarPlanHandler.java:57) @40005a4c399202b91afc ... 8 more @40005a4c399202b91ee4 Caused by: java.lang.ExceptionInInitializerError @40005a4c399202b91ee4 at com.cisco.sbg.amp.flink.ioc_engine.IocEngine.main(IocEngine.scala) @40005a4c399202b922cc at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) @40005a4c399202b92a9c at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) @40005a4c399202b92a9c at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) @40005a4c399202b92e84 at java.lang.reflect.Method.invoke(Unknown Source) @40005a4c399202b92e84 at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525) @40005a4c399202b9326c at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417) @40005a4c399202b93a3c at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83) @40005a4c399202b949dc ... 
11 more @40005a4c399202b949dc Caused by: java.io.FileNotFoundException: /data/jenkins/jobs/XXX/workspace/target/scala-2.11/scoverage-data/scoverage.measurements.55 (No such file or directory) @40005a4c399202b951ac at java.io.FileOutputStream.open0(Native Method) @40005a4c399202b951ac at java.io.FileOutputStream.open(Unknown Source) @40005a4c399202b9597c at java.io.FileOutputStream.(Unknown Source) @40005a4c399202b9597c at java.io.FileWriter.(Unknown Source) @40005a4c399202b95d64 at scoverage.Invoker$$anonfun$1.apply(Invoker.scala:42) @40005a4c399202b95d64 at scoverage.Invoker$$anonfun$1.apply(Invoker.scala:42) @40005a4c399202b9614c at scala.collection.concurrent.TrieMap.getOrElseUpdate(TrieMap.scala:901) @40005a4c399202b9614c at scoverage.Invoker$.invoked(Invoker.scala:42) @40005a4c399202b9691c at com.XXX$.(IocEngine.scala:28) @40005a4c399202b9691c at com.XXX$.(IocEngine.scala) {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8311) Flink needs documentation for network access control
Elias Levy created FLINK-8311: - Summary: Flink needs documentation for network access control Key: FLINK-8311 URL: https://issues.apache.org/jira/browse/FLINK-8311 Project: Flink Issue Type: Improvement Components: Documentation Affects Versions: 1.4.0 Reporter: Elias Levy There is a need for better documentation of what connects to what over which ports in a Flink cluster, so that users can configure network access control rules. E.g. I was under the impression that in a ZK HA configuration the Job Managers were essentially independent and only coordinated via ZK. But starting multiple JMs in HA with the JM RPC port blocked between JMs shows that the second JM's Akka subsystem tries to connect to the leading JM:
{noformat}
INFO  akka.remote.transport.ProtocolStateActor - No response from remote for outbound association. Associate timed out after [2 ms].
WARN  akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@10.210.210.127:6123] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@10.210.210.127:6123]] Caused by: [No response from remote for outbound association. Associate timed out after [2 ms].]
WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: connection timed out: /10.210.210.127:6123
{noformat}
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
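For illustration, a sketch of the kind of port inventory such documentation could provide, expressed as pinned settings in flink-conf.yaml. The keys are the 1.4-era option names; several default to 0 (a random port), which is exactly what makes access control rules hard to write. Values are examples, not recommendations:
{code}
jobmanager.rpc.port: 6123                        # JM Akka RPC; TMs and other JMs connect here
high-availability.jobmanager.port: 50000-50025   # JM RPC port range used in ZK HA mode
blob.server.port: 6124                           # blob server; TMs fetch job artifacts from the JM
taskmanager.rpc.port: 6122                       # TM Akka RPC
taskmanager.data.port: 6121                      # TM-to-TM data exchange
jobmanager.web.port: 8081                        # web UI and HTTP job submission
{code}
In an HA setup, ZooKeeper's client port (2181 by default) must also be reachable from every JM and TM.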
[jira] [Commented] (FLINK-5880) Add documentation for object reuse for DataStream API
[ https://issues.apache.org/jira/browse/FLINK-5880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297798#comment-16297798 ] Elias Levy commented on FLINK-5880: --- Came here to open this issue after reading the latest [blog post|https://data-artisans.com/blog/curious-case-broken-benchmark-revisiting-apache-flink-vs-databricks-runtime]. Hard to fault Databricks if the documentation about object reuse is not there. > Add documentation for object reuse for DataStream API > - > > Key: FLINK-5880 > URL: https://issues.apache.org/jira/browse/FLINK-5880 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Aljoscha Krettek > > The batch documentation has this section: > https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html#operating-on-data-objects-in-functions -- This message was sent by Atlassian JIRA (v6.4.14#64029)
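For reference, the switch itself is a one-liner in the DataStream API; what needs documenting is the contract it imposes on user functions. A minimal Java sketch:
{code}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ObjectReuseExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // With object reuse enabled, Flink may hand the same record instance to
        // chained operators instead of copying it; user functions must therefore
        // not cache or mutate input records across invocations.
        env.getConfig().enableObjectReuse();
    }
}
{code}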
[jira] [Commented] (FLINK-6243) Continuous Joins: True Sliding Window Joins
[ https://issues.apache.org/jira/browse/FLINK-6243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16236751#comment-16236751 ] Elias Levy commented on FLINK-6243: --- [~StephanEwen] anything to review? > Continuous Joins: True Sliding Window Joins > > > Key: FLINK-6243 > URL: https://issues.apache.org/jira/browse/FLINK-6243 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Affects Versions: 1.1.4 >Reporter: Elias Levy >Priority: Major > > Flink defines sliding window joins as the join of elements of two streams that share a window of time, where the windows are defined by advancing them forward some amount of time that is less than the window time span. More generally, such windows are just overlapping hopping windows. > Other systems, such as Kafka Streams, support a different notion of sliding window joins. In these systems, two elements of a stream are joined if the absolute time difference between them is less than or equal to the time window length. > This alternate notion of sliding window joins has advantages over the current implementation in some applications. > Elements to be joined may both fall within multiple overlapping sliding windows, leading them to be joined multiple times, when we only wish them to be joined once. > The implementation need not instantiate window objects to keep track of stream elements, which becomes problematic in the current implementation if the window size is very large and the slide is very small. > It allows for asymmetric time joins. E.g. join if elements from stream A are no more than X time behind and Y time ahead of an element from stream B. > It is currently possible to implement a join with these semantics using {{CoProcessFunction}}, but the capability should be a first-class feature, as it is in Kafka Streams. > To perform the join, elements of each stream must be buffered for at least the window time length. To allow for large window sizes and a high volume of elements, the state should, possibly optionally, be buffered such that it can spill to disk (e.g. by using RocksDB). > The same stream may be joined multiple times in a complex topology. As an optimization, it may be wise to reuse any element buffer among colocated join operators. Otherwise, there may be write amplification and increased state that must be snapshotted. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
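For illustration, a rough Java sketch of the {{CoProcessFunction}} approach the description mentions, with placeholder types ({{Event}}, {{Joined}}) and only stream B's buffer shown; a full implementation would buffer and scan both sides symmetrically and handle late data:
{code}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Placeholder record types for the sketch.
class Event { long id; }
class Joined {
    final Event a, b;
    Joined(Event a, Event b) { this.a = a; this.b = b; }
}

// Joins element a from stream A with any buffered b from stream B whose
// timestamp lies in [tsA - maxBehind, tsA + maxAhead]. Must run on connected
// keyed streams with event-time timestamps assigned.
public class TimeBoundedJoin extends CoProcessFunction<Event, Event, Joined> {
    private final long maxBehind; // how far b may trail a
    private final long maxAhead;  // how far b may lead a
    private transient MapState<Long, List<Event>> bBuffer;

    public TimeBoundedJoin(long maxBehind, long maxAhead) {
        this.maxBehind = maxBehind;
        this.maxAhead = maxAhead;
    }

    @Override
    public void open(Configuration parameters) {
        bBuffer = getRuntimeContext().getMapState(new MapStateDescriptor<>(
            "bBuffer",
            TypeInformation.of(Long.class),
            TypeInformation.of(new TypeHint<List<Event>>() {})));
    }

    @Override
    public void processElement1(Event a, Context ctx, Collector<Joined> out) throws Exception {
        long tsA = ctx.timestamp();
        // Emit one result per matching b; each pair is joined exactly once.
        for (Map.Entry<Long, List<Event>> entry : bBuffer.entries()) {
            long tsB = entry.getKey();
            if (tsB >= tsA - maxBehind && tsB <= tsA + maxAhead) {
                for (Event b : entry.getValue()) {
                    out.collect(new Joined(a, b));
                }
            }
        }
    }

    @Override
    public void processElement2(Event b, Context ctx, Collector<Joined> out) throws Exception {
        long tsB = ctx.timestamp();
        List<Event> bucket = bBuffer.get(tsB);
        if (bucket == null) {
            bucket = new ArrayList<>();
        }
        bucket.add(b);
        bBuffer.put(tsB, bucket);
        // Once the watermark passes tsB + maxBehind, no future a can match this b.
        ctx.timerService().registerEventTimeTimer(tsB + maxBehind);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Joined> out) throws Exception {
        bBuffer.remove(timestamp - maxBehind);
    }
}
{code}
Because the buffer lives in keyed state, running the job with the RocksDB state backend gives the spill-to-disk behaviour the description asks for without further code.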
[jira] [Commented] (FLINK-7935) Metrics with user supplied scope variables
[ https://issues.apache.org/jira/browse/FLINK-7935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16222972#comment-16222972 ] Elias Levy commented on FLINK-7935: --- Possibly. It depends on whether you could add multiple metrics or metric groups that differ in their variables but are formatted the same. E.g. the TaskManagerJobMetricGroup creates and tracks a distinct TaskMetricGroup for each task in the portion of a job the task manager is executing. The metrics for each task are tracked separately, but I can format the scope so all of them are reported with the same name ("taskmanager.job.task") but with different variables/DD tags. > Metrics with user supplied scope variables > -- > > Key: FLINK-7935 > URL: https://issues.apache.org/jira/browse/FLINK-7935 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.3.2 >Reporter: Elias Levy > > We use DataDog for metrics. DD and Flink differ somewhat in how they track metrics. > Flink names and scopes metrics together, at least by default. E.g. by default the system scope for operator metrics is {{<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>}}. The scope variables become part of the metric's full name. > In DD the metric would be named something generic, e.g. {{taskmanager.job.operator}}, and they would be distinguished by their tag values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}. > Flink allows you to configure the format string for system scopes, so it is possible to set the operator scope format to {{taskmanager.job.operator}}. We do this for all scopes:
> {code}
> metrics.scope.jm: jobmanager
> metrics.scope.jm.job: jobmanager.job
> metrics.scope.tm: taskmanager
> metrics.scope.tm.job: taskmanager.job
> metrics.scope.task: taskmanager.job.task
> metrics.scope.operator: taskmanager.job.operator
> {code}
> This seems to work. The DataDog Flink metrics plugin submits all scope variables as tags, even if they are not used within the scope format. And it appears that internally this does not lead to metrics conflicting with each other. > We would like to extend this to user-defined metrics, but you can't define variables/scopes when adding a metric group or metric with the user API, so that in DD we could have a single metric with a tag taking many different values, rather than hundreds of metrics for just the one value we want to measure across different event types. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7935) Metrics with user supplied scope variables
Elias Levy created FLINK-7935: - Summary: Metrics with user supplied scope variables Key: FLINK-7935 URL: https://issues.apache.org/jira/browse/FLINK-7935 Project: Flink Issue Type: Improvement Affects Versions: 1.3.2 Reporter: Elias Levy We use DataDog for metrics. DD and Flink differ somewhat in how they track metrics. Flink names and scopes metrics together, at least by default. E.g. by default the system scope for operator metrics is {{<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>}}. The scope variables become part of the metric's full name. In DD the metric would be named something generic, e.g. {{taskmanager.job.operator}}, and they would be distinguished by their tag values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}. Flink allows you to configure the format string for system scopes, so it is possible to set the operator scope format to {{taskmanager.job.operator}}. We do this for all scopes:
{code}
metrics.scope.jm: jobmanager
metrics.scope.jm.job: jobmanager.job
metrics.scope.tm: taskmanager
metrics.scope.tm.job: taskmanager.job
metrics.scope.task: taskmanager.job.task
metrics.scope.operator: taskmanager.job.operator
{code}
This seems to work. The DataDog Flink metrics plugin submits all scope variables as tags, even if they are not used within the scope format. And it appears that internally this does not lead to metrics conflicting with each other. We would like to extend this to user-defined metrics, but you can't define variables/scopes when adding a metric group or metric with the user API, so that in DD we could have a single metric with a tag taking many different values, rather than hundreds of metrics for just the one value we want to measure across different event types. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
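For illustration, a sketch of what the requested user-facing API might look like from inside a rich function. The key/value {{addGroup}} variant and the names here are hypothetical (no such API exists as of 1.3.2); the idea is that the pair would surface as a scope variable, so a reporter such as the DataDog one could tag a single metric name instead of minting one metric per value:
{code}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

public class TaggedMetricsMap extends RichMapFunction<String, String> {
    private transient Counter matched;

    @Override
    public void open(Configuration parameters) {
        // Hypothetical key/value addGroup: "event_type" would become a scope
        // variable (a DD tag) rather than a segment of the metric name.
        MetricGroup group = getRuntimeContext()
            .getMetricGroup()
            .addGroup("event_type", "malware");  // assumed API, not in 1.3.x

        matched = group.counter("matched_events");
    }

    @Override
    public String map(String value) {
        matched.inc();
        return value;
    }
}
{code}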