[jira] [Commented] (FLINK-14111) Flink should be robust to a non-leader Zookeeper host going down

2019-09-17 Thread Elias Levy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14111?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931901#comment-16931901
 ] 

Elias Levy commented on FLINK-14111:


Probably related to FLINK-10052.

> Flink should be robust to a non-leader Zookeeper host going down
> 
>
> Key: FLINK-14111
> URL: https://issues.apache.org/jira/browse/FLINK-14111
> Project: Flink
>  Issue Type: Wish
>  Components: Runtime / Coordination
>Affects Versions: 1.7.2, 1.8.0, 1.8.1, 1.9.0
> Environment: Linux
> JVM 8
> Flink {{1.7.2}}, {{1.8.1}}, {{1.9.0}}
> {{Zookeeper version 3.4.5}}
>Reporter: Aaron Levin
>Priority: Major
>
> I noticed that if a non-leader ZooKeeper node goes down while the ZooKeeper 
> cluster still has quorum, my Flink application restarts anyway. I 
> believe it should be possible for Flink applications not to require a restart 
> in this scenario.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers

2019-09-17 Thread Elias Levy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16931611#comment-16931611
 ] 

Elias Levy commented on FLINK-12122:


Till, that is a welcome development.  I am surprised this issue has languished 
since the 1.5 days.  It makes it very difficult to run certain jobs in 
standalone clusters that are over-allocated to handle failover in case of TM 
node failure.  The uneven allocation of tasks results in Kafka consumer lag for 
a subset of partitions under many workloads.  We've had to modify our clusters 
to exactly match parallelism and number of slots, and use other mechanisms to 
handle failover when upgrading old jobs to 1.9.

> Spread out tasks evenly across all available registered TaskManagers
> 
>
> Key: FLINK-12122
> URL: https://issues.apache.org/jira/browse/FLINK-12122
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: Till Rohrmann
>Priority: Major
> Attachments: image-2019-05-21-12-28-29-538.png, 
> image-2019-05-21-13-02-50-251.png
>
>
> With Flip-6, we changed the default behaviour of how slots are assigned to 
> {{TaskManagers}}. Instead of evenly spreading them out over all registered 
> {{TaskManagers}}, we randomly pick slots from {{TaskManagers}} with a 
> tendency to first fill up a TM before using another one. This is a regression 
> wrt the pre Flip-6 code.
> I suggest changing the behaviour so that we try to evenly distribute slots 
> across all available {{TaskManagers}} by considering how many of their slots 
> are already allocated.
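
One way to read the proposal, as a minimal sketch and not Flink's actual scheduler code: on each slot request, pick the TaskManager with the lowest fraction of allocated slots. The names TaskManagerSlots and EvenSlotSpreading are hypothetical and exist only for this illustration.

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for a registered TaskManager and its slot counts.
final class TaskManagerSlots {
    final String id;
    final int totalSlots;
    int allocatedSlots;

    TaskManagerSlots(String id, int totalSlots) {
        this.id = id;
        this.totalSlots = totalSlots;
    }

    boolean hasFreeSlot() {
        return allocatedSlots < totalSlots;
    }

    double utilization() {
        return (double) allocatedSlots / totalSlots;
    }
}

final class EvenSlotSpreading {
    // Allocates one slot on the least utilized TaskManager that still has a
    // free slot. Returns the chosen TaskManager id, or null if all are full.
    static String allocate(List<TaskManagerSlots> taskManagers) {
        TaskManagerSlots best = taskManagers.stream()
                .filter(TaskManagerSlots::hasFreeSlot)
                .min(Comparator.comparingDouble(TaskManagerSlots::utilization))
                .orElse(null);
        if (best == null) {
            return null;
        }
        best.allocatedSlots++;
        return best.id;
    }
}
```

With three TaskManagers of four slots each, six requests under this rule end with two slots allocated on every TaskManager instead of filling the first one.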





[jira] [Commented] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers

2019-09-16 Thread Elias Levy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16930672#comment-16930672
 ] 

Elias Levy commented on FLINK-12122:


What is the status of this issue?  It has a serious negative effect on 
production clusters now that we have upgraded some old jobs to 1.9.0.






[jira] [Commented] (FLINK-14002) FlinkKafkaProducer constructor that takes KafkaSerializationSchema shouldnt take default topic

2019-09-16 Thread Elias Levy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16930650#comment-16930650
 ] 

Elias Levy commented on FLINK-14002:


I am for that.  Alternatively, {{KafkaSerializationSchema}} and 
{{KeyedSerializationSchema}} could be merged, such that {{serialize}} could 
have a default implementation that calls {{getTargetTopic}}, {{serializeKey}}, 
and {{serializeValue}}, but can be overridden for more advanced applications.
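
A minimal sketch of what such a merged schema might look like, assuming a default method on the interface. SimpleRecord is a hypothetical stand-in for Kafka's ProducerRecord so that the example compiles without the Kafka client, and MergedSerializationSchema and StringSchema are illustrative names, not existing Flink classes.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for ProducerRecord<byte[], byte[]>.
final class SimpleRecord {
    final String topic;
    final byte[] key;
    final byte[] value;

    SimpleRecord(String topic, byte[] key, byte[] value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

// Hypothetical merged schema: simple users implement three small methods,
// advanced users override serialize() and build the record themselves.
interface MergedSerializationSchema<T> {
    String getTargetTopic(T element);

    byte[] serializeKey(T element);

    byte[] serializeValue(T element);

    default SimpleRecord serialize(T element) {
        return new SimpleRecord(
                getTargetTopic(element), serializeKey(element), serializeValue(element));
    }
}

// Example implementation that sends every string, without a key, to one topic.
final class StringSchema implements MergedSerializationSchema<String> {
    private final String topic;

    StringSchema(String topic) {
        this.topic = topic;
    }

    public String getTargetTopic(String element) {
        return topic;
    }

    public byte[] serializeKey(String element) {
        return null; // no key
    }

    public byte[] serializeValue(String element) {
        return element.getBytes(StandardCharsets.UTF_8);
    }
}
```

In this shape the topic is always taken from the schema, so a separate default topic argument on the producer would not be needed.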

> FlinkKafkaProducer constructor that takes KafkaSerializationSchema shouldnt 
> take default topic
> --
>
> Key: FLINK-14002
> URL: https://issues.apache.org/jira/browse/FLINK-14002
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka
>Reporter: Gyula Fora
>Priority: Major
>
> When the KafkaSerializationSchema is used, the user always has to provide the 
> topic when they create the ProducerRecord.
> The defaultTopic specified in the constructor (and enforced not to be null) 
> will always be ignored, which is very misleading.
> We should deprecate these constructors and create new ones without 
> defaultTopic.





[jira] [Commented] (FLINK-5312) Implement Standalone Setup v2.0

2019-09-13 Thread Elias Levy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-5312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929598#comment-16929598
 ] 

Elias Levy commented on FLINK-5312:
---

Curious about the reasoning behind the Won't Do resolution.  Is the expectation 
that anyone who requires such functionality fall back on Kubernetes or some 
other resource manager?

> Implement Standalone Setup v2.0
> ---
>
> Key: FLINK-5312
> URL: https://issues.apache.org/jira/browse/FLINK-5312
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / Coordination
>Reporter: Manu Zhang
>Priority: Major
>
> Copied from 
> [FLIP-6|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077]
> A future version of the Standalone Setup could be thought of to implement 
> something like a “lightweight Yarn” architecture:
> * All nodes run a simple “NodeManager” process that spawns processes for the 
> TaskManagers and JobManagers, that way offering proper isolation of jobs 
> against each other.
> * The LocalDispatcher will not spawn the JobManager internally but on a 
> lightweight “node manager”





[jira] [Commented] (FLINK-14002) FlinkKafkaProducer constructor that takes KafkaSerializationSchema shouldnt take default topic

2019-09-13 Thread Elias Levy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16929556#comment-16929556
 ] 

Elias Levy commented on FLINK-14002:


Came here to open just this ticket. {{KeyedSerializationSchema}} was deprecated 
but {{KafkaSerializationSchema}} was not very well thought out.  A 
{{KafkaSerializationSchema}} that implements {{KafkaContextAware}} can return 
null from {{getTargetTopic}}, and that will result in the sink using the 
default topic to look up the partition information, but the 
{{KafkaSerializationSchema}} still needs to fill in the topic in the 
{{ProducerRecord}} or it will result in an exception.

Having a serializer that is lower-level and can create a {{ProducerRecord}} is 
nice, but IMHO we should not have deprecated the simpler higher-level 
serializer.

 






[jira] [Commented] (FLINK-14057) Add Remove Other Timers to TimerService

2019-09-12 Thread Elias Levy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16928676#comment-16928676
 ] 

Elias Levy commented on FLINK-14057:


Could also add {{replaceProcessingTimeTimer}} and {{replaceEventTimeTimer}}.
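
A rough sketch of the replace semantics, assuming a wrapper that remembers the one outstanding timer. SimpleTimerService is a hypothetical, cut-down stand-in for the two TimerService methods named in the issue, and ReplacingTimer and InMemoryTimerService are illustrative names only.

```java
import java.util.TreeSet;

// Hypothetical stand-in exposing only the two methods named in the issue.
interface SimpleTimerService {
    void registerProcessingTimeTimer(long time);

    void deleteProcessingTimeTimer(long time);
}

// Remembers the single outstanding timer so callers do not have to.
final class ReplacingTimer {
    private final SimpleTimerService timers;
    private Long current; // null while no timer is outstanding

    ReplacingTimer(SimpleTimerService timers) {
        this.timers = timers;
    }

    // Deletes the previously registered timer, if any, then registers the new one.
    void replaceProcessingTimeTimer(long time) {
        if (current != null) {
            timers.deleteProcessingTimeTimer(current);
        }
        timers.registerProcessingTimeTimer(time);
        current = time;
    }
}

// In-memory implementation used only to exercise the sketch.
final class InMemoryTimerService implements SimpleTimerService {
    final TreeSet<Long> registered = new TreeSet<>();

    public void registerProcessingTimeTimer(long time) {
        registered.add(time);
    }

    public void deleteProcessingTimeTimer(long time) {
        registered.remove(time);
    }
}
```

In a real job the remembered timestamp would presumably have to live in keyed state and be cleared when the timer fires; the sketch leaves that out.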

> Add Remove Other Timers to TimerService
> ---
>
> Key: FLINK-14057
> URL: https://issues.apache.org/jira/browse/FLINK-14057
> Project: Flink
>  Issue Type: Improvement
>Reporter: Jesse Anderson
>Priority: Major
>
> The TimerService has the ability to add timers with 
> registerProcessingTimeTimer. This method can be called many times with 
> different timer times.
> If you want to add a new timer and delete other timers, you have to keep 
> track of all previous timer times and call deleteProcessingTimeTimer for each 
> time. This approach forces you to keep track of all previous (unexpired) timers 
> for a key.
> Instead, I suggest overloading registerProcessingTimeTimer with a second 
> boolean argument that will remove all previous timers and set the new timer.
> Note: although I'm using registerProcessingTimeTimer, this applies to 
> registerEventTimeTimer as well.





[jira] [Commented] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections

2019-07-22 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16890276#comment-16890276
 ] 

Elias Levy commented on FLINK-10052:


The whole point of ZK is that it doesn't suffer from split brain.  It is a 
strongly consistent system.

> Tolerate temporarily suspended ZooKeeper connections
> 
>
> Key: FLINK-10052
> URL: https://issues.apache.org/jira/browse/FLINK-10052
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.4.2, 1.5.2, 1.6.0, 1.8.1
>Reporter: Till Rohrmann
>Assignee: Dominik Wosiński
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> This issue results from FLINK-10011 which uncovered a problem with Flink's HA 
> recovery and proposed the following solution to harden Flink:
> The {{ZooKeeperLeaderElectionService}} uses the {{LeaderLatch}} Curator 
> recipe for leader election. The leader latch revokes leadership in case of a 
> suspended ZooKeeper connection. This can be premature in case that the system 
> can reconnect to ZooKeeper before its session expires. The effect of the lost 
> leadership is that all jobs will be canceled and directly restarted after 
> regaining the leadership.
> Instead of directly revoking the leadership upon a SUSPENDED ZooKeeper 
> connection, it would be better to wait until the ZooKeeper connection is 
> LOST. That way we would allow the system to reconnect and not lose the 
> leadership. This could be achievable by using Curator's {{LeaderSelector}} 
> instead of the {{LeaderLatch}}.
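
The difference between the two behaviours can be sketched as follows. This is a toy model under stated assumptions, not Curator or Flink code: ZkConnectionState is a hypothetical enum that mirrors only the connection state names used in the description, and LeadershipTracker is an illustrative name.

```java
// Hypothetical stand-in for the connection states named in the description.
enum ZkConnectionState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

final class LeadershipTracker {
    private boolean leader = true;

    // Revokes leadership only once the connection is LOST. SUSPENDED is
    // tolerated, so a quick reconnect does not cancel and restart all jobs.
    void onStateChanged(ZkConnectionState state) {
        if (state == ZkConnectionState.LOST) {
            leader = false;
        }
    }

    boolean isLeader() {
        return leader;
    }
}
```

Under this rule a SUSPENDED followed by RECONNECTED leaves leadership untouched, whereas the behaviour described above would already have revoked it at SUSPENDED.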





[jira] [Commented] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections

2019-07-10 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16882252#comment-16882252
 ] 

Elias Levy commented on FLINK-10052:


Dominik, any progress?

> Tolerate temporarily suspended ZooKeeper connections
> 
>
> Key: FLINK-10052
> URL: https://issues.apache.org/jira/browse/FLINK-10052
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Coordination
>Affects Versions: 1.4.2, 1.5.2, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Dominik Wosiński
>Priority: Major
>
> This issue results from FLINK-10011 which uncovered a problem with Flink's HA 
> recovery and proposed the following solution to harden Flink:
> The {{ZooKeeperLeaderElectionService}} uses the {{LeaderLatch}} Curator 
> recipe for leader election. The leader latch revokes leadership in case of a 
> suspended ZooKeeper connection. This can be premature in case that the system 
> can reconnect to ZooKeeper before its session expires. The effect of the lost 
> leadership is that all jobs will be canceled and directly restarted after 
> regaining the leadership.
> Instead of directly revoking the leadership upon a SUSPENDED ZooKeeper 
> connection, it would be better to wait until the ZooKeeper connection is 
> LOST. That way we would allow the system to reconnect and not lose the 
> leadership. This could be achievable by using Curator's {{LeaderSelector}} 
> instead of the {{LeaderLatch}}.





[jira] [Commented] (FLINK-13189) Fix the impact of zookeeper network disconnect temporarily on flink long running jobs

2019-07-10 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-13189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16882251#comment-16882251
 ] 

Elias Levy commented on FLINK-13189:


This is a duplicate of FLINK-10052.

> Fix the impact of zookeeper network disconnect temporarily on flink long 
> running jobs
> -
>
> Key: FLINK-13189
> URL: https://issues.apache.org/jira/browse/FLINK-13189
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.8.1
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> *Issue detail info*
> We deploy Flink streaming jobs on a Hadoop cluster in per-job mode and use 
> ZooKeeper as the HighAvailabilityService, but we found that a Flink job will 
> restart when the network between the JobManager and ZooKeeper is temporarily 
> disconnected.
> So we analyzed this problem in depth. The Flink JobManager uses Curator's 
> `+LeaderLatch+` to maintain leadership. When the network disconnects, the 
> `+LeaderLatch+` immediately sets leadership to false. We think it is too 
> harsh that many long-running Flink jobs restart because of a network 
> glitch.
>  
> *Fix this issue*
> From the official Curator website, we found that this issue was fixed in 
> curator-3.x.x, but we can't just change the flink-curator-version (2.12.0) 
> to 3.x.x because of zk-compatibility. Curator-2.x.x supports zookeeper-3.4.x 
> and zookeeper-3.5.0, while curator-3.x.x is only compatible with ZooKeeper 3.5.x. 
> Based on the above considerations, we updated `LeaderLatch` in the 
> flink-shaded-curator module.
>  
> *Other*
> Any suggestions are welcome, thanks
>  
> *Useful links*
> [https://curator.apache.org/zk-compatibility.html] 
>  [https://cwiki.apache.org/confluence/display/CURATOR/Releases] 
>  [http://curator.apache.org/curator-recipes/leader-latch.html]
>   





[jira] [Commented] (FLINK-12104) Flink Kafka fails with Incompatible KafkaProducer version / NoSuchFieldException sequenceNumbers

2019-04-03 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16808986#comment-16808986
 ] 

Elias Levy commented on FLINK-12104:


The {{flink-connector-kafka-0.11}} depends on the Kafka 0.11.0.2 client, which 
does have that field. The field was only removed in 1.0.0. Are you overriding 
the Kafka client dependency? If so, that is your problem.


If you want to use a newer Kafka client, use the universal Kafka connector 
({{flink-connector-kafka_2.11}}), which tracks the latest version of the Kafka 
client.

> Flink Kafka fails with Incompatible KafkaProducer version / 
> NoSuchFieldException sequenceNumbers
> 
>
> Key: FLINK-12104
> URL: https://issues.apache.org/jira/browse/FLINK-12104
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.7.2
>Reporter: Tim
>Priority: Major
>
> FlinkKafkaProducer (in flink-connector-kafka-0.11) tries to access a field 
> named `sequenceNumbers` from the KafkaProducer's TransactionManager.  You can 
> find this line on the [master branch 
> here|https://github.com/apache/flink/blob/d6be68670e661091d94a3c65a2704d52fc0e827c/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/FlinkKafkaProducer.java#L197].
>  
> {code:java}
> Object transactionManager = getValue(kafkaProducer, "transactionManager");
> ...
> Object sequenceNumbers = getValue(transactionManager, "sequenceNumbers");
> {code}
>  
> However, the Kafka TransactionManager no longer has a "sequenceNumbers" 
> field.  This was changed back on 9/14/2017 (KAFKA-5494) in an effort to 
> support multiple inflight requests while still guaranteeing idempotence.  See 
> [commit diff 
> here|https://github.com/apache/kafka/commit/5d2422258cb975a137a42a4e08f03573c49a387e#diff-f4ef1afd8792cd2a2e9069cd7ddea630].
> Subsequently when Flink tries to "recoverAndCommit" (see 
> FlinkKafkaProducer011) it fails with a "NoSuchFieldException: 
> sequenceNumbers", followed by a "Incompatible KafkaProducer version".
> Given that the KafkaProducer used is so old (this change was made almost two 
> years ago) are there any plans of upgrading?   Or - are there some known 
> compatibility issues that prevent Flink/Kafka connector from doing so?
>  





[jira] [Created] (FLINK-12024) Bump universal Kafka connector to Kafka dependency to 2.2.0

2019-03-26 Thread Elias Levy (JIRA)
Elias Levy created FLINK-12024:
--

 Summary: Bump universal Kafka connector to Kafka dependency to 
2.2.0
 Key: FLINK-12024
 URL: https://issues.apache.org/jira/browse/FLINK-12024
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.7.2
Reporter: Elias Levy


Update the Kafka client dependency to version 2.2.0.





[jira] [Created] (FLINK-11794) Allow compression of row format files created by StreamingFileSink

2019-03-01 Thread Elias Levy (JIRA)
Elias Levy created FLINK-11794:
--

 Summary: Allow compression of row format files created by 
StreamingFileSink
 Key: FLINK-11794
 URL: https://issues.apache.org/jira/browse/FLINK-11794
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / FileSystem
Affects Versions: 1.7.2
Reporter: Elias Levy


Currently, there is no mechanism to compress files created using a 
StreamingFileSink.  This is highly desirable when the output is a text-based row 
format such as JSON.

Possible alternatives are a callback that gets passed the 
local file before it is uploaded to the DFS, so that it can be compressed; or 
a factory method that wraps a passed-in output stream in a compressing 
OutputStream, such as GZIPOutputStream, which is then used by the Encoder.
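
The factory alternative could look roughly like the sketch below. OutputStreamFactory and GzipRowWriter are hypothetical names, not part of the StreamingFileSink API; only GZIPOutputStream and GZIPInputStream are real JDK classes. A byte array stands in for the part file.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical hook: wraps the part file's raw stream before rows are encoded.
interface OutputStreamFactory {
    OutputStream wrap(OutputStream raw) throws IOException;
}

final class GzipRowWriter {
    static final OutputStreamFactory GZIP = GZIPOutputStream::new;

    // Encodes each row as one UTF-8 line through the wrapped stream and
    // returns the bytes that would land in the part file.
    static byte[] writeRows(OutputStreamFactory factory, String... rows) {
        ByteArrayOutputStream partFile = new ByteArrayOutputStream();
        try (OutputStream out = factory.wrap(partFile)) {
            for (String row : rows) {
                out.write((row + "\n").getBytes(StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return partFile.toByteArray();
    }

    // Decompresses the bytes again so the round trip can be checked.
    static String readRows(byte[] compressed) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream plain = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                plain.write(chunk, 0, n);
            }
            return new String(plain.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The sketch ignores how a compressed part file would be resumed after a failure, which any real implementation would need to address.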





[jira] [Created] (FLINK-11520) Triggers should be provided the window state

2019-02-01 Thread Elias Levy (JIRA)
Elias Levy created FLINK-11520:
--

 Summary: Triggers should be provided the window state
 Key: FLINK-11520
 URL: https://issues.apache.org/jira/browse/FLINK-11520
 Project: Flink
  Issue Type: Improvement
Reporter: Elias Levy


Some triggers may require access to the window state to perform their job.  
Consider a window computing a count using an aggregate function.  It may be 
desired to fire the window when the count is 1 and then at the end of the 
window.  The early firing can provide feedback to external systems that a key 
has been observed, while waiting for the final count.

The same problem can be observed in 
org.apache.flink.streaming.api.windowing.triggers.CountTrigger, which must 
maintain an internal count instead of being able to make use of the window 
state.
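
To make the count example concrete, here is a minimal sketch of a trigger that is handed the window state. StateAwareTrigger, FireDecision and FirstElementTrigger are hypothetical names for illustration; the existing Trigger interface does not have this shape.

```java
// Hypothetical result type for the sketch.
enum FireDecision { CONTINUE, FIRE }

// Hypothetical trigger shape that receives the window's aggregated state.
interface StateAwareTrigger<ACC> {
    FireDecision onElement(ACC windowState);
}

// Fires early as soon as the windowed count reaches 1; the end-of-window
// firing is left to the usual time-based path and is not modelled here.
final class FirstElementTrigger implements StateAwareTrigger<Long> {
    public FireDecision onElement(Long count) {
        return count == 1L ? FireDecision.FIRE : FireDecision.CONTINUE;
    }
}
```

With the state passed in, the trigger needs no counter of its own.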





[jira] [Created] (FLINK-11517) Inefficient window state access when using RocksDB state backend

2019-02-01 Thread Elias Levy (JIRA)
Elias Levy created FLINK-11517:
--

 Summary: Inefficient window state access when using RocksDB state 
backend
 Key: FLINK-11517
 URL: https://issues.apache.org/jira/browse/FLINK-11517
 Project: Flink
  Issue Type: Bug
Reporter: Elias Levy


When using an aggregate function on a window with a process function and the 
RocksDB state backend, state access is inefficient.

The WindowOperator calls windowState.add to merge the new element using the 
aggregate function.  The add method of RocksDBAggregatingState will read the 
state, deserialize the state, call the aggregate function, serialize the 
state, and write it out.

If the trigger decides the window must be fired, as windowState.add 
does not return the state, the WindowOperator must call windowState.get to get 
it and pass it to the window process function, resulting in another read and 
deserialization.

Finally, while the state is not passed in to the trigger, in some cases the 
trigger may have a need to access the state.  That is our case.  As the state 
is not passed to the trigger, we must read and deserialize the state once more 
from within the trigger.

Thus, state must be read and deserialized three times to process a single 
element.  If the state is large, this can be quite costly.

 

Ideally, windowState.add would return the state, so that the WindowOperator can 
pass it to the process function without having to read it again.  Additionally, 
the state would be made available to the trigger to enable more use cases 
without having to go through the state descriptor again.
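
The saving can be illustrated with a toy model that counts reads instead of performing them. CountingAggregatingState and addAndGet are hypothetical names; this is not the RocksDBAggregatingState code, only a sketch of the two API shapes with a sum as the aggregate.

```java
// Hypothetical model of an aggregating state whose reads are expensive
// (a lookup plus deserialization); each read is counted, not performed.
final class CountingAggregatingState {
    private long stored;
    int reads;

    private long read() {
        reads++;
        return stored;
    }

    // Current shape: add() returns nothing, so the caller needs get() afterwards.
    void add(long value) {
        stored = read() + value;
    }

    long get() {
        return read();
    }

    // Proposed shape: add hands back the state it has just computed.
    long addAndGet(long value) {
        stored = read() + value;
        return stored;
    }
}
```

Calling add followed by get costs two reads for one element, while addAndGet costs one; passing the returned value on to the trigger would avoid the third read described above.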

 





[jira] [Commented] (FLINK-11435) Different jobs same consumer group are treated as separate groups

2019-01-28 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16754156#comment-16754156
 ] 

Elias Levy commented on FLINK-11435:


This is expected behavior.  Flink manages Kafka offsets itself.  Offsets are 
only reported to Kafka via the consumer group id to allow tracking progress by 
tools in the Kafka ecosystem.

> Different jobs same consumer group are treated as separate groups
> -
>
> Key: FLINK-11435
> URL: https://issues.apache.org/jira/browse/FLINK-11435
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.7.1
> Environment: kafka consumer
>Reporter: Avi Levi
>Priority: Major
>
> Two jobs deployed with the same consumer group id are still treated as 
> different consumer groups. This behavior does not comply with Kafka 
> expectations. Jobs with the same consumer group id should be treated as the 
> same group; this would enable deploying more jobs (especially if they are 
> stateless) on demand, and it is also how normal consumer groups behave. 
> To reproduce:
> deploy the same job twice. Both jobs consume the same messages although they 
> share the same consumer group id.





[jira] [Commented] (FLINK-10460) DataDog reporter JsonMappingException

2019-01-26 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16753231#comment-16753231
 ] 

Elias Levy commented on FLINK-10460:


[~lining] as you can tell from the backtrace, that is not a user metric.  
Rather, it appears to be a Kafka metric gathered in KafkaMetricWrapper.

> DataDog reporter JsonMappingException
> -
>
> Key: FLINK-10460
> URL: https://issues.apache.org/jira/browse/FLINK-10460
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: Elias Levy
>Priority: Minor
> Attachments: image-2019-01-24-16-00-56-280.png
>
>
> Observed the following error in the TM logs this morning:
> {code:java}
> WARN  org.apache.flink.metrics.datadog.DatadogHttpReporter  - Failed 
> reporting metrics to Datadog.
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException:
>  (was java.util.ConcurrentModificationException) (through reference chain: 
> org.apache.flink.metrics.datadog.DSeries["series"]->
> java.util.ArrayList[88]->org.apache.flink.metrics.datadog.DGauge["points"])
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:379)
>at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:339)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:342)
>at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:686)
>at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:157)
>at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContents(IndexedListSerializer.java:119)
>at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:79)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:18)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:672)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:678)
>   at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:157)
>at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:130)
>at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3631)
>at 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2998)
>at 
> org.apache.flink.metrics.datadog.DatadogHttpClient.serialize(DatadogHttpClient.java:90)
>at 
> org.apache.flink.metrics.datadog.DatadogHttpClient.send(DatadogHttpClient.java:79)
>at 
> org.apache.flink.metrics.datadog.DatadogHttpReporter.report(DatadogHttpReporter.java:143)
>   at 
> org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:417)
>at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
>at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown
>  Source)
>at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
>  Source)
>at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>at java.lang.Thread.run(Unknown Source)
>  Caused by: java.util.ConcurrentModificationException
>at java.util.LinkedHashMap$LinkedHashIterator.nextNode(Unknown Source)
>at java.util.LinkedHashMap$LinkedKeyIterator.next(Unknown Source)
>at java.util.AbstractCollection.addAll(Unknown Source)
>at java.util.HashSet.<init>(Unknown Source)
>at 
> org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:65)
>at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:298)
>at 
> 

[jira] [Commented] (FLINK-11249) FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

2019-01-23 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16749907#comment-16749907
 ] 

Elias Levy commented on FLINK-11249:


I believe that the open transactions are maintained.  The transaction data is 
recorded in the transaction log, which is an internal Kafka topic replicated 
three ways.  When a broker is restarted another broker's transaction 
coordinator becomes the leader for the transaction log partitions that were 
managed by the restarted broker.  The new transaction coordinator leader will 
read the transaction log partitions and rebuild the in memory transaction state 
and service the publishers.

> FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
> ---
>
> Key: FLINK-11249
> URL: https://issues.apache.org/jira/browse/FLINK-11249
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector
>Affects Versions: 1.7.0, 1.7.1
>Reporter: Piotr Nowojski
>Assignee: vinoyang
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.2, 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As reported by a user on the mailing list "How to migrate Kafka Producer ?" 
> (on 18th December 2018), {{FlinkKafkaProducer011}} can not be migrated to 
> {{FlinkKafkaProducer}} and the same problem can occur in the future Kafka 
> producer versions/refactorings.
> The issue is that {{ListState 
> FlinkKafkaProducer#nextTransactionalIdHintState}} field is serialized using 
> java serializers and this is causing problems/collisions on 
> {{FlinkKafkaProducer011.NextTransactionalIdHint}}  vs
> {{FlinkKafkaProducer.NextTransactionalIdHint}}.
> To fix that we probably need to release new versions of those classes, that 
> will rewrite/upgrade this state field to a new one, that doesn't rely on 
> java serialization. After this, we could drop the support for the old field 
> and that in turn will allow users to upgrade from 0.11 connector to the 
> universal one.
> One bright side is that technically speaking our {{FlinkKafkaProducer011}} 
> has the same compatibility matrix as the universal one (it's also forward & 
> backward compatible with the same Kafka versions), so for the time being 
> users can stick to {{FlinkKafkaProducer011}}.
> FYI [~tzulitai] [~yanghua]





[jira] [Commented] (FLINK-10493) Macro generated CaseClassSerializer considered harmful

2018-11-05 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675634#comment-16675634
 ] 

Elias Levy commented on FLINK-10493:


[~tzulitai] correct.  It is not evident that the Scala serializers generated by 
the macros are anonymous classes.  One only finds out when a job upgrade fails 
and one starts digging through the code to find the source of the error.  
Specifically, the section of the documentation that discusses [Type Information 
in the Scala 
API|https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/types_serialization.html#type-information-in-the-scala-api]
 fails to mention this issue.
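
For readers unfamiliar with the mechanism, here is a small Java analogue. It is not the Scala macro and does not reproduce the Flink failure; it only shows that anonymous classes get compiler-assigned names that the source code does not control. AnonymousNames is a hypothetical class for this illustration.

```java
// Java analogue only: each anonymous class gets a compiler-assigned name of
// the form <enclosing class>$<digits>, chosen by the compiler, not the author.
final class AnonymousNames {
    static String first() {
        return new Object() { }.getClass().getName();
    }

    static String second() {
        return new Object() { }.getClass().getName();
    }
}
```

If such a name is stored in a savepoint, a restore has to find a class with exactly that name, which is why a change in the compiler-assigned name breaks it.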

> Macro generated CaseClassSerializer considered harmful
> --
>
> Key: FLINK-10493
> URL: https://issues.apache.org/jira/browse/FLINK-10493
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API, State Backends, Checkpointing, Type 
> Serialization System
>Affects Versions: 1.4.0, 1.4.1, 1.4.2, 1.5.1, 1.5.2, 1.5.3, 1.5.4, 1.6.0, 
> 1.6.1
>Reporter: Elias Levy
>Priority: Major
>
> The Flink Scala API uses implicits and macros to generate {{TypeInformation}} 
> and {{TypeSerializer}} objects for types.  In the case of Scala tuples and 
> case classes, the macro generates an [anonymous {{CaseClassSerializer}} 
> class|https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala#L148-L161].
>   
> The Scala compiler generates a name for the anonymous class that depends 
> on the position of the macro invocation relative to other anonymous classes 
> in the code.  If the code changes such that the relative position of the 
> anonymous class changes, the name of the serializer class changes too, even 
> if the overall logic of the code and the type in question do not change.
> This results in errors, such as the one below, when the job is restored 
> from a savepoint: the serializer needed to read the data in the savepoint can 
> no longer be found because its name has changed.
> At the very least, there should be a prominent warning in the documentation 
> about this issue.  Minor code changes can result in jobs that can't restore 
> previous state.  Ideally, the use of anonymous classes should be deprecated 
> if possible.
> {noformat}
> WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  
> - Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
>   at 
> org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
>   at 
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.InvalidClassException: failed to read class descriptor
>   at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>   at java.io.ObjectInputStream.readClassDesc(Unknown Source)
>   at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>   at java.io.ObjectInputStream.readObject0(Unknown Source)
>   at 

[jira] [Commented] (FLINK-10520) Job save points REST API fails unless parameters are specified

2018-11-05 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675404#comment-16675404
 ] 

Elias Levy commented on FLINK-10520:


[~huide] as I said, "the system is configured with a default savepoint 
location".  As for {{cancel-job}}, the JSON schema documentation does not mark 
the key as mandatory.

> Job save points REST API fails unless parameters are specified
> --
>
> Key: FLINK-10520
> URL: https://issues.apache.org/jira/browse/FLINK-10520
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.6.1
>Reporter: Elias Levy
>Assignee: Chesnay Schepler
>Priority: Minor
>
> The new REST API POST endpoint, {{/jobs/:jobid/savepoints}}, returns an error 
> unless the request includes a body with all parameters ({{target-directory}} 
> and {{cancel-job}}), even though the 
> [documentation|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.html]
>  suggests these are optional.
> If a POST request with no data is made, the response is a 400 status code 
> with the error message "Bad request received."
> If the POST request submits an empty JSON object ( {} ), the response is a 
> 400 status code with the error message "Request did not match expected format 
> SavepointTriggerRequestBody."  The same is true if only the 
> {{target-directory}} or {{cancel-job}} parameters are included.
> As the system is configured with a default savepoint location, there 
> shouldn't be a need to include the parameter in the request.
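
Until the endpoint accepts partial bodies, a request that works around the behavior described above would carry both keys. The sketch below only builds such a request with the JDK HTTP client (Java 11+) and does not send it. The base URL, job id, and savepoint directory are placeholders, and the class and method names are made up for illustration; the endpoint path and the two JSON keys are the ones named in the issue.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class SavepointTriggerRequest {
    // Minimal JSON string escaping for the directory value (sketch only).
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds a body that includes both parameters explicitly.
    static String body(String targetDirectory, boolean cancelJob) {
        return "{\"target-directory\":\"" + escape(targetDirectory)
                + "\",\"cancel-job\":" + cancelJob + "}";
    }

    // Builds (but does not send) the POST to /jobs/:jobid/savepoints.
    static HttpRequest build(String baseUrl, String jobId,
                             String targetDirectory, boolean cancelJob) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/jobs/" + jobId + "/savepoints"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body(targetDirectory, cancelJob)))
                .build();
    }

    public static void main(String[] args) {
        // Placeholder values; substitute a real REST address and job id.
        HttpRequest request = build("http://localhost:8081", "JOB_ID",
                "s3://bucket/savepoints", false);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(body("s3://bucket/savepoints", false));
    }
}
```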





[jira] [Comment Edited] (FLINK-10617) Restoring job fails because of slot allocation timeout

2018-10-30 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668873#comment-16668873
 ] 

Elias Levy edited comment on FLINK-10617 at 10/30/18 3:28 PM:
--

I've retested with 1.6.2 and have confirmed the issue still exists, even though 
FLINK-9932 is fixed.

Back trace in the JM:


{noformat}
15:21:57,999 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   
 - Job File Retros (dc0bc5ae8831ab9d0bb7b3535dfbf6c7) switched from state 
RUNNING to FAILING.
Could not allocate all requires slots within timeout of 30 ms. Slots 
required: 480, slots allocated: 223
org.apache.flink.runtime.executiongraph.ExecutionGraph.lambda$scheduleEager$3(ExecutionGraph.java:984)
java.util.concurrent.CompletableFuture.uniExceptionally(Unknown Source)
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(Unknown Source)
java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
org.apache.flink.runtime.concurrent.FutureUtils$ResultConjunctFuture.handleCompletedFuture(FutureUtils.java:535)
java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
akka.dispatch.OnComplete.internal(Future.scala:258)
akka.dispatch.OnComplete.internal(Future.scala:256)
akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{noformat}




> Restoring job fails because of slot allocation timeout
> --
>
> Key: FLINK-10617
> URL: https://issues.apache.org/jira/browse/FLINK-10617
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager, TaskManager
>Affects Versions: 1.6.1, 1.6.2
>Reporter: Elias Levy
>Priority: Critical
>
> The following may be related to FLINK-9932, but I am unsure.  If you believe 
> it is, go ahead and close this issue as a duplicate.
> While trying to test local state recovery on a job with large state, the job 
> failed to be restored because slot allocation timed out.
> The job is running on a standalone cluster with 12 nodes and 96 task slots (8 
> per node).  The job has parallelism of 96, so it consumes all of the slots, 
> and has ~200 GB of state in RocksDB.  
> To test local state recovery I decided to kill one of the TMs.  The TM 
> immediately restarted and re-registered with the JM.  I confirmed the JM 
> showed 96 registered task slots.
> {noformat}
> 21:35:44,616 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Resolved ResourceManager address, beginning registration
> 21:35:44,616 INFO  

[jira] [Updated] (FLINK-10617) Restoring job fails because of slot allocation timeout

2018-10-30 Thread Elias Levy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elias Levy updated FLINK-10617:
---
Affects Version/s: 1.6.2

> Restoring job fails because of slot allocation timeout
> --
>
> Key: FLINK-10617
> URL: https://issues.apache.org/jira/browse/FLINK-10617
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager, TaskManager
>Affects Versions: 1.6.1, 1.6.2
>Reporter: Elias Levy
>Priority: Major
>
> The following may be related to FLINK-9932, but I am unsure.  If you believe 
> it is, go ahead and close this issue as a duplicate.
> While trying to test local state recovery on a job with large state, the job 
> failed to be restored because slot allocation timed out.
> The job is running on a standalone cluster with 12 nodes and 96 task slots (8 
> per node).  The job has parallelism of 96, so it consumes all of the slots, 
> and has ~200 GB of state in RocksDB.  
> To test local state recovery I decided to kill one of the TMs.  The TM 
> immediately restarted and re-registered with the JM.  I confirmed the JM 
> showed 96 registered task slots.
> {noformat}
> 21:35:44,616 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Resolved ResourceManager address, beginning registration
> 21:35:44,616 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Registration at ResourceManager attempt 1 (timeout=100ms)
> 21:35:44,640 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Successful registration at resource manager 
> akka.tcp://flink@172.31.18.172:6123/user/resourcemanager under registration 
> id 302988dea6afbd613bb2f96429b65d18.
> 21:36:49,667 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Receive slot request AllocationID{4274d96a59d370305520876f5b84fb9f} for 
> job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 
> 8e06aa64d5f8961809da38fe7f224cc1.
> 21:36:49,667 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Allocated slot for AllocationID{4274d96a59d370305520876f5b84fb9f}.
> 21:36:49,667 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService 
>- Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring.
> 21:36:49,668 INFO  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
> Starting ZooKeeperLeaderRetrievalService 
> /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
> 21:36:49,671 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService 
>- Try to register at job manager 
> akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id 
> f85f6f9b-7713-4be3-a8f0-8443d91e5e6d.
> 21:36:49,681 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Receive slot request AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9} for 
> job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 
> 8e06aa64d5f8961809da38fe7f224cc1.
> 21:36:49,681 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Allocated slot for AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9}.
> 21:36:49,681 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService 
>- Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring.
> 21:36:49,681 INFO  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
> Stopping ZooKeeperLeaderRetrievalService 
> /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
> 21:36:49,681 INFO  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
> Starting ZooKeeperLeaderRetrievalService 
> /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
> 21:36:49,683 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService 
>- Try to register at job manager 
> akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id 
> f85f6f9b-7713-4be3-a8f0-8443d91e5e6d.
> 21:36:49,687 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService 
>- Resolved JobManager address, beginning registration
> 21:36:49,687 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService 
>- Resolved JobManager address, beginning registration
> 21:36:49,687 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Receive slot request AllocationID{740caf20a5f7f767864122dc9a7444d9} for 
> job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 
> 8e06aa64d5f8961809da38fe7f224cc1.
> 21:36:49,688 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService 
>- Registration at JobManager attempt 1 (timeout=100ms)
> 21:36:49,688 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Allocated slot for AllocationID{740caf20a5f7f767864122dc9a7444d9}.
> 21:36:49,688 INFO  

[jira] [Updated] (FLINK-10617) Restoring job fails because of slot allocation timeout

2018-10-30 Thread Elias Levy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elias Levy updated FLINK-10617:
---
Priority: Critical  (was: Major)

> Restoring job fails because of slot allocation timeout
> --
>
> Key: FLINK-10617
> URL: https://issues.apache.org/jira/browse/FLINK-10617
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager, TaskManager
>Affects Versions: 1.6.1, 1.6.2
>Reporter: Elias Levy
>Priority: Critical
>
> The following may be related to FLINK-9932, but I am unsure.  If you believe 
> it is, go ahead and close this issue as a duplicate.
> While trying to test local state recovery on a job with large state, the job 
> failed to be restored because slot allocation timed out.
> The job is running on a standalone cluster with 12 nodes and 96 task slots (8 
> per node).  The job has parallelism of 96, so it consumes all of the slots, 
> and has ~200 GB of state in RocksDB.  
> To test local state recovery I decided to kill one of the TMs.  The TM 
> immediately restarted and re-registered with the JM.  I confirmed the JM 
> showed 96 registered task slots.
> {noformat}
> 21:35:44,616 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Resolved ResourceManager address, beginning registration
> 21:35:44,616 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Registration at ResourceManager attempt 1 (timeout=100ms)
> 21:35:44,640 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Successful registration at resource manager 
> akka.tcp://flink@172.31.18.172:6123/user/resourcemanager under registration 
> id 302988dea6afbd613bb2f96429b65d18.
> 21:36:49,667 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Receive slot request AllocationID{4274d96a59d370305520876f5b84fb9f} for 
> job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 
> 8e06aa64d5f8961809da38fe7f224cc1.
> 21:36:49,667 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Allocated slot for AllocationID{4274d96a59d370305520876f5b84fb9f}.
> 21:36:49,667 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService 
>- Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring.
> 21:36:49,668 INFO  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
> Starting ZooKeeperLeaderRetrievalService 
> /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
> 21:36:49,671 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService 
>- Try to register at job manager 
> akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id 
> f85f6f9b-7713-4be3-a8f0-8443d91e5e6d.
> 21:36:49,681 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Receive slot request AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9} for 
> job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 
> 8e06aa64d5f8961809da38fe7f224cc1.
> 21:36:49,681 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Allocated slot for AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9}.
> 21:36:49,681 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService 
>- Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring.
> 21:36:49,681 INFO  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
> Stopping ZooKeeperLeaderRetrievalService 
> /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
> 21:36:49,681 INFO  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
> Starting ZooKeeperLeaderRetrievalService 
> /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
> 21:36:49,683 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService 
>- Try to register at job manager 
> akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id 
> f85f6f9b-7713-4be3-a8f0-8443d91e5e6d.
> 21:36:49,687 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService 
>- Resolved JobManager address, beginning registration
> 21:36:49,687 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService 
>- Resolved JobManager address, beginning registration
> 21:36:49,687 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Receive slot request AllocationID{740caf20a5f7f767864122dc9a7444d9} for 
> job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 
> 8e06aa64d5f8961809da38fe7f224cc1.
> 21:36:49,688 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService 
>- Registration at JobManager attempt 1 (timeout=100ms)
> 21:36:49,688 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Allocated slot for AllocationID{740caf20a5f7f767864122dc9a7444d9}.
> 

[jira] [Commented] (FLINK-10617) Restoring job fails because of slot allocation timeout

2018-10-30 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16668873#comment-16668873
 ] 

Elias Levy commented on FLINK-10617:


I've retested with 1.6.2 and have confirmed the issue still exists, even though 
FLINK-9932 is fixed.

> Restoring job fails because of slot allocation timeout
> --
>
> Key: FLINK-10617
> URL: https://issues.apache.org/jira/browse/FLINK-10617
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager, TaskManager
>Affects Versions: 1.6.1
>Reporter: Elias Levy
>Priority: Major
>
> The following may be related to FLINK-9932, but I am unsure.  If you believe 
> it is, go ahead and close this issue as a duplicate.
> While trying to test local state recovery on a job with large state, the job 
> failed to be restored because slot allocation timed out.
> The job is running on a standalone cluster with 12 nodes and 96 task slots (8 
> per node).  The job has parallelism of 96, so it consumes all of the slots, 
> and has ~200 GB of state in RocksDB.  
> To test local state recovery I decided to kill one of the TMs.  The TM 
> immediately restarted and re-registered with the JM.  I confirmed the JM 
> showed 96 registered task slots.
> {noformat}
> 21:35:44,616 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Resolved ResourceManager address, beginning registration
> 21:35:44,616 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Registration at ResourceManager attempt 1 (timeout=100ms)
> 21:35:44,640 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Successful registration at resource manager 
> akka.tcp://flink@172.31.18.172:6123/user/resourcemanager under registration 
> id 302988dea6afbd613bb2f96429b65d18.
> 21:36:49,667 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Receive slot request AllocationID{4274d96a59d370305520876f5b84fb9f} for 
> job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 
> 8e06aa64d5f8961809da38fe7f224cc1.
> 21:36:49,667 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Allocated slot for AllocationID{4274d96a59d370305520876f5b84fb9f}.
> 21:36:49,667 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService 
>- Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring.
> 21:36:49,668 INFO  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
> Starting ZooKeeperLeaderRetrievalService 
> /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
> 21:36:49,671 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService 
>- Try to register at job manager 
> akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id 
> f85f6f9b-7713-4be3-a8f0-8443d91e5e6d.
> 21:36:49,681 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Receive slot request AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9} for 
> job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 
> 8e06aa64d5f8961809da38fe7f224cc1.
> 21:36:49,681 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Allocated slot for AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9}.
> 21:36:49,681 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService 
>- Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring.
> 21:36:49,681 INFO  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
> Stopping ZooKeeperLeaderRetrievalService 
> /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
> 21:36:49,681 INFO  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
> Starting ZooKeeperLeaderRetrievalService 
> /leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
> 21:36:49,683 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService 
>- Try to register at job manager 
> akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id 
> f85f6f9b-7713-4be3-a8f0-8443d91e5e6d.
> 21:36:49,687 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService 
>- Resolved JobManager address, beginning registration
> 21:36:49,687 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService 
>- Resolved JobManager address, beginning registration
> 21:36:49,687 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 
>- Receive slot request AllocationID{740caf20a5f7f767864122dc9a7444d9} for 
> job 87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 
> 8e06aa64d5f8961809da38fe7f224cc1.
> 21:36:49,688 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService 
>- Registration at JobManager attempt 1 (timeout=100ms)
> 21:36:49,688 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor 

[jira] [Commented] (FLINK-9061) Add entropy to s3 path for better scalability

2018-10-29 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16667437#comment-16667437
 ] 

Elias Levy commented on FLINK-9061:
---

Just a note that the changes AWS made to S3 in July mean that it is now a lot 
more difficult to hit the S3 performance limits that would require this feature, 
as S3 now supports at least 3,500 write requests per second per prefix. See 
[https://aws.amazon.com/about-aws/whats-new/2018/07/amazon-s3-announces-increased-request-rate-performance/].

> Add entropy to s3 path for better scalability
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Improvement
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2, 1.5.0
>Reporter: Jamie Grier
>Assignee: Indrajit Roychoudhury
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.2, 1.7.0
>
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  
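
The path rewrite proposed in the quoted description can be sketched as follows. This is only an illustration of the idea, not the hook that Flink eventually shipped: the class and method names are hypothetical, and the hash is simply the eight-digit hex form of {{String.hashCode()}} of the original path.

```java
import java.net.URI;

class EntropyPathRewriter {
    // Rewrites s3://bucket/<path> to s3://bucket/<hash>/<path>,
    // where <hash> is derived from the original checkpoint directory.
    static String addEntropy(String checkpointDir) {
        URI uri = URI.create(checkpointDir);
        String hash = String.format("%08x", checkpointDir.hashCode());
        return uri.getScheme() + "://" + uri.getAuthority() + "/" + hash + uri.getPath();
    }

    public static void main(String[] args) {
        System.out.println(addEntropy("s3://bucket/flink/checkpoints"));
    }
}
```

Note that hashing the whole original path, as the description suggests, yields one prefix per checkpoint directory. It spreads different directories across key prefixes but does not spread the files of a single directory.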





[jira] [Commented] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections

2018-10-23 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16660864#comment-16660864
 ] 

Elias Levy commented on FLINK-10052:


[~Wosinsan] any progress on this issue?

> Tolerate temporarily suspended ZooKeeper connections
> 
>
> Key: FLINK-10052
> URL: https://issues.apache.org/jira/browse/FLINK-10052
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.2, 1.5.2, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Dominik Wosiński
>Priority: Major
>
> This issue results from FLINK-10011 which uncovered a problem with Flink's HA 
> recovery and proposed the following solution to harden Flink:
> The {{ZooKeeperLeaderElectionService}} uses the {{LeaderLatch}} Curator 
> recipe for leader election. The leader latch revokes leadership in case of a 
> suspended ZooKeeper connection. This can be premature if the system 
> can reconnect to ZooKeeper before its session expires. The effect of the lost 
> leadership is that all jobs will be canceled and directly restarted after 
> regaining the leadership.
> Instead of directly revoking the leadership upon a SUSPENDED ZooKeeper 
> connection, it would be better to wait until the ZooKeeper connection is 
> LOST. That way we would allow the system to reconnect and not lose the 
> leadership. This could be achieved by using Curator's {{LeaderSelector}} 
> instead of the {{LeaderLatch}}.
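
The behavior proposed in the quoted description, keeping leadership while the connection is SUSPENDED and revoking it only when it is LOST, can be modeled with a small self-contained sketch. The enum below merely mirrors the names of Curator's connection states; it does not use the Curator API, and {{LeadershipTracker}} is a hypothetical class for illustration. Regaining leadership through a new election is deliberately not modeled.

```java
class LeadershipTracker {
    // Local stand-in for the connection states named in the description.
    enum ConnectionState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

    private boolean leader;

    LeadershipTracker(boolean initiallyLeader) {
        this.leader = initiallyLeader;
    }

    boolean isLeader() {
        return leader;
    }

    // Applies a connection state change and returns this for chaining.
    LeadershipTracker onStateChanged(ConnectionState state) {
        switch (state) {
            case LOST:
                // The session is gone, so leadership must be given up.
                leader = false;
                break;
            case SUSPENDED:
            case RECONNECTED:
            case CONNECTED:
                // A suspended connection may still recover; keep leadership.
                break;
        }
        return this;
    }

    public static void main(String[] args) {
        LeadershipTracker tracker = new LeadershipTracker(true);
        tracker.onStateChanged(ConnectionState.SUSPENDED)
               .onStateChanged(ConnectionState.RECONNECTED);
        System.out.println("leader after reconnect: " + tracker.isLeader());
        tracker.onStateChanged(ConnectionState.LOST);
        System.out.println("leader after loss: " + tracker.isLeader());
    }
}
```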





[jira] [Created] (FLINK-10617) Restoring job fails because of slot allocation timeout

2018-10-19 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10617:
--

 Summary: Restoring job fails because of slot allocation timeout
 Key: FLINK-10617
 URL: https://issues.apache.org/jira/browse/FLINK-10617
 Project: Flink
  Issue Type: Bug
  Components: ResourceManager, TaskManager
Affects Versions: 1.6.1
Reporter: Elias Levy


The following may be related to FLINK-9932, but I am unsure.  If you believe it 
is, go ahead and close this issue as a duplicate.

While trying to test local state recovery on a job with large state, the job 
failed to be restored because slot allocation timed out.

The job is running on a standalone cluster with 12 nodes and 96 task slots (8 
per node).  The job has parallelism of 96, so it consumes all of the slots, and 
has ~200 GB of state in RocksDB.  

To test local state recovery I decided to kill one of the TMs.  The TM 
immediately restarted and re-registered with the JM.  I confirmed the JM showed 
96 registered task slots.
{noformat}
21:35:44,616 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Resolved ResourceManager address, beginning registration
21:35:44,616 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Registration at ResourceManager attempt 1 (timeout=100ms)
21:35:44,640 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Successful registration at resource manager 
akka.tcp://flink@172.31.18.172:6123/user/resourcemanager under registration id 
302988dea6afbd613bb2f96429b65d18.
21:36:49,667 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Receive slot request AllocationID{4274d96a59d370305520876f5b84fb9f} for job 
87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 
8e06aa64d5f8961809da38fe7f224cc1.
21:36:49,667 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Allocated slot for AllocationID{4274d96a59d370305520876f5b84fb9f}.
21:36:49,667 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring.
21:36:49,668 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService 
/leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
21:36:49,671 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Try to register at job manager 
akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id 
f85f6f9b-7713-4be3-a8f0-8443d91e5e6d.
21:36:49,681 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Receive slot request AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9} for job 
87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 
8e06aa64d5f8961809da38fe7f224cc1.
21:36:49,681 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Allocated slot for AllocationID{3a64e2c8c5b22adbcfd3ffcd2b49e7f9}.
21:36:49,681 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring.
21:36:49,681 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Stopping ZooKeeperLeaderRetrievalService 
/leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
21:36:49,681 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Starting ZooKeeperLeaderRetrievalService 
/leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
21:36:49,683 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Try to register at job manager 
akka.tcp://flink@172.31.18.172:6123/user/jobmanager_3 with leader id 
f85f6f9b-7713-4be3-a8f0-8443d91e5e6d.
21:36:49,687 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Resolved JobManager address, beginning registration
21:36:49,687 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Resolved JobManager address, beginning registration
21:36:49,687 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Receive slot request AllocationID{740caf20a5f7f767864122dc9a7444d9} for job 
87c61e8ee64cdbd50f191d39610eb58f from resource manager with leader id 
8e06aa64d5f8961809da38fe7f224cc1.
21:36:49,688 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Registration at JobManager attempt 1 (timeout=100ms)
21:36:49,688 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   
 - Allocated slot for AllocationID{740caf20a5f7f767864122dc9a7444d9}.
21:36:49,688 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService   
 - Add job 87c61e8ee64cdbd50f191d39610eb58f for job leader monitoring.
21:36:49,688 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
Stopping ZooKeeperLeaderRetrievalService 
/leader/87c61e8ee64cdbd50f191d39610eb58f/job_manager_lock.
21:36:49,688 INFO  

[jira] [Created] (FLINK-10520) Job save points REST API fails unless parameters are specified

2018-10-09 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10520:
--

 Summary: Job save points REST API fails unless parameters are 
specified
 Key: FLINK-10520
 URL: https://issues.apache.org/jira/browse/FLINK-10520
 Project: Flink
  Issue Type: Bug
  Components: REST
Affects Versions: 1.6.1
Reporter: Elias Levy


The new REST API POST endpoint, {{/jobs/:jobid/savepoints}}, returns an error 
unless the request includes a body with all parameters ({{target-directory}} 
and {{cancel-job}}), even though the 
[documentation|https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.html]
 suggests these are optional.

If a POST request with no data is made, the response is a 400 status code with 
the error message "Bad request received."

If the POST request submits an empty JSON object ( {} ), the response is a 400 
status code with the error message "Request did not match expected format 
SavepointTriggerRequestBody."  The same is true if only the 
{{target-directory}} or {{cancel-job}} parameters are included.

As the system is configured with a default savepoint location, there shouldn't 
be a need to include the parameter in the request.
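
Until that is addressed, a client can sidestep the error by always sending both fields.  The following is only a sketch in Java: the class and method names are made up for illustration, the string escaping is deliberately minimal, and the directory in the usage note is a placeholder.  It assembles the endpoint path and a JSON body that names both {{target-directory}} and {{cancel-job}}.

```java
class SavepointTriggerSketch {
    // Path of the savepoint trigger endpoint for the given job id.
    static String path(String jobId) {
        return "/jobs/" + jobId + "/savepoints";
    }

    // JSON body that always carries both parameters, since the handler
    // described above rejects requests that omit either one.
    static String body(String targetDirectory, boolean cancelJob) {
        // Minimal escaping (backslash and double quote only); an assumption
        // that the directory contains no other special characters.
        String escaped = targetDirectory.replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"target-directory\": \"" + escaped + "\", \"cancel-job\": " + cancelJob + "}";
    }
}
```

For example, {{SavepointTriggerSketch.body("s3://bucket/savepoints", false)}} produces a body that can be POSTed to {{SavepointTriggerSketch.path(jobId)}} with any HTTP client.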



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10493) Macro generated CaseClassSerializer considered harmful

2018-10-04 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16638834#comment-16638834
 ] 

Elias Levy commented on FLINK-10493:


This issue was noted by [~tzulitai] back in June 2017 
[here|https://github.com/apache/flink/pull/4090#issuecomment-307109692].

> Macro generated CaseClassSerializer considered harmful
> --
>
> Key: FLINK-10493
> URL: https://issues.apache.org/jira/browse/FLINK-10493
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API, State Backends, Checkpointing, Type 
> Serialization System
>Affects Versions: 1.4.0, 1.4.1, 1.4.2, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.6.1, 
> 1.5.4
>Reporter: Elias Levy
>Priority: Major
>
> The Flink Scala API uses implicits and macros to generate {{TypeInformation}} 
> and {{TypeSerializer}} objects for types.  In the case of Scala tuples and 
> case classes, the macro generates an [anonymous {{CaseClassSerializer}} 
> class|https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala#L148-L161].
>   
> The Scala compiler generates a name for the anonymous class that depends on 
> the position of the macro invocation in the code relative to other anonymous 
> classes.  If the code is changed such that the anonymous class's relative 
> position changes, even if neither the overall logic of the code nor the type 
> in question changes, the name of the serializer class will change.
> That results in errors, such as the one below, when the job is restored from 
> a savepoint: the serializer needed to read the data in the savepoint can no 
> longer be found because its name has changed.
> At the very least, there should be a prominent warning in the documentation 
> about this issue.  Minor code changes can result in jobs that can't restore 
> previous state.  Ideally, the use of anonymous classes should be deprecated 
> if possible.
> {noformat}
> WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  
> - Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
>   at 
> org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
>   at 
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.InvalidClassException: failed to read class descriptor
>   at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>   at java.io.ObjectInputStream.readClassDesc(Unknown Source)
>   at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>   at java.io.ObjectInputStream.readObject0(Unknown Source)
>   at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>   at java.io.ObjectInputStream.readSerialData(Unknown Source)
>   at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>   at java.io.ObjectInputStream.readObject0(Unknown Source)
>   at java.io.ObjectInputStream.readObject(Unknown Source)
>   at 
> 

[jira] [Created] (FLINK-10493) Macro generated CaseClassSerializer considered harmful

2018-10-04 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10493:
--

 Summary: Macro generated CaseClassSerializer considered harmful
 Key: FLINK-10493
 URL: https://issues.apache.org/jira/browse/FLINK-10493
 Project: Flink
  Issue Type: Bug
  Components: Scala API, State Backends, Checkpointing, Type 
Serialization System
Affects Versions: 1.5.4, 1.6.1, 1.6.0, 1.5.3, 1.5.2, 1.5.1, 1.4.2, 1.4.1, 
1.4.0
Reporter: Elias Levy


The Flink Scala API uses implicits and macros to generate {{TypeInformation}} 
and {{TypeSerializer}} objects for types.  In the case of Scala tuples and case 
classes, the macro generates an [anonymous {{CaseClassSerializer}} 
class|https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala#L148-L161].
  

The Scala compiler generates a name for the anonymous class that depends on 
the position of the macro invocation in the code relative to other anonymous 
classes.  If the code is changed such that the anonymous class's relative 
position changes, even if neither the overall logic of the code nor the type in 
question changes, the name of the serializer class will change.

That results in errors, such as the one below, when the job is restored from 
a savepoint: the serializer needed to read the data in the savepoint can no 
longer be found because its name has changed.

At the very least, there should be a prominent warning in the documentation 
about this issue.  Minor code changes can result in jobs that can't restore 
previous state.  Ideally, the use of anonymous classes should be deprecated if 
possible.
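
The naming effect is not specific to Scala macros.  As a rough analogue only (plain Java, not the macro-generated code; all class names here are hypothetical), the sketch below keeps the serializer code identical in two versions of a job and adds one unrelated anonymous class ahead of it in the second version.  With javac the first serializer class is typically named {{JobV1$1}} and the second {{JobV2$2}}, so a name recorded by the first version would not resolve against the second.

```java
class JobV1 {
    interface Serializer { }

    // The serializer is the first anonymous class in this class.
    static String serializerClassName() {
        return new Serializer() { }.getClass().getName();
    }
}

class JobV2 {
    interface Serializer { }

    static String serializerClassName() {
        // An unrelated anonymous class introduced earlier in the source.
        Runnable unrelated = new Runnable() {
            @Override public void run() { }
        };
        unrelated.run();
        // Same serializer code as in JobV1, now the second anonymous class.
        return new Serializer() { }.getClass().getName();
    }
}

class AnonymousNameDemo {
    // Compiler-assigned suffix after the last '$' in a class name.
    static String suffix(String className) {
        return className.substring(className.lastIndexOf('$') + 1);
    }
}
```

Comparing {{AnonymousNameDemo.suffix(JobV1.serializerClassName())}} with the same call for {{JobV2}} shows the shift, even though the serializer itself was not touched.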

{noformat}
WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - 
Deserialization of serializer errored; replacing with null.
java.io.IOException: Unloadable class for type serializer.
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
at 
org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
at 
org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Unknown Source)
Caused by: java.io.InvalidClassException: failed to read class descriptor
at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
at java.io.ObjectInputStream.readClassDesc(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
... 14 more
Caused by: java.lang.ClassNotFoundException: 
com.somewhere.TestJob$$anon$13$$anon$3
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at 

[jira] [Closed] (FLINK-10483) Can't restore from a savepoint even with Allow Non Restored State enabled

2018-10-04 Thread Elias Levy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elias Levy closed FLINK-10483.
--
Resolution: Invalid

> Can't restore from a savepoint even with Allow Non Restored State enabled
> -
>
> Key: FLINK-10483
> URL: https://issues.apache.org/jira/browse/FLINK-10483
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing, Type Serialization System
>Affects Versions: 1.4.2
>Reporter: Elias Levy
>Priority: Major
>
> A trimmed streaming job fails a restore from a savepoint with an Unloadable 
> class for type serializer error, even though the case class in question has 
> been eliminated from the job and Allow Non Restored State is enabled.
> We have a job running on a Flink 1.4.2 cluster with two Kafka input streams.  
> One of the streams is processed by an async function, and the output of the 
> async function and the other original stream are consumed by a 
> CoProcessOperator, which in turn emits Scala case class instances that go 
> into a stateful ProcessFunction filter and then into a sink.  I.e.
> {code:java}
> source 1 -> async function --\
>                              |---> co process --> process --> sink
> source 2 --------------------/
> {code}
> I eliminated most of the DAG, leaving only the source 1 --> async function 
> portion of it.  This removed the case class in question from the processing 
> graph.  When I try to restore from the savepoint, even if Allow Non Restored 
> State is selected, the job fails to restore with the error "Deserialization 
> of serializer errored".
> This is the error being generated:
> {noformat}
> WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  
> - Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
>   at 
> org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
>   at 
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.InvalidClassException: failed to read class descriptor
>   at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>   at java.io.ObjectInputStream.readClassDesc(Unknown Source)
>   at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>   at java.io.ObjectInputStream.readObject0(Unknown Source)
>   at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>   at java.io.ObjectInputStream.readSerialData(Unknown Source)
>   at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>   at java.io.ObjectInputStream.readObject0(Unknown Source)
>   at java.io.ObjectInputStream.readObject(Unknown Source)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
>   ... 14 more
> Caused by: java.lang.ClassNotFoundException: 
> com.somewhere.TestJob$$anon$13$$anon$3
>   at java.net.URLClassLoader.findClass(Unknown Source)
>   at 

[jira] [Created] (FLINK-10483) Can't restore from a savepoint even with Allow Non Restored State enabled

2018-10-02 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10483:
--

 Summary: Can't restore from a savepoint even with Allow Non 
Restored State enabled
 Key: FLINK-10483
 URL: https://issues.apache.org/jira/browse/FLINK-10483
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing, Type Serialization System
Affects Versions: 1.4.2
Reporter: Elias Levy


A trimmed streaming job fails a restore from a savepoint with an Unloadable 
class for type serializer error, even though the case class in question has 
been eliminated from the job and Allow Non Restored State is enabled.

We have a job running on a Flink 1.4.2 cluster with two Kafka input streams.  
One of the streams is processed by an async function, and the output of the 
async function and the other original stream are consumed by a 
CoProcessOperator, which in turn emits Scala case class instances that go into 
a stateful ProcessFunction filter and then into a sink.  I.e.

{code:java}
source 1 -> async function --\
                             |---> co process --> process --> sink
source 2 --------------------/
{code}

I eliminated most of the DAG, leaving only the source 1 --> async function 
portion of it.  This removed the case class in question from the processing 
graph.  When I try to restore from the savepoint, even if Allow Non Restored 
State is selected, the job fails to restore with the error "Deserialization of 
serializer errored".

This is the error being generated:


{noformat}
WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - 
Deserialization of serializer errored; replacing with null.
java.io.IOException: Unloadable class for type serializer.
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
at 
org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
at 
org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Unknown Source)
Caused by: java.io.InvalidClassException: failed to read class descriptor
at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
at java.io.ObjectInputStream.readClassDesc(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
at java.io.ObjectInputStream.readSerialData(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
at 
org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
... 14 more
Caused by: java.lang.ClassNotFoundException: 
com.somewhere.TestJob$$anon$13$$anon$3
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.Class.forName0(Native Method)
at 

[jira] [Created] (FLINK-10460) DataDog reporter JsonMappingException

2018-09-28 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10460:
--

 Summary: DataDog reporter JsonMappingException
 Key: FLINK-10460
 URL: https://issues.apache.org/jira/browse/FLINK-10460
 Project: Flink
  Issue Type: Improvement
  Components: Metrics
Affects Versions: 1.4.2
Reporter: Elias Levy


Observed the following error in the TM logs this morning:


{code:java}
WARN  org.apache.flink.metrics.datadog.DatadogHttpReporter  - Failed 
reporting metrics to Datadog.
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException:
 (was java.util.ConcurrentModificationException) (through reference chain: 
org.apache.flink.metrics.datadog.DSeries["series"]->
java.util.ArrayList[88]->org.apache.flink.metrics.datadog.DGauge["points"])
  at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:379)
   at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonMappingException.wrapWithPath(JsonMappingException.java:339)
  at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer.wrapAndThrow(StdSerializer.java:342)
   at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:686)
   at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:157)
   at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContents(IndexedListSerializer.java:119)
   at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:79)
  at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:18)
  at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:672)
  at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:678)
  at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:157)
   at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:130)
   at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3631)
   at 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2998)
   at 
org.apache.flink.metrics.datadog.DatadogHttpClient.serialize(DatadogHttpClient.java:90)
   at 
org.apache.flink.metrics.datadog.DatadogHttpClient.send(DatadogHttpClient.java:79)
   at 
org.apache.flink.metrics.datadog.DatadogHttpReporter.report(DatadogHttpReporter.java:143)
  at 
org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:417)
   at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
   at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
   at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown
 Source)
   at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
 Source)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
   at java.lang.Thread.run(Unknown Source)
 Caused by: java.util.ConcurrentModificationException
   at java.util.LinkedHashMap$LinkedHashIterator.nextNode(Unknown Source)
   at java.util.LinkedHashMap$LinkedKeyIterator.next(Unknown Source)
   at java.util.AbstractCollection.addAll(Unknown Source)
   at java.util.HashSet.<init>(Unknown Source)
   at 
org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:65)
   at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:298)
   at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:906)
   at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
   at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
   at 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35)
  at 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26)
  at org.apache.flink.metrics.datadog.DGauge.getMetricValue(DGauge.java:42)
  at 

[jira] [Created] (FLINK-10390) DataDog metric reporter leak warning

2018-09-21 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10390:
--

 Summary: DataDog metric reporter leak warning
 Key: FLINK-10390
 URL: https://issues.apache.org/jira/browse/FLINK-10390
 Project: Flink
  Issue Type: Improvement
  Components: Metrics
Affects Versions: 1.6.1
Reporter: Elias Levy


After upgrading from 1.4.2 to 1.6.1 we started observing warnings in the log 
associated with the DataDog metrics reporter:
{quote}Sep 21, 2018 9:43:20 PM 
org.apache.flink.shaded.okhttp3.internal.platform.Platform log WARNING: A 
connection to https://app.datadoghq.com/ was leaked. Did you forget to close a 
response body? To see where this was allocated, set the OkHttpClient logger 
level to FINE: 
Logger.getLogger(OkHttpClient.class.getName()).setLevel(Level.FINE);
{quote}
The metric reporter's okhttp dependency version (3.7.0) has not changed, so 
that does not appear to be the source of the warning.

I believe the issue is the change made in 
[FLINK-8553|https://github.com/apache/flink/commit/ae3d547afe7ec44d37b38222a3ea40d9181e#diff-fc396ba6772815fc05efc1310760cd4b].
  The HTTP calls were made async.  The previous code called 
{{client.newCall(r).execute().close()}}.  The new call does nothing in the 
callback, even though the [Callback.onResponse 
documentation|https://square.github.io/okhttp/3.x/okhttp/okhttp3/Callback.html#onResponse-okhttp3.Call-okhttp3.Response-]
 states:

bq. Called when the HTTP response was successfully returned by the remote 
server. The callback may proceed to read the response body with Response.body. 
The response is still live until its response body is closed. 
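
If that diagnosis is right, the callback needs to close the response even when the body is of no interest.  The sketch below illustrates the pattern with stand-in types so that it runs without OkHttp on the classpath: {{FakeResponse}} and the {{Callback}} interface here are hypothetical and only mimic the relevant behaviour (in OkHttp, {{Response}} is {{Closeable}}).  It is not the reporter's actual code.

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicInteger;

class ResponseClosingSketch {
    // Hypothetical stand-in for a response that holds a connection until closed.
    static final class FakeResponse implements Closeable {
        static final AtomicInteger OPEN = new AtomicInteger();

        FakeResponse() { OPEN.incrementAndGet(); }

        @Override public void close() { OPEN.decrementAndGet(); }
    }

    // Hypothetical stand-in for an asynchronous callback interface.
    interface Callback {
        void onResponse(FakeResponse response);
    }

    // Ignores the response, as the async code is described as doing above:
    // the connection is never released.
    static final Callback LEAKY = response -> { };

    // Releases the connection even though nothing is read from the body.
    static final Callback CLOSING = response -> {
        try (FakeResponse r = response) {
            // Nothing to read; try-with-resources closes the response.
        }
    };
}
```

After {{LEAKY}} handles a response, {{FakeResponse.OPEN}} stays incremented; after {{CLOSING}} handles one, the count returns to its previous value.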



 





[jira] [Created] (FLINK-10372) There is no API to configure the timer state backend

2018-09-19 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10372:
--

 Summary: There is no API to configure the timer state backend
 Key: FLINK-10372
 URL: https://issues.apache.org/jira/browse/FLINK-10372
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API, State Backends, Checkpointing
Affects Versions: 1.6.0
Reporter: Elias Levy


Flink 1.6.0, via FLINK-9485, introduced the option to store timers in RocksDB 
instead of the heap.  Alas, this can only be configured via the 
{{state.backend.rocksdb.timer-service.factory}} config file option.  That means 
that the choice of state backend to use for timers can't be made on a per job 
basis on a shared cluster.

There is a need for an API in {{RocksDBStateBackend}} to configure the backend 
per job.





[jira] [Commented] (FLINK-10348) Solve data skew when consuming data from kafka

2018-09-17 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617899#comment-16617899
 ] 

Elias Levy commented on FLINK-10348:


[~wind_ljy]

Re: 1.  The problem is timestamp alignment.  Settings like fetch sizes, max 
waits, etc. are simply mechanisms you can use to attempt to influence the rate of 
processing to better align the timestamps.  Those mechanisms are at least one 
level removed from the actual issue.  It is best to address the issue directly 
by attempting to align timestamps during consumption.

Re: 2.  Internally the Kafka consumer behaves like a multiple input operator, 
merging watermarks and messages from each partition, which it then forwards 
downstream.  The Kafka consumer can also selectively forward messages from the 
partitions with the lowest watermark if they are available. 
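
To make the idea in point 2 concrete, here is a small self-contained sketch in Java.  It is not the Flink Kafka consumer's code; the class and method names are invented, and ties are broken by the lowest partition id purely as an assumption.  It tracks one watermark per partition, reports the minimum as the combined watermark, and picks the partition with the lowest watermark as the one to forward from next.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class PartitionWatermarkSketch {
    private final Map<Integer, Long> watermarks = new HashMap<>();

    // Record a watermark for a partition; watermarks never move backwards.
    void update(int partition, long watermark) {
        watermarks.merge(partition, watermark, Math::max);
    }

    // The combined watermark is the minimum over all partitions.
    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        for (long wm : watermarks.values()) {
            min = Math.min(min, wm);
        }
        return watermarks.isEmpty() ? Long.MIN_VALUE : min;
    }

    // Partition that lags the most in event time, i.e. the one to forward
    // from next so that timestamps across partitions stay aligned.
    Optional<Integer> nextPartitionToForward() {
        Integer best = null;
        long bestWatermark = Long.MAX_VALUE;
        for (Map.Entry<Integer, Long> e : watermarks.entrySet()) {
            long wm = e.getValue();
            if (best == null || wm < bestWatermark
                    || (wm == bestWatermark && e.getKey() < best)) {
                best = e.getKey();
                bestWatermark = wm;
            }
        }
        return Optional.ofNullable(best);
    }
}
```

With watermarks 100, 40 and 70 for partitions 0, 1 and 2, the combined watermark is 40 and partition 1 is chosen; once partition 1 advances to 90, the combined watermark becomes 70 and partition 2 is chosen.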

> Solve data skew when consuming data from kafka
> --
>
> Key: FLINK-10348
> URL: https://issues.apache.org/jira/browse/FLINK-10348
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector
>Affects Versions: 1.6.0
>Reporter: Jiayi Liao
>Assignee: Jiayi Liao
>Priority: Major
>
> With KafkaConsumer, our strategy is to send fetch requests to brokers with 
> a fixed fetch size. Assume topic x has n partitions and there is data skew 
> between partitions. We need to consume data from topic x starting at the earliest 
> offset, and we get the max fetch size of data in every fetch request. The 
> problem is that when a task consumes data from both "big" partitions and 
> "small" partitions, the data in "big" partitions may become late elements because 
> "small" partitions are consumed faster.
> *Solution: *
> I think we can leverage two parameters to control this.
> 1. data.skew.check // whether to check data skew
> 2. data.skew.check.interval // the interval between checks
> Every data.skew.check.interval, we will check the latest offset of every 
> specific partition, and calculate (latest offset - current offset), then get 
> partitions which need to slow down and redefine their fetch size.





[jira] [Comment Edited] (FLINK-10348) Solve data skew when consuming data from kafka

2018-09-17 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617899#comment-16617899
 ] 

Elias Levy edited comment on FLINK-10348 at 9/17/18 5:48 PM:
-

[~wind_ljy]

Re: 1.  The problem is timestamp alignment.  Settings like fetch sizes, max 
waits, etc. are simply mechanisms you can use to attempt to influence the rate of 
processing to better align the timestamps.  Those mechanisms are at least one 
level removed from the actual issue.  It is best to address the issue directly 
by attempting to align timestamps during consumption.

Re: 2.  Internally the Kafka consumer behaves like a multiple input operator, 
merging watermarks and messages from each partition, which it then forwards 
downstream.  The Kafka consumer can also selectively forward messages from the 
partitions with the lowest watermark if they are available. 


was (Author: elevy):
[~wind_ljy]

Re: 1.  The problem is timestamp alignment.  Setting like fetch sizes, max 
waits, etc are simply mechanism you can use to attempt to influence the rate of 
processing the better align the timestamps.  Those mechanism are at least one 
level removed from the actual issue.  It is best to address the issue directly 
by attempting to align timestamp during consumption.

Re: 2.  Internally the Kafka consumer behaves like a multiple input operator, 
merging watermarks and messages from each partition, which it then forwards 
downstream.  The Kafka consumer can also selectively forward messages from the 
partitions with the lowest waternark if they are available. 

> Solve data skew when consuming data from kafka
> --
>
> Key: FLINK-10348
> URL: https://issues.apache.org/jira/browse/FLINK-10348
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector
>Affects Versions: 1.6.0
>Reporter: Jiayi Liao
>Assignee: Jiayi Liao
>Priority: Major
>
> By using KafkaConsumer, our strategy is to send fetch requests to brokers with 
> a fixed fetch size. Assume topic x has n partitions and there is data skew 
> between partitions. We need to consume data from topic x starting at the 
> earliest offset, and every fetch request can return up to the max fetch size. 
> The problem is that when a task consumes data from both "big" partitions and 
> "small" partitions, the data in "big" partitions may become late elements 
> because "small" partitions are consumed faster.
> *Solution: *
> I think we can leverage two parameters to control this.
> 1. data.skew.check // whether to check data skew
> 2. data.skew.check.interval // the interval between checks
> Every data.skew.check.interval, we will check the latest offset of every 
> specific partition, and calculate (latest offset - current offset), then get 
> partitions which need to slow down and redefine their fetch size.





[jira] [Commented] (FLINK-10348) Solve data skew when consuming data from kafka

2018-09-16 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16616933#comment-16616933
 ] 

Elias Levy commented on FLINK-10348:


I would suggest the strategy employed by Kafka Streams, which performs a best 
effort attempt to align streams by selectively fetching from the stream with 
the lowest watermark if there are messages available. 

Rather than implementing something like this within the Kafka connector 
sources, which are independent tasks in Flink, I would suggest implementing it 
within multiple input operators. The operator can selectively process messages 
from the input stream with the lowest watermark if they are available. 
Backpressure can then take care of slowing down the higher volume input if 
necessary. 
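
A minimal sketch of that selection logic, as a toy model rather than Flink or 
Kafka Streams code.  The names ({{AlignedInputSelector}}, {{Input}}, 
{{selectNext}}, {{drain}}) are hypothetical, and each record's timestamp is 
assumed to advance the watermark of its input.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of a multiple input operator; all names are illustrative.
final class AlignedInputSelector {

    /** One input: buffered record timestamps plus the last watermark seen on it. */
    static final class Input {
        final Deque<Long> buffered = new ArrayDeque<>();
        long watermark = Long.MIN_VALUE;
    }

    /** Index of the input with the lowest watermark that has a record buffered, or -1. */
    static int selectNext(List<Input> inputs) {
        int best = -1;
        for (int i = 0; i < inputs.size(); i++) {
            Input in = inputs.get(i);
            if (in.buffered.isEmpty()) {
                continue; // nothing available, so this input cannot be chosen
            }
            if (best < 0 || in.watermark < inputs.get(best).watermark) {
                best = i;
            }
        }
        return best;
    }

    /** Processes all buffered records, always taking from the input that is furthest behind. */
    static List<Long> drain(List<Input> inputs) {
        List<Long> processed = new ArrayList<>();
        int next;
        while ((next = selectNext(inputs)) >= 0) {
            Input in = inputs.get(next);
            long timestamp = in.buffered.poll();
            in.watermark = Math.max(in.watermark, timestamp);
            processed.add(timestamp);
        }
        return processed;
    }
}
```

The alignment is best effort only: an input is chosen by the watermark it has 
already reached, so the output is not guaranteed to be sorted by timestamp.  
In a real operator, the inputs that are not selected would fill their buffers 
and be slowed down by backpressure.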

> Solve data skew when consuming data from kafka
> --
>
> Key: FLINK-10348
> URL: https://issues.apache.org/jira/browse/FLINK-10348
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector
>Affects Versions: 1.6.0
>Reporter: Jiayi Liao
>Assignee: Jiayi Liao
>Priority: Major
>
> By using KafkaConsumer, our strategy is to send fetch requests to brokers with 
> a fixed fetch size. Assume topic x has n partitions and there is data skew 
> between partitions. We need to consume data from topic x starting at the 
> earliest offset, and every fetch request can return up to the max fetch size. 
> The problem is that when a task consumes data from both "big" partitions and 
> "small" partitions, the data in "big" partitions may become late elements 
> because "small" partitions are consumed faster.
> *Solution: *
> I think we can leverage two parameters to control this.
> 1. data.skew.check // whether to check data skew
> 2. data.skew.check.interval // the interval between checks
> Every data.skew.check.interval, we will check the latest offset of every 
> specific partition, and calculate (latest offset - current offset), then get 
> partitions which need to slow down and redefine their fetch size.





[jira] [Commented] (FLINK-10184) HA Failover broken due to JobGraphs not being removed from Zookeeper on cancel

2018-08-20 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16586177#comment-16586177
 ] 

Elias Levy commented on FLINK-10184:


[~Jamalarm] this seems like a duplicate of FLINK-10011.  Can you confirm?

> HA Failover broken due to JobGraphs not being removed from Zookeeper on cancel
> --
>
> Key: FLINK-10184
> URL: https://issues.apache.org/jira/browse/FLINK-10184
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.2
>Reporter: Thomas Wozniakowski
>Priority: Blocker
>
> We have encountered a blocking issue when upgrading our cluster to 1.5.2.
> It appears that, when jobs are cancelled manually (in our case with a 
> savepoint), the JobGraphs are NOT removed from the Zookeeper {{jobgraphs}} 
> node.
> This means that, if you start a job, cancel it, restart it, cancel it, etc. 
> You will end up with many job graphs stored in zookeeper, but none of the 
> corresponding blobs in the Flink HA directory.
> When a HA failover occurs, the newly elected leader retrieves all of those 
> old JobGraph objects from Zookeeper, then goes looking for the corresponding 
> blobs in the HA directory. The blobs are not there so the JobManager explodes 
> and the process dies.
> At this point the cluster has to be fully stopped, the zookeeper jobgraphs 
> cleared out by hand, and all the jobmanagers restarted.
> I can see the following line in the JobManager logs:
> {quote}
> 2018-08-20 16:17:20,776 INFO  
> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  - 
> Removed job graph 4e9a5a9d70ca99dbd394c35f8dfeda65 from ZooKeeper.
> {quote}
> But looking in Zookeeper the {{4e9a5a9d70ca99dbd394c35f8dfeda65}} job is 
> still very much there.





[jira] [Commented] (FLINK-10133) finished job's jobgraph never been cleaned up in zookeeper for standalone clusters (HA mode with multiple masters)

2018-08-13 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578792#comment-16578792
 ] 

Elias Levy commented on FLINK-10133:


[~Frefreak] this is likely the same issue as FLINK-10011.  If so, mark this one 
as a duplicate.

> finished job's jobgraph never been cleaned up in zookeeper for standalone 
> clusters (HA mode with multiple masters)
> --
>
> Key: FLINK-10133
> URL: https://issues.apache.org/jira/browse/FLINK-10133
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.5.0, 1.5.2, 1.6.0
>Reporter: Xiangyu Zhu
>Priority: Major
>
> Hi,
> We have 3 servers in our test environment, denoted node1-3. The setup is as 
> follows:
>  * hadoop hdfs: node1 as namenode, node2,3 as datanode
>  * zookeeper: node1-3 as a quorum (but also tried node1 alone)
>  * flink: node1,2 as masters, node2,3 as slaves
> As I understand it, when a job finishes, the corresponding job's blob data is 
> expected to be deleted from the hdfs path, and the node under zookeeper's path 
> `/\{zk path root}/\{cluster-id}/jobgraphs/\{job id}` should be deleted after 
> that. However, we observe that whenever we submit a job and it finishes (via 
> `bin/flink run WordCount.jar`), the blob data is gone whereas the job id node 
> under zookeeper is still there, with a uuid-style lock node inside it. In 
> the zookeeper debug log we observed something like "cannot be deleted 
> because non empty". Because of this, once a job has finished and its 
> jobgraph node persists, if we restart the cluster or kill one job manager (to 
> test HA mode), it tries to recover the finished job, cannot find the blob data 
> in hdfs, and the whole cluster goes down.
> If we use only node1 as master and node2,3 as slaves, the jobgraphs node can 
> be deleted successfully. If the jobgraphs node is clean, killing one job 
> manager makes another stand-by JM become leader, so it is only this jobgraphs 
> issue preventing HA from working.
> I'm not sure if there's something wrong with our configs because this happens 
> every time for a finished job (we only tested with wordcount.jar though). I'm 
> aware of FLINK-10011 and FLINK-10029, but unlike FLINK-10011 this happens 
> every time, rendering HA mode unusable for us.
> Any idea what might cause this?





[jira] [Created] (FLINK-10118) Queryable state MapState entry query

2018-08-09 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10118:
--

 Summary: Queryable state MapState entry query
 Key: FLINK-10118
 URL: https://issues.apache.org/jira/browse/FLINK-10118
 Project: Flink
  Issue Type: Improvement
  Components: Queryable State
Affects Versions: 1.6.0
Reporter: Elias Levy


Queryable state allows querying of keyed MapState, but such a query returns all 
MapState entries for the given key.  In some cases, such MapState may include 
a substantial number of entries (in the millions), while the user may only be 
interested in one entry.

I propose we allow queries for MapState to provide one or more map entry keys, 
in addition to the state key, and to only return entries for the given map keys.
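
A minimal sketch of the proposed lookup semantics, using a plain 
{{java.util.Map}} in place of the MapState held for one state key.  The names 
({{MapStateQuery}}, {{queryEntries}}) are hypothetical and do not exist in the 
queryable state client API.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper illustrating the proposed semantics only.
final class MapStateQuery {

    /**
     * Returns only the entries whose map keys were requested.
     * Requested keys that are absent from the state are skipped.
     */
    static <K, V> Map<K, V> queryEntries(Map<K, V> state, Collection<K> requestedKeys) {
        Map<K, V> result = new LinkedHashMap<>();
        for (K key : requestedKeys) {
            if (state.containsKey(key)) {
                result.put(key, state.get(key));
            }
        }
        return result;
    }
}
```

With this shape the response size depends on the number of requested map keys 
rather than on the total number of entries stored under the state key.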





[jira] [Created] (FLINK-10117) REST API for Queryable State

2018-08-09 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10117:
--

 Summary: REST API for Queryable State
 Key: FLINK-10117
 URL: https://issues.apache.org/jira/browse/FLINK-10117
 Project: Flink
  Issue Type: Improvement
  Components: Queryable State, REST
Affects Versions: 1.6.0
Reporter: Elias Levy


At the moment, queryable state requires a JVM-based client that can make use of 
the Java queryable state client API in the flink-queryable-state-client 
artifact.  In addition, the client requires a state descriptor matching the 
queried state, which tightly couples the Flink job and queryable state clients.

I propose that queryable state become accessible via a REST API.  FLINK-7040 
mentions this possibility, but does not specify work towards that goal.

I suggest that to enable queryable state over REST, users define JSON 
serializers via the state descriptors.  

This would allow queryable state clients to be developed in any language, not 
require them to use a Flink client library, and permit them to be loosely 
coupled with the job, as they could generically parse the returned JSON.

 





[jira] [Created] (FLINK-10098) Programmatically select timer storage backend

2018-08-07 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10098:
--

 Summary: Programmatically select timer storage backend
 Key: FLINK-10098
 URL: https://issues.apache.org/jira/browse/FLINK-10098
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing, Streaming, TaskManager
Affects Versions: 1.6.0, 1.7.0
Reporter: Elias Levy


FLINK-9486 introduced timer storage on the RocksDB storage backend.  Right now 
it is only possible to configure RocksDB as the storage for timers by setting 
the {{state.backend.rocksdb.timer-service.factory}} value in the configuration 
file for Flink.

As the state storage backend can be programmatically selected by jobs via  
{{env.setStateBackend(...)}}, the timer backend should also be configurable 
programmatically.

Different jobs should be able to store their timers in different storage 
backends.





[jira] [Commented] (FLINK-10067) Add Kafka 1.0.0/1.1.0 connectors

2018-08-06 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16570371#comment-16570371
 ] 

Elias Levy commented on FLINK-10067:


Kafka clients are compatible with lower version servers and Kafka 2.0.0 has 
been released.  Should this issue add a 2.0.0 connector instead?

> Add Kafka 1.0.0/1.1.0 connectors
> 
>
> Key: FLINK-10067
> URL: https://issues.apache.org/jira/browse/FLINK-10067
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Aljoscha Krettek
>Priority: Major
> Fix For: 1.7.0
>
>






[jira] [Commented] (FLINK-8545) Implement upsert stream table source

2018-08-03 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16569040#comment-16569040
 ] 

Elias Levy commented on FLINK-8545:
---

[~hequn8128] are you still working on this feature?

> Implement upsert stream table source 
> -
>
> Key: FLINK-8545
> URL: https://issues.apache.org/jira/browse/FLINK-8545
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> As more and more users are eager for ingesting data with upsert mode in flink 
> sql/table-api, it is valuable to enable table source with upsert mode. I will 
> provide a design doc later and we can have more discussions. Any suggestions 
> are warmly welcomed !
>  





[jira] [Commented] (FLINK-9600) Add DataStream transformation variants that pass timestamp to the user function

2018-08-03 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568955#comment-16568955
 ] 

Elias Levy commented on FLINK-9600:
---

[~aljoscha] I am aware of {{ProcessFunction}}, but I consider it an escape 
hatch when you can't perform what you want within the higher level DSL.  The 
improvement I am suggesting is within the higher level DSL.

E.g., it is a lot nicer to write:
{code:java}
dataStream.filter( (x, ts) => { isDayTime(ts) } )
{code}
than
{code:java}
class ProcessFilter extends ProcessFunction[T, T] {
  override def processElement(
      value: T,
      ctx: ProcessFunction[T, T]#Context,
      out: Collector[T]): Unit = {
    // Forward the record only if its timestamp falls in the daytime.
    if (isDayTime(ctx.timestamp))
      out.collect(value)
  }
}
dataStream.process(new ProcessFilter())
{code}
 

> Add DataStream transformation variants that pass timestamp to the user 
> function
> ---
>
> Key: FLINK-9600
> URL: https://issues.apache.org/jira/browse/FLINK-9600
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Priority: Minor
>
> It is often necessary to access the timestamp assigned to records within user 
> functions.  At the moment this is only possible from {{RichFunction}}. 
> Implementing a {{RichFunction}} just to access the timestamp is burdensome, 
> so most jobs carry a duplicate of the timestamp within the record.
> It would be useful if {{DataStream}} provided transformation methods that 
> accepted user functions that could be passed the record's timestamp as an 
> additional argument, similar to how there are two variants of {{flatMap}}, 
> one with an extra parameter that gives the user function access to the output 
> {{Collector}}.
> Along similar lines, it may be useful to have variants that pass the record's 
> key as an additional parameter.
>  





[jira] [Commented] (FLINK-9720) Introduce ResourceTag class for tag support in scheduling

2018-08-03 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9720?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568928#comment-16568928
 ] 

Elias Levy commented on FLINK-9720:
---

This is shown as fixed in 1.6.0 but has not been committed.

> Introduce ResourceTag class for tag support in scheduling
> -
>
> Key: FLINK-9720
> URL: https://issues.apache.org/jira/browse/FLINK-9720
> Project: Flink
>  Issue Type: New Feature
>  Components: Scheduler
>Affects Versions: 1.5.0
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>






[jira] [Commented] (FLINK-9662) Task manager isolation for jobs

2018-08-03 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568927#comment-16568927
 ] 

Elias Levy commented on FLINK-9662:
---

This is shown as fixed in 1.6.0, but AFAIK nothing has been committed to show 
that being the case.

> Task manager isolation for jobs
> ---
>
> Key: FLINK-9662
> URL: https://issues.apache.org/jira/browse/FLINK-9662
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: job isolation sequence.jpg
>
>
> Disable task manager sharing for different jobs.





[jira] [Comment Edited] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections

2018-08-03 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568832#comment-16568832
 ] 

Elias Levy edited comment on FLINK-10052 at 8/3/18 9:46 PM:


[~till.rohrmann] as I mentioned in FLINK-10011, it may not be necessary to 
replace the {{LeaderLatch}} Curator recipe to avoid losing leadership during 
temporary connection failures.

The Curator error handling 
[documentation|https://curator.apache.org/errors.html] talks about a 
{{SessionConnectionStateErrorPolicy}} that treats {{SUSPENDED}} and {{LOST}} 
connection states differently.  And this 
[test|https://github.com/apache/curator/blob/d502dde1c4601b2abc6d831d764561a73316bf00/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java#L72-L146]
 shows how leadership is not lost with a {{LeaderLatch}} and that policy.  This 
[code|https://github.com/apache/curator/blob/ed3082ecfebc332ba96da7a5bda4508a1985db6e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java#L625-L631]
 implements the policy.  And [this 
shows|https://github.com/apache/curator/blob/5920c744508afd678a20309313e1b8d78baac0c4/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java#L298-L314]
 that Curator will inject a session expiration even while it is in 
{{SUSPENDED}} state, so that a disconnected client won't continue to think it 
is leader past its session expiration.

So it is possible that all we need to do is call 
{{connectionStateErrorPolicy(new SessionConnectionStateErrorPolicy())}} in the 
{{CuratorFrameworkFactory}}.
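
To make the difference concrete, here is a toy model of the two behaviors.  
It is not Curator code: {{LeadershipModel}}, {{ErrorPolicy}} and 
{{revocations}} are hypothetical names, and the model assumes the same node 
regains leadership as soon as it reconnects.

```java
import java.util.List;

// Toy model only; it mimics the two policies but does not use Curator.
final class LeadershipModel {

    enum ConnectionState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

    /** Decides which connection states count as an error that revokes leadership. */
    interface ErrorPolicy {
        boolean isErrorState(ConnectionState state);
    }

    /** Models the default behavior: both SUSPENDED and LOST revoke leadership. */
    static final ErrorPolicy STANDARD =
            s -> s == ConnectionState.SUSPENDED || s == ConnectionState.LOST;

    /** Models the session based behavior: only LOST revokes leadership. */
    static final ErrorPolicy SESSION = s -> s == ConnectionState.LOST;

    /** Replays connection events against a leader and counts leadership revocations. */
    static int revocations(ErrorPolicy policy, List<ConnectionState> events) {
        boolean leader = true;
        int revoked = 0;
        for (ConnectionState state : events) {
            if (leader && policy.isErrorState(state)) {
                leader = false;
                revoked++;
            } else if (!leader && state == ConnectionState.RECONNECTED) {
                leader = true; // toy assumption: the same node wins the next election
            }
        }
        return revoked;
    }
}
```

For a short disconnect (SUSPENDED followed by RECONNECTED) the standard policy 
revokes leadership once and the session policy not at all.  If the session 
really expires (LOST), both revoke.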


was (Author: elevy):
[~till.rohrmann] as I mentioned in FLINK-10011, it may not be necessary to 
replace the {{LeaderLatch}} Curator recipe to avoid loosing leadership during 
temporary connection failures.

The Curator error handling 
[documentation|https://curator.apache.org/errors.html] talks about a 
{{SessionConnectionStateErrorPolicy}} that treats {{SUSPENDED }}and {{LOST}} 
connection states differently.  And this 
[test|https://github.com/apache/curator/blob/d502dde1c4601b2abc6d831d764561a73316bf00/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java#L72-L146]
 shows how leadership is not lost with a {{LeaderLatch}} and that policy.  The 
[code|https://github.com/apache/curator/blob/ed3082ecfebc332ba96da7a5bda4508a1985db6e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java#L625-L631]
 implementing the policy.  And [this 
shows|https://github.com/apache/curator/blob/5920c744508afd678a20309313e1b8d78baac0c4/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java#L298-L314]
 that Curator will inject a session expiration even while it is in {{SUSPENDED 
}}state, so that a disconnected client won't continue to think it is leader 
past its session expiration.

So it is possible that all we need to do is call 
{{connectionStateErrorPolicy(new SessionConnectionStateErrorPolicy())}} in the 
{{CuratorFrameworkFactory}}.

> Tolerate temporarily suspended ZooKeeper connections
> 
>
> Key: FLINK-10052
> URL: https://issues.apache.org/jira/browse/FLINK-10052
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.2, 1.5.2, 1.6.0
>Reporter: Till Rohrmann
>Priority: Major
>
> This issue results from FLINK-10011 which uncovered a problem with Flink's HA 
> recovery and proposed the following solution to harden Flink:
> The {{ZooKeeperLeaderElectionService}} uses the {{LeaderLatch}} Curator 
> recipe for leader election. The leader latch revokes leadership in case of a 
> suspended ZooKeeper connection. This can be premature in case that the system 
> can reconnect to ZooKeeper before its session expires. The effect of the lost 
> leadership is that all jobs will be canceled and directly restarted after 
> regaining the leadership.
> Instead of directly revoking the leadership upon a SUSPENDED ZooKeeper 
> connection, it would be better to wait until the ZooKeeper connection is 
> LOST. That way we would allow the system to reconnect and not lose the 
> leadership. This could be achievable by using Curator's {{LeaderSelector}} 
> instead of the {{LeaderLatch}}.





[jira] [Commented] (FLINK-10052) Tolerate temporarily suspended ZooKeeper connections

2018-08-03 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568832#comment-16568832
 ] 

Elias Levy commented on FLINK-10052:


[~till.rohrmann] as I mentioned in FLINK-10011, it may not be necessary to 
replace the {{LeaderLatch}} Curator recipe to avoid losing leadership during 
temporary connection failures.

The Curator error handling 
[documentation|https://curator.apache.org/errors.html] talks about a 
{{SessionConnectionStateErrorPolicy}} that treats {{SUSPENDED}} and {{LOST}} 
connection states differently.  And this 
[test|https://github.com/apache/curator/blob/d502dde1c4601b2abc6d831d764561a73316bf00/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java#L72-L146]
 shows how leadership is not lost with a {{LeaderLatch}} and that policy.  This 
[code|https://github.com/apache/curator/blob/ed3082ecfebc332ba96da7a5bda4508a1985db6e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java#L625-L631]
 implements the policy.  And [this 
shows|https://github.com/apache/curator/blob/5920c744508afd678a20309313e1b8d78baac0c4/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java#L298-L314]
 that Curator will inject a session expiration even while it is in 
{{SUSPENDED}} state, so that a disconnected client won't continue to think it 
is leader past its session expiration.

So it is possible that all we need to do is call 
{{connectionStateErrorPolicy(new SessionConnectionStateErrorPolicy())}} in the 
{{CuratorFrameworkFactory}}.

> Tolerate temporarily suspended ZooKeeper connections
> 
>
> Key: FLINK-10052
> URL: https://issues.apache.org/jira/browse/FLINK-10052
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.2, 1.5.2, 1.6.0
>Reporter: Till Rohrmann
>Priority: Major
>
> This issue results from FLINK-10011 which uncovered a problem with Flink's HA 
> recovery and proposed the following solution to harden Flink:
> The {{ZooKeeperLeaderElectionService}} uses the {{LeaderLatch}} Curator 
> recipe for leader election. The leader latch revokes leadership in case of a 
> suspended ZooKeeper connection. This can be premature in case that the system 
> can reconnect to ZooKeeper before its session expires. The effect of the lost 
> leadership is that all jobs will be canceled and directly restarted after 
> regaining the leadership.
> Instead of directly revoking the leadership upon a SUSPENDED ZooKeeper 
> connection, it would be better to wait until the ZooKeeper connection is 
> LOST. That way we would allow the system to reconnect and not lose the 
> leadership. This could be achievable by using Curator's {{LeaderSelector}} 
> instead of the {{LeaderLatch}}.





[jira] [Created] (FLINK-10037) Document details event time behavior in a single location

2018-08-02 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10037:
--

 Summary: Document details event time behavior in a single location
 Key: FLINK-10037
 URL: https://issues.apache.org/jira/browse/FLINK-10037
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.5.2
Reporter: Elias Levy
Assignee: Elias Levy


A description of event time and watermarks, how they are generated, assigned, 
and handled, is spread across many pages in the documentation.  It would be 
useful to have it all in a single place and to include missing information, 
such as how Flink assigns timestamps to new records generated by operators.





[jira] [Commented] (FLINK-10030) zookeeper jobgraphs job info cannot be removed when the job is cancelled with zk ha mode

2018-08-02 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16566973#comment-16566973
 ] 

Elias Levy commented on FLINK-10030:


This is possibly related to FLINK-10011.

> zookeeper jobgraphs job info cannot be removed when the job is cancelled with 
> zk ha mode
> 
>
> Key: FLINK-10030
> URL: https://issues.apache.org/jira/browse/FLINK-10030
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.5.0
>Reporter: qiang.li
>Priority: Major
>
> With Flink 1.5 in ZK HA mode, when a job is cancelled and you then restart the 
> cluster, the jobmanager will fail because the blob data is missing. I find 
> that the information about the job in the ZK node jobgraphs cannot be removed 
> because the standby jobmanager locks the node. I think the standby jobmanager 
> should not be watching the jobgraphs node.





[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-08-01 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16566032#comment-16566032
 ] 

Elias Levy commented on FLINK-10011:


[~azagrebin] I don't think they are the same issue.  The issue I am observing 
is that the new JM leader after a failover can't delete a job graph in ZK when 
it is canceled because the previous JM leader still has the job graph locked in 
ZK via the child ephemeral node.

This is the state in ZK:

[zk: localhost:2181(CONNECTED) 5] ls /flink/cluster_1/jobgraphs
[d77948df92813a68ea6dfd6783f40e7e, 2a4eff355aef849c5ca37dbac04f2ff1]

Job 2a4eff355aef849c5ca37dbac04f2ff1 was running before failover and we 
canceled it after failover.  The job is no longer running, but it is still in ZK.

In the logs we see that JM 1 (10.210.22.167), the one that became leader 
after failover, thinks it deleted the 2a4eff355aef849c5ca37dbac04f2ff1 job from 
ZK when it was canceled:

July 30th 2018, 15:32:27.231  Trying to cancel job with ID 2a4eff355aef849c5ca37dbac04f2ff1.
July 30th 2018, 15:32:27.232  Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state RESTARTING to CANCELED.
July 30th 2018, 15:32:27.232  Stopping checkpoint coordinator for job 2a4eff355aef849c5ca37dbac04f2ff1
July 30th 2018, 15:32:27.239  Removed job graph 2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper.
July 30th 2018, 15:32:27.245  Removing /flink/cluster_1/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper
July 30th 2018, 15:32:27.251  Removing /checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1 from ZooKeeper

Looking at the ZK logs I find the problem:

July 30th 2018, 15:32:27.241  Got user-level KeeperException when processing sessionid:0x201d2330001 type:delete cxid:0x434c zxid:0x60009dd94 txntype:-1 reqpath:n/a Error Path:/flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1 Error:KeeperErrorCode = Directory not empty for /flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1

Looking in ZK, we see:

[zk: localhost:2181(CONNECTED) 0] ls /flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1
[d833418c-891a-4b5e-b983-080be803275c]

From the comments in ZooKeeperStateHandleStore.java I gather that this child 
node is used as a deletion lock.  Looking at the contents of this ephemeral 
lock node:

[zk: localhost:2181(CONNECTED) 16] get /flink/cluster_1/jobgraphs/2a4eff355aef849c5ca37dbac04f2ff1/d833418c-891a-4b5e-b983-080be803275c
10.210.42.62
cZxid = 0x60002ffa7
ctime = Tue Jun 12 20:01:26 UTC 2018
mZxid = 0x60002ffa7
mtime = Tue Jun 12 20:01:26 UTC 2018
pZxid = 0x60002ffa7
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x3003f4a0003
dataLength = 12
numChildren = 0

and compared to the ephemeral node lock of the currently running job:

[zk: localhost:2181(CONNECTED) 17] get /flink/cluster_1/jobgraphs/d77948df92813a68ea6dfd6783f40e7e/596a4add-9f5c-4113-99ec-9c942fe91172
10.210.22.167
cZxid = 0x60009df4b
ctime = Mon Jul 30 23:01:04 UTC 2018
mZxid = 0x60009df4b
mtime = Mon Jul 30 23:01:04 UTC 2018
pZxid = 0x60009df4b
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x201d2330001
dataLength = 13
numChildren = 0

Assuming the content of the nodes represents the owner, it seems the job graph 
for the old canceled job, 2a4eff355aef849c5ca37dbac04f2ff1, is locked by the 
previous JM leader, JM 2 (10.210.42.62), while the running job is locked by the 
current JM leader, JM 1 (10.210.22.167).

Somehow the previous leader, JM 2, did not give up the lock when leadership 
failed over to JM 1.  Note that JM 2 never lost its ZK session, as it 
recovered it when it connected to another ZK node.  So some code in the 
JobManager needed to explicitly release the lock on the job graph during 
failover and failed to do so.
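
The mechanics can be sketched with a toy model of a job graph node and its 
ephemeral lock children.  This is not ZooKeeper or Flink code; 
{{LockNodeModel}} and its methods are hypothetical names used only to 
illustrate why the delete failed with "Directory not empty".

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of one job graph znode and its ephemeral lock children.
final class LockNodeModel {

    /** Maps each child lock node name to the session id that owns it. */
    private final Map<String, Long> ephemeralChildren = new HashMap<>();

    /** Creates an ephemeral lock node under the job graph node, owned by a session. */
    void lock(String lockName, long sessionId) {
        ephemeralChildren.put(lockName, sessionId);
    }

    /** Removes one lock node explicitly, as an owner releasing its lock would. */
    void release(String lockName) {
        ephemeralChildren.remove(lockName);
    }

    /** Removes every ephemeral child owned by the session, as happens when a session ends. */
    void closeSession(long sessionId) {
        ephemeralChildren.values().removeIf(owner -> owner == sessionId);
    }

    /** Mirrors the "Directory not empty" rule: the node is deletable only without children. */
    boolean tryDeleteJobGraph() {
        return ephemeralChildren.isEmpty();
    }
}
```

In this model the delete issued by the new leader keeps failing for as long as 
the previous leader's session is alive and it has not released its lock, which 
matches the state observed above.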

[~till.rohrmann] and [~uce] I think you wrote the ZK HA code.  Any thoughts?


> Old job resurrected during HA failover
> --
>
> Key: FLINK-10011
> URL: https://issues.apache.org/jira/browse/FLINK-10011
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.4.2
>Reporter: Elias Levy
>Priority: Blocker
>
> For the second time we've observed Flink resurrect an old job during 
> JobManager high-availability fail over.
> h4. Configuration
>  * AWS environment
>  * Flink 1.4.2 standalone cluster in HA mode
>  * 2 JMs, 3 TMs
>  * 3 node ZK ensemble
>  * 1 job consuming to/from Kafka
>  * Checkpoints in S3 using the Presto file system adaptor
> h4. Timeline 
>  * 15:18:10 JM 2 completes checkpoint 69256.
>  * 15:19:10 JM 2 completes checkpoint 69257.
>  * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a 
> SocketTimeoutException
>  * 15:19:57 ZK 1 closes connection to JM 2 (leader)
>  * 15:19:57 ZK 2 (leader) reports a network error and closes 

[jira] [Updated] (FLINK-9575) Potential race condition when removing JobGraph in HA

2018-08-01 Thread Elias Levy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elias Levy updated FLINK-9575:
--
Description: 
When we remove the _JobGraph_ from the _JobManager_, for example after 
invoking _cancel()_, the following code is executed:
{noformat}
val futureOption = currentJobs.get(jobID) match {
  case Some((eg, _)) =>
    val result = if (removeJobFromStateBackend) {
      val futureOption = Some(future {
        try {
          // ...otherwise, we can have lingering resources when there is a concurrent shutdown
          // and the ZooKeeper client is closed. Not removing the job immediately allow the
          // shutdown to release all resources.
          submittedJobGraphs.removeJobGraph(jobID)
        } catch {
          case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
        }
      }(context.dispatcher))

      try {
        archive ! decorateMessage(
          ArchiveExecutionGraph(
            jobID,
            ArchivedExecutionGraph.createFrom(eg)))
      } catch {
        case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
      }

      futureOption
    } else {
      None
    }

    currentJobs.remove(jobID)

    result
  case None => None
}

// remove all job-related BLOBs from local and HA store
libraryCacheManager.unregisterJob(jobID)
blobServer.cleanupJob(jobID, removeJobFromStateBackend)

jobManagerMetricGroup.removeJob(jobID)

futureOption
}{noformat}
This causes the asynchronous removal of the job graph and the synchronous 
removal of the blob files associated with the job. As far as I understand, this 
means there is a potential problem: we can fail to remove the job graph from 
_submittedJobGraphs_. If the JobManager fails and a new leader is elected, it 
can try to recover such a job, but recovery will fail with an exception since 
the assigned blob was already removed.
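
One way to picture the ordering problem is the following self-contained sketch, which is not the JobManager code: {{JobGraphStore}}, {{BlobStore}} and {{removeJob}} are hypothetical stand-ins.  It only illustrates the idea of deleting blobs after the job graph removal has actually succeeded, so that a job graph that is still registered never points at missing blobs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class OrderedJobCleanup {
    /** Hypothetical stand-in for the HA job graph store. */
    interface JobGraphStore { void removeJobGraph(String jobId) throws Exception; }

    /** Hypothetical stand-in for the blob store. */
    interface BlobStore { void cleanupJob(String jobId); }

    /** Removes the job graph asynchronously and cleans up blobs only if that removal succeeded. */
    static CompletableFuture<Boolean> removeJob(
            String jobId, JobGraphStore graphs, BlobStore blobs, Executor executor) {
        return CompletableFuture
                .supplyAsync(() -> {
                    try {
                        graphs.removeJobGraph(jobId);
                        return true;
                    } catch (Exception e) {
                        // The graph is still registered, so a new leader may try to recover it.
                        return false;
                    }
                }, executor)
                .thenApply(removed -> {
                    if (removed) {
                        blobs.cleanupJob(jobId);
                    }
                    return removed;
                });
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        Executor direct = Runnable::run; // runs tasks on the calling thread, for the demo only
        removeJob("job-a", id -> calls.add("graph removed"), id -> calls.add("blobs removed"), direct).join();
        removeJob("job-b", id -> { throw new Exception("ZooKeeper client closed"); },
                id -> calls.add("blobs removed (should not happen)"), direct).join();
        System.out.println(calls);
    }
}
```

In the second call the job graph removal fails, so the blobs are kept and a later recovery attempt would still find them.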

  was:
When we remove the _JobGraph_ from the _JobManager_, for example after 
invoking _cancel()_, the following code is executed:
{noformat}
 
val futureOption = currentJobs.get(jobID) match {
  case Some((eg, _)) =>
    val result = if (removeJobFromStateBackend) {
      val futureOption = Some(future {
        try {
          // ...otherwise, we can have lingering resources when there is a concurrent shutdown
          // and the ZooKeeper client is closed. Not removing the job immediately allow the
          // shutdown to release all resources.
          submittedJobGraphs.removeJobGraph(jobID)
        } catch {
          case t: Throwable => log.warn(s"Could not remove submitted job graph $jobID.", t)
        }
      }(context.dispatcher))

      try {
        archive ! decorateMessage(
          ArchiveExecutionGraph(
            jobID,
            ArchivedExecutionGraph.createFrom(eg)))
      } catch {
        case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
      }

      futureOption
    } else {
      None
    }

    currentJobs.remove(jobID)

    result
  case None => None
}

// remove all job-related BLOBs from local and HA store
libraryCacheManager.unregisterJob(jobID)
blobServer.cleanupJob(jobID, removeJobFromStateBackend)

jobManagerMetricGroup.removeJob(jobID)

futureOption
}{noformat}
This causes the asynchronous removal of the job graph and the synchronous 
removal of the blob files associated with the job. As far as I understand, this 
means there is a potential problem: we can fail to remove the job graph from 
_submittedJobGraphs_. If the JobManager fails and a new leader is elected, it 
can try to recover such a job, but recovery will fail with an exception since 
the assigned blob was already removed.


> Potential race condition when removing JobGraph in HA
> -
>
> Key: FLINK-9575
> URL: https://issues.apache.org/jira/browse/FLINK-9575
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: Dominik Wosiński
>Assignee: Dominik Wosiński
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
>
> When we are removing the _JobGraph_ from _JobManager_ for example after 
> invoking _cancel()_, the 

[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-07-31 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16564573#comment-16564573
 ] 

Elias Levy commented on FLINK-10011:


[~trohrm...@apache.org] what do you think?

> Old job resurrected during HA failover
> --
>
> Key: FLINK-10011
> URL: https://issues.apache.org/jira/browse/FLINK-10011
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.4.2
>Reporter: Elias Levy
>Priority: Blocker
>
> For the second time we've observed Flink resurrect an old job during 
> JobManager high-availability failover.
> h4. Configuration
>  * AWS environment
>  * Flink 1.4.2 standalone cluster in HA mode
>  * 2 JMs, 3 TMs
>  * 3-node ZK ensemble
>  * 1 job consuming to/from Kafka
>  * Checkpoints in S3 using the Presto file system adaptor
> h4. Timeline 
>  * 15:18:10 JM 2 completes checkpoint 69256.
>  * 15:19:10 JM 2 completes checkpoint 69257.
>  * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a 
> SocketTimeoutException
>  * 15:19:57 ZK 1 closes connection to JM 2 (leader)
>  * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK 
> 1
>  * 15:19:57 JM 2 reports it can't read data from ZK
>  ** {{Unable to read additional data from server sessionid 0x3003f4a0003, 
> likely server has closed socket, closing socket connection and attempting 
> reconnect)}}
>  ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}}
>  * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED
>  ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader 
> from ZooKeeper.}}
>  ** {{ZooKeeper connection SUSPENDED. }}{{Changes to the submitted job graphs 
> are not monitored (temporarily).}}
>  ** {{Connection to ZooKeeper suspended. The contender 
> akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in 
> the leader election}}{{ }}
>  ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader 
> from ZooKeeper.}}
>  * 15:19:57 JM 2 gives up leadership
>  ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked 
> leadership.}}
>  * 15:19:57 JM 2 changes job 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED
>  ** {{Stopping checkpoint coordinator for job 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}}
>  * 15:19:57 TMs start disassociating from JM 2, but JM 2 discards the messages 
> because there is no leader
>  ** {{Discard message 
> LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception:
>  TaskManager akka://flink/user/taskmanager is disassociating)) because there 
> is currently no valid leader id known.}}
>  * 15:19:57 JM 2 connects to ZK 2 and renews its session
>  ** {{Opening socket connection to server 
> ip-10-210-43-221.ec2.internal/10.210.43.221:2181}}
>  ** {{Socket connection established to 
> ip-10-210-43-221.ec2.internal/10.210.43.221:2181, initiating session}}
>  ** {{Connection to ZooKeeper was reconnected. Leader retrieval can be 
> restarted.}}
>  ** {{Session establishment complete on server 
> ip-10-210-43-221.ec2.internal/10.210.43.221:2181, sessionid = 
> 0x3003f4a0003, negotiated timeout = 4}}
>  ** {{Connection to ZooKeeper was reconnected. Leader election can be 
> restarted.}}
>  ** {{ZooKeeper connection RECONNECTED. Changes to the submitted job graphs 
> are monitored again.}}
>  ** {{State change: RECONNECTED}}
>  * 15:19:57: JM 1 reports JM 1 has been granted leadership:
>  ** {{JobManager akka.tcp://flink@flink-jm-1:6123/user/jobmanager was granted 
> leadership with leader session ID 
> Some(ae0a1a17-eccc-40b4-985d-93bc59f5b936).}}
>  * 15:19:57 JM 2 reports the job has been suspended
>  ** {{org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter 
> Shutting down.}}
>  ** {{Job 2a4eff355aef849c5ca37dbac04f2ff1 has been suspended.}}
>  * 15:19:57 JM 2 reports it has lost leadership:
>  ** {{Associated JobManager 
> Actor[akka://flink/user/jobmanager#33755521|#33755521] lost leader status}}
>  ** {{Received leader address but not running in leader ActorSystem. 
> Cancelling registration.}}
>  * 15:19:57 TMs register with JM 1
>  * 15:20:07 JM 1 attempts to recover jobs and finds there are two jobs:
>  ** {{Attempting to recover all jobs.}}
>  ** {{There are 2 jobs to recover. Starting the job recovery.}}
>  ** {{Attempting to recover job 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.}}
>  ** {{Attempting to recover job 
> {color:#d04437}61bca496065cd05e4263070a5e923a05{color}.}}
>  * 15:20:08 – 15:32:27 ZK 2 reports a large number of errors of the form:
>  ** {{Got user-level KeeperException when processing 
> sessionid:0x201d2330001 type:create cxid:0x4211 zxid:0x60009dc70 
> 

[jira] [Comment Edited] (FLINK-10011) Old job resurrected during HA failover

2018-07-31 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16564569#comment-16564569
 ] 

Elias Levy edited comment on FLINK-10011 at 8/1/18 12:48 AM:
-

It appears that it may not be necessary to replace the {{LeaderLatch}} Curator 
recipe to avoid losing leadership during temporary connection failures.

The Curator error handling 
[documentation|https://curator.apache.org/errors.html] talks about a 
{{SessionConnectionStateErrorPolicy}} that treats {{SUSPENDED}} and {{LOST}} 
connection states differently.  And this 
[test|https://github.com/apache/curator/blob/d502dde1c4601b2abc6d831d764561a73316bf00/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java#L72-L146]
 shows how leadership is not lost with a {{LeaderLatch}} and that policy.  The 
[code|https://github.com/apache/curator/blob/ed3082ecfebc332ba96da7a5bda4508a1985db6e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java#L625-L631]
 implements the policy.  And [this 
shows|https://github.com/apache/curator/blob/5920c744508afd678a20309313e1b8d78baac0c4/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java#L298-L314]
 that Curator will inject a session expiration even while it is in 
{{SUSPENDED}} state, so that a disconnected client won't continue to think it 
is leader past its session expiration.
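
The difference between the two policies can be modeled in a few lines.  This is a self-contained sketch, not Curator code: the enum, the interface and {{keepsLeadership}} are simplified stand-ins, and the model ignores that a real latch re-enters the election after reconnecting.  As far as I can tell, in Curator itself the policy is selected with {{connectionStateErrorPolicy(...)}} on the {{CuratorFrameworkFactory}} builder.

```java
class ErrorPolicyModel {
    /** Simplified copy of the connection states Curator reports. */
    enum ConnectionState { CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY }

    /** Stand-in for Curator's ConnectionStateErrorPolicy. */
    interface ErrorPolicy { boolean isErrorState(ConnectionState state); }

    /** Models the standard policy: both SUSPENDED and LOST count as errors. */
    static final ErrorPolicy STANDARD =
            s -> s == ConnectionState.SUSPENDED || s == ConnectionState.LOST;

    /** Models the session policy: only LOST (session expiration) counts as an error. */
    static final ErrorPolicy SESSION = s -> s == ConnectionState.LOST;

    /** A leader keeps leadership as long as no observed state is an error under the policy. */
    static boolean keepsLeadership(ErrorPolicy policy, ConnectionState... events) {
        for (ConnectionState event : events) {
            if (policy.isErrorState(event)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        ConnectionState[] blip = { ConnectionState.SUSPENDED, ConnectionState.RECONNECTED };
        System.out.println("standard policy, short disconnect: " + keepsLeadership(STANDARD, blip));
        System.out.println("session policy, short disconnect:  " + keepsLeadership(SESSION, blip));
        System.out.println("session policy, session expired:   "
                + keepsLeadership(SESSION, ConnectionState.SUSPENDED, ConnectionState.LOST));
    }
}
```

The short disconnect in the timeline of this issue (SUSPENDED followed by RECONNECTED within the same session) corresponds to the first two cases.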


was (Author: elevy):
It appears that it may not be necessary to replace the {{LeaderLatch}} Curator 
recipe to avoid losing leadership during temporary connection failures.

The Curator error handling 
[documentation|https://curator.apache.org/errors.html] talks about a 
{{SessionConnectionStateErrorPolicy}} that treats {{SUSPENDED}} and {{LOST}} 
connection states differently.  And this 
[test|https://github.com/apache/curator/blob/d502dde1c4601b2abc6d831d764561a73316bf00/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java#L72-L146]
 shows how leadership is not lost with a {{LeaderLatch}} and that policy.  
The 
[code|https://github.com/apache/curator/blob/ed3082ecfebc332ba96da7a5bda4508a1985db6e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java#L625-L631]
 implements the policy.  And [this 
shows|https://github.com/apache/curator/blob/5920c744508afd678a20309313e1b8d78baac0c4/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java#L298-L314]
 that Curator will inject a session expiration even while it is in 
{{SUSPENDED}} state, so that a disconnected client won't continue to think it 
is leader past its session expiration.

> Old job resurrected during HA failover
> --
>
> Key: FLINK-10011
> URL: https://issues.apache.org/jira/browse/FLINK-10011
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.4.2
>Reporter: Elias Levy
>Priority: Blocker
>
> For the second time we've observed Flink resurrect an old job during 
> JobManager high-availability failover.
> h4. Configuration
>  * AWS environment
>  * Flink 1.4.2 standalone cluster in HA mode
>  * 2 JMs, 3 TMs
>  * 3-node ZK ensemble
>  * 1 job consuming to/from Kafka
>  * Checkpoints in S3 using the Presto file system adaptor
> h4. Timeline 
>  * 15:18:10 JM 2 completes checkpoint 69256.
>  * 15:19:10 JM 2 completes checkpoint 69257.
>  * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a 
> SocketTimeoutException
>  * 15:19:57 ZK 1 closes connection to JM 2 (leader)
>  * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK 
> 1
>  * 15:19:57 JM 2 reports it can't read data from ZK
>  ** {{Unable to read additional data from server sessionid 0x3003f4a0003, 
> likely server has closed socket, closing socket connection and attempting 
> reconnect)}}
>  ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}}
>  * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED
>  ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader 
> from ZooKeeper.}}
>  ** {{ZooKeeper connection SUSPENDED. }}{{Changes to the submitted job graphs 
> are not monitored (temporarily).}}
>  ** {{Connection to ZooKeeper suspended. The contender 
> akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in 
> the leader election}}{{ }}
>  ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader 
> from ZooKeeper.}}
>  * 15:19:57 JM 2 gives up leadership
>  ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked 
> leadership.}}
>  * 15:19:57 JM 2 changes job 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED
>  ** {{Stopping checkpoint 

[jira] [Commented] (FLINK-10011) Old job resurrected during HA failover

2018-07-31 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16564569#comment-16564569
 ] 

Elias Levy commented on FLINK-10011:


It appears that it may not be necessary to replace the {{LeaderLatch}} Curator 
recipe to avoid losing leadership during temporary connection failures.

The Curator error handling 
[documentation|https://curator.apache.org/errors.html] talks about a 
{{SessionConnectionStateErrorPolicy}} that treats {{SUSPENDED}} and {{LOST}} 
connection states differently.  And this 
[test|https://github.com/apache/curator/blob/d502dde1c4601b2abc6d831d764561a73316bf00/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java#L72-L146]
 shows how leadership is not lost with a {{LeaderLatch}} and that policy.  
The 
[code|https://github.com/apache/curator/blob/ed3082ecfebc332ba96da7a5bda4508a1985db6e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java#L625-L631]
 implementing the policy.  And [this 
shows|https://github.com/apache/curator/blob/5920c744508afd678a20309313e1b8d78baac0c4/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java#L298-L314]
 that Curator will inject a session expiration even while it is in 
{{SUSPENDED}} state, so that a disconnected client won't continue to think it 
is leader past its session expiration.

> Old job resurrected during HA failover
> --
>
> Key: FLINK-10011
> URL: https://issues.apache.org/jira/browse/FLINK-10011
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.4.2
>Reporter: Elias Levy
>Priority: Blocker
>
> For the second time we've observed Flink resurrect an old job during 
> JobManager high-availability failover.
> h4. Configuration
>  * AWS environment
>  * Flink 1.4.2 standalone cluster in HA mode
>  * 2 JMs, 3 TMs
>  * 3-node ZK ensemble
>  * 1 job consuming to/from Kafka
>  * Checkpoints in S3 using the Presto file system adaptor
> h4. Timeline 
>  * 15:18:10 JM 2 completes checkpoint 69256.
>  * 15:19:10 JM 2 completes checkpoint 69257.
>  * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a 
> SocketTimeoutException
>  * 15:19:57 ZK 1 closes connection to JM 2 (leader)
>  * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK 
> 1
>  * 15:19:57 JM 2 reports it can't read data from ZK
>  ** {{Unable to read additional data from server sessionid 0x3003f4a0003, 
> likely server has closed socket, closing socket connection and attempting 
> reconnect)}}
>  ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}}
>  * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED
>  ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader 
> from ZooKeeper.}}
>  ** {{ZooKeeper connection SUSPENDED. }}{{Changes to the submitted job graphs 
> are not monitored (temporarily).}}
>  ** {{Connection to ZooKeeper suspended. The contender 
> akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in 
> the leader election}}{{ }}
>  ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader 
> from ZooKeeper.}}
>  * 15:19:57 JM 2 gives up leadership
>  ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked 
> leadership.}}
>  * 15:19:57 JM 2 changes job 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED
>  ** {{Stopping checkpoint coordinator for job 
> {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}}
>  * 15:19:57 TMs start disassociating from JM 2, but JM 2 discards the messages 
> because there is no leader
>  ** {{Discard message 
> LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception:
>  TaskManager akka://flink/user/taskmanager is disassociating)) because there 
> is currently no valid leader id known.}}
>  * 15:19:57 JM 2 connects to ZK 2 and renews its session
>  ** {{Opening socket connection to server 
> ip-10-210-43-221.ec2.internal/10.210.43.221:2181}}
>  ** {{Socket connection established to 
> ip-10-210-43-221.ec2.internal/10.210.43.221:2181, initiating session}}
>  ** {{Connection to ZooKeeper was reconnected. Leader retrieval can be 
> restarted.}}
>  ** {{Session establishment complete on server 
> ip-10-210-43-221.ec2.internal/10.210.43.221:2181, sessionid = 
> 0x3003f4a0003, negotiated timeout = 4}}
>  ** {{Connection to ZooKeeper was reconnected. Leader election can be 
> restarted.}}
>  ** {{ZooKeeper connection RECONNECTED. Changes to the submitted job graphs 
> are monitored again.}}
>  ** {{State change: RECONNECTED}}
>  * 15:19:57: JM 1 reports JM 1 has been granted leadership:
>  ** {{JobManager 

[jira] [Created] (FLINK-10011) Old job resurrected during HA failover

2018-07-31 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10011:
--

 Summary: Old job resurrected during HA failover
 Key: FLINK-10011
 URL: https://issues.apache.org/jira/browse/FLINK-10011
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.4.2
Reporter: Elias Levy


For the second time we've observed Flink resurrect an old job during JobManager 
high-availability failover.
h4. Configuration
 * AWS environment
 * Flink 1.4.2 standalone cluster in HA mode
 * 2 JMs, 3 TMs
 * 3-node ZK ensemble
 * 1 job consuming to/from Kafka
 * Checkpoints in S3 using the Presto file system adaptor

h4. Timeline 
 * 15:18:10 JM 2 completes checkpoint 69256.
 * 15:19:10 JM 2 completes checkpoint 69257.
 * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a 
SocketTimeoutException
 * 15:19:57 ZK 1 closes connection to JM 2 (leader)
 * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK 1
 * 15:19:57 JM 2 reports it can't read data from ZK
 ** {{Unable to read additional data from server sessionid 0x3003f4a0003, 
likely server has closed socket, closing socket connection and attempting 
reconnect)}}
 ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}}
 * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED
 ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.}}
 ** {{ZooKeeper connection SUSPENDED. }}{{Changes to the submitted job graphs 
are not monitored (temporarily).}}
 ** {{Connection to ZooKeeper suspended. The contender 
akka.tcp://flink@flink-jm-2:6123/user/jobmanager no longer participates in the 
leader election}}{{ }}
 ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
ZooKeeper.}}
 * 15:19:57 JM 2 gives up leadership
 ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked leadership.}}

 * 15:19:57 JM 2 changes job 
{color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status to SUSPENDED
 ** {{Stopping checkpoint coordinator for job 
{color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}}

 * 15:19:57 TMs start disassociating from JM 2, but JM 2 discards the messages 
because there is no leader
 ** {{Discard message 
LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception:
 TaskManager akka://flink/user/taskmanager is disassociating)) because there is 
currently no valid leader id known.}}

 * 15:19:57 JM 2 connects to ZK 2 and renews its session
 ** {{Opening socket connection to server 
ip-10-210-43-221.ec2.internal/10.210.43.221:2181}}
 ** {{Socket connection established to 
ip-10-210-43-221.ec2.internal/10.210.43.221:2181, initiating session}}
 ** {{Connection to ZooKeeper was reconnected. Leader retrieval can be 
restarted.}}
 ** {{Session establishment complete on server 
ip-10-210-43-221.ec2.internal/10.210.43.221:2181, sessionid = 
0x3003f4a0003, negotiated timeout = 4}}
 ** {{Connection to ZooKeeper was reconnected. Leader election can be 
restarted.}}
 ** {{ZooKeeper connection RECONNECTED. Changes to the submitted job graphs are 
monitored again.}}
 ** {{State change: RECONNECTED}}

 * 15:19:57: JM 1 reports JM 1 has been granted leadership:
 ** {{JobManager akka.tcp://flink@flink-jm-1:6123/user/jobmanager was granted 
leadership with leader session ID Some(ae0a1a17-eccc-40b4-985d-93bc59f5b936).}}

 * 15:19:57 JM 2 reports the job has been suspended
 ** {{org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter Shutting 
down.}}
 ** {{Job 2a4eff355aef849c5ca37dbac04f2ff1 has been suspended.}}

 * 15:19:57 JM 2 reports it has lost leadership:
 ** {{Associated JobManager 
Actor[akka://flink/user/jobmanager#33755521|#33755521] lost leader status}}
 ** {{Received leader address but not running in leader ActorSystem. Cancelling 
registration.}}

 * 15:19:57 TMs register with JM 1

 * 15:20:07 JM 1 attempts to recover jobs and finds there are two jobs:
 ** {{Attempting to recover all jobs.}}
 ** {{There are 2 jobs to recover. Starting the job recovery.}}
 ** {{Attempting to recover job 
{color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.}}
 ** {{Attempting to recover job 
{color:#d04437}61bca496065cd05e4263070a5e923a05{color}.}}

 * 15:20:08 – 15:32:27 ZK 2 reports a large number of errors of the form:
 ** {{Got user-level KeeperException when processing 
sessionid:0x201d2330001 type:create cxid:0x4211 zxid:0x60009dc70 txntype:-1 
reqpath:n/a Error 
Path:/flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1 
Error:KeeperErrorCode = NodeExists for 
/flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1}}
 ** {{Got user-level KeeperException when processing 
sessionid:0x201d2330001 type:create cxid:0x4230 zxid:0x60009dc78 txntype:-1 
reqpath:n/a Error 

[jira] [Commented] (FLINK-6243) Continuous Joins: True Sliding Window Joins

2018-07-29 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561216#comment-16561216
 ] 

Elias Levy commented on FLINK-6243:
---

Rereading my initial description of the issue, I see that I made no mention of 
our specific upsert requirements, so I think you are right that FLINK-8478 
does satisfy this issue as described and that it may be best if I open a new 
issue for the upsert and (a)/(a,b) join semantics I'd like. 

> Continuous Joins:  True Sliding Window Joins
> 
>
> Key: FLINK-6243
> URL: https://issues.apache.org/jira/browse/FLINK-6243
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Affects Versions: 1.1.4
>Reporter: Elias Levy
>Priority: Major
>
> Flink defines sliding window joins as the join of elements of two streams 
> that share a window of time, where the windows are defined by advancing them 
> forward some amount of time that is less than the window time span.  More 
> generally, such windows are just overlapping hopping windows. 
> Other systems, such as Kafka Streams, support a different notion of sliding 
> window joins.  In these systems, two elements of a stream are joined if the 
> absolute time difference between them is less than or equal to the time 
> window length.
> This alternate notion of sliding window joins has some advantages in some 
> applications over the current implementation.  
> In the current implementation, elements to be joined may both fall within 
> multiple overlapping sliding windows, leading them to be joined multiple 
> times, when we only wish them to be joined once.
> The implementation need not instantiate window objects to keep track of 
> stream elements, which becomes problematic in the current implementation if 
> the window size is very large and the slide is very small.
> It allows for asymmetric time joins.  E.g. join if elements from stream A are 
> no more than X time behind and Y time ahead of an element from stream B.
> It is currently possible to implement a join with these semantics using 
> {{CoProcessFunction}}, but the capability should be a first-class feature, 
> as it is in Kafka Streams.
> To perform the join, elements of each stream must be buffered for at least 
> the window time length.  To allow for large window sizes and a high volume of 
> elements, the state should, possibly optionally, be buffered such that it can 
> spill to disk (e.g. by using RocksDB).
> The same stream may be joined multiple times in a complex topology.  As an 
> optimization, it may be wise to reuse any element buffer among colocated join 
> operators.  Otherwise, there may be write amplification and increased state 
> that must be snapshotted.
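
The two notions of a sliding window join described above can be compared with a small self-contained sketch.  The names are hypothetical and timestamps are plain milliseconds; {{joinable}} is the time-difference condition with asymmetric bounds, and {{sharedSlidingWindows}} counts how many sliding windows (aligned at 0) contain both elements, i.e. how many times a window-based join would emit the pair.

```java
class IntervalJoinSketch {
    /** True if A is at most `behind` ms earlier and at most `ahead` ms later than B. */
    static boolean joinable(long tsA, long tsB, long behind, long ahead) {
        long diff = tsA - tsB;
        return diff >= -behind && diff <= ahead;
    }

    /**
     * Number of sliding windows [k*slide, k*slide + size) that contain both timestamps.
     * The last window containing the earlier element starts at floor(min / slide) * slide;
     * the first window containing the later element starts just after max - size.
     */
    static long sharedSlidingWindows(long ts1, long ts2, long size, long slide) {
        long min = Math.min(ts1, ts2);
        long max = Math.max(ts1, ts2);
        long count = Math.floorDiv(min, slide) - Math.floorDiv(max - size, slide);
        return Math.max(0L, count);
    }

    public static void main(String[] args) {
        // Window of 10 s sliding every 5 s: both elements fall into two windows.
        System.out.println(sharedSlidingWindows(12_000, 13_000, 10_000, 5_000));
        // Time-difference join: the pair qualifies exactly once.
        System.out.println(joinable(12_000, 13_000, 10_000, 10_000));
        // Asymmetric bounds: A may lag B by up to 2 s but lead it by at most 0.5 s.
        System.out.println(joinable(13_000, 12_000, 2_000, 500));
    }
}
```

With a very small slide relative to the window size, the first number grows roughly as size / slide, which is the duplication the description refers to.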



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6243) Continuous Joins: True Sliding Window Joins

2018-07-29 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16561215#comment-16561215
 ] 

Elias Levy commented on FLINK-6243:
---

Stephan, thanks for bringing FLINK-8478 to my attention.  Alas, while it gets 
closer to meeting our join requirements, it does not quite fulfill them.  

Our joins require the semantics of joining two upsert tables, i.e. only joining 
the latest value by key.  The DataStream Interval Join being implemented does 
not support those semantics, as it will buffer and join all elements for a key 
that fall within the interval.  It seems the upsert semantics could be 
implemented by changing the state from a {{MapState}} buffering multiple events 
per key to a {{ValueState}} keeping the latest event by event time.

We also need these joins to be outer joins, but I see that there is already a 
subtask to implement those (FLINK-8483).

Finally, we also need to implement a join between two streams where one stream 
is keyed by a subset of the other stream's composite key (e.g. the left stream 
is keyed by {{col1}} and the right stream by ({{col1}}, {{col2}})), also with 
upsert semantics.  This could be implemented by keying both streams by 
{{col1}}, keeping a {{ValueState}} for the left stream buffering the latest 
event, and using a {{MapState}} on the right stream keyed by {{col2}} buffering 
the latest event per ({{col1}}, {{col2}}) tuple. 
 
Maybe something like:
{code:scala}
leftStream
  .keyBy(_.col1)
  .upsertJoin(rightStream.keyBy(_.col1).subKey(_.col2))
  .between(...)
  .process(...)
{code}
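
The state layout described above can be sketched without Flink.  This is a plain-Java model with hypothetical names: a map stands in for the per-{{col1}} {{ValueState}} of the left stream, and a nested map stands in for the {{MapState}} keyed by {{col2}}.  It ignores event-time ordering, the interval bounds and state cleanup.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class UpsertJoinSketch {
    /** Latest left event per col1 (ValueState analogue). */
    private final Map<String, String> left = new HashMap<>();
    /** Latest right event per (col1, col2) (MapState analogue, keyed by col2 within col1). */
    private final Map<String, Map<String, String>> right = new HashMap<>();

    /** Upserts the left value and joins it with the latest right value of every col2. */
    List<String> onLeft(String col1, String value) {
        left.put(col1, value);
        List<String> out = new ArrayList<>();
        Map<String, String> rights = right.getOrDefault(col1, Collections.emptyMap());
        for (Map.Entry<String, String> e : rights.entrySet()) {
            out.add(value + "|" + e.getKey() + "=" + e.getValue());
        }
        return out;
    }

    /** Upserts the right value for (col1, col2) and joins it with the latest left value, if any. */
    List<String> onRight(String col1, String col2, String value) {
        right.computeIfAbsent(col1, k -> new TreeMap<>()).put(col2, value);
        String l = left.get(col1);
        return l == null
                ? Collections.emptyList()
                : Collections.singletonList(l + "|" + col2 + "=" + value);
    }

    public static void main(String[] args) {
        UpsertJoinSketch join = new UpsertJoinSketch();
        System.out.println(join.onLeft("k", "L1"));
        System.out.println(join.onRight("k", "a", "R1"));
        System.out.println(join.onRight("k", "b", "R2"));
        System.out.println(join.onRight("k", "a", "R3")); // replaces R1 for (k, a)
        System.out.println(join.onLeft("k", "L2"));       // joins only the latest right values
    }
}
```

Because each side keeps only the latest value per key, state grows with the number of distinct keys rather than with the number of events in the interval.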

Looking at the implementation, I also worry that the cleanup timers are not 
being coalesced, which may result in high overhead when processing them for 
high-throughput streams.
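
To illustrate what coalescing buys, here is a self-contained sketch with hypothetical names.  It assumes, as I understand Flink's timer service to behave, that registering a timer for the same key and timestamp twice results in a single timer.  Rounding each cleanup time up to a coarser granularity then collapses many registrations into one, at the cost of keeping state slightly longer.

```java
import java.util.Set;
import java.util.TreeSet;

class TimerCoalescing {
    /** Rounds the cleanup time up to the next multiple of the granularity, never earlier. */
    static long coalesce(long cleanupTime, long granularityMs) {
        return Math.floorDiv(cleanupTime + granularityMs - 1, granularityMs) * granularityMs;
    }

    /** Number of distinct timers that the given events of one key would register. */
    static int distinctTimers(long[] eventTimes, long retentionMs, long granularityMs) {
        Set<Long> timers = new TreeSet<>();
        for (long t : eventTimes) {
            timers.add(coalesce(t + retentionMs, granularityMs));
        }
        return timers.size();
    }

    public static void main(String[] args) {
        long[] events = { 1_001, 1_002, 1_500, 1_999 };
        System.out.println("uncoalesced: " + distinctTimers(events, 5_000, 1));
        System.out.println("1 s granularity: " + distinctTimers(events, 5_000, 1_000));
    }
}
```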

> Continuous Joins:  True Sliding Window Joins
> 
>
> Key: FLINK-6243
> URL: https://issues.apache.org/jira/browse/FLINK-6243
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Affects Versions: 1.1.4
>Reporter: Elias Levy
>Priority: Major
>
> Flink defines sliding window joins as the join of elements of two streams 
> that share a window of time, where the windows are defined by advancing them 
> forward some amount of time that is less than the window time span.  More 
> generally, such windows are just overlapping hopping windows. 
> Other systems, such as Kafka Streams, support a different notion of sliding 
> window joins.  In these systems, two elements of a stream are joined if the 
> absolute time difference between them is less than or equal to the time 
> window length.
> This alternate notion of sliding window joins has some advantages in some 
> applications over the current implementation.  
> In the current implementation, elements to be joined may both fall within 
> multiple overlapping sliding windows, leading them to be joined multiple 
> times, when we only wish them to be joined once.
> The implementation need not instantiate window objects to keep track of 
> stream elements, which becomes problematic in the current implementation if 
> the window size is very large and the slide is very small.
> It allows for asymmetric time joins.  E.g. join if elements from stream A are 
> no more than X time behind and Y time ahead of an element from stream B.
> It is currently possible to implement a join with these semantics using 
> {{CoProcessFunction}}, but the capability should be a first-class feature, 
> as it is in Kafka Streams.
> To perform the join, elements of each stream must be buffered for at least 
> the window time length.  To allow for large window sizes and a high volume of 
> elements, the state should, possibly optionally, be buffered such that it can 
> spill to disk (e.g. by using RocksDB).
> The same stream may be joined multiple times in a complex topology.  As an 
> optimization, it may be wise to reuse any element buffer among colocated join 
> operators.  Otherwise, there may be write amplification and increased state 
> that must be snapshotted.





[jira] [Commented] (FLINK-9953) Active Kubernetes integration

2018-07-25 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555933#comment-16555933
 ] 

Elias Levy commented on FLINK-9953:
---

This is a duplicate of FLINK-9495.

> Active Kubernetes integration
> -
>
> Key: FLINK-9953
> URL: https://issues.apache.org/jira/browse/FLINK-9953
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, ResourceManager
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.7.0
>
>
> This is the umbrella issue tracking Flink's active Kubernetes integration. 
> Active means in this context that the {{ResourceManager}} can talk to 
> Kubernetes to launch new pods similar to Flink's Yarn and Mesos integration.





[jira] [Commented] (FLINK-6239) Sharing of State Across Operators

2018-07-04 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16533142#comment-16533142
 ] 

Elias Levy commented on FLINK-6239:
---

For our particular use case, having a single operator that owns the state and 
can write to it, and others that can read it, is sufficient.  The idea was 
essentially to implement a materialized table join (similar to Kafka Streams 
KTable).  We need to join against the table multiple times, but the table 
(state) would be of significant size, and duplicating it is something we would 
rather not do.

That said, I think we may be able to get around this issue by creating a 
generic wrapping container, unioning all the streams, and feeding them to a 
single operator that performs all the work, instead of having multiple operators 
and multiple streams.  Not as clean, but probably workable.
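
A rough sketch of that workaround, in plain Python rather than the Flink API. The names `Tagged` and `SingleJoinOperator`, the source labels, and the sample elements are hypothetical and only illustrate the shape of the idea: every input is wrapped in one container type, the streams are unioned, and one operator holds the single copy of the table state.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Tagged:
    """Generic wrapper so differently typed streams can be unioned."""
    source: str   # which input stream the element came from
    key: str      # join key
    payload: Any


class SingleJoinOperator:
    """One operator that owns the table state and joins every other stream."""

    def __init__(self):
        self.table = {}  # single copy of the materialized table, by key

    def process(self, element):
        if element.source == "table":
            # Table updates only modify state and emit nothing.
            self.table[element.key] = element.payload
            return []
        current = self.table.get(element.key)
        if current is None:
            return []  # no table entry yet for this key
        return [(element.source, element.key, element.payload, current)]


op = SingleJoinOperator()
unioned = [
    Tagged("table", "k1", "v1"),
    Tagged("clicks", "k1", "c1"),
    Tagged("views", "k2", "w1"),
    Tagged("views", "k1", "w2"),
]
results = [out for element in unioned for out in op.process(element)]
```

The trade-off is the one noted above: the state is stored once, but the operator has to dispatch on the wrapper's tag instead of each join living in its own operator.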

> Sharing of State Across Operators
> -
>
> Key: FLINK-6239
> URL: https://issues.apache.org/jira/browse/FLINK-6239
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.1.4
>Reporter: Elias Levy
>Priority: Major
>
> Currently state cannot be shared across operators.  On a keyed stream, the 
> state is implicitly keyed by the operator id, in addition to the stream key.  
> This can make it more difficult and inefficient to implement complex 
> topologies, where multiple operators may need to access the same state.  It 
> would be valuable to be able to access keyed value and map state across 
> operators.





[jira] [Closed] (FLINK-9731) Kafka source subtask begins to consume from earliest offset

2018-07-03 Thread Elias Levy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elias Levy closed FLINK-9731.
-
Resolution: Invalid

> Kafka source subtask begins to consume from earliest offset
> ---
>
> Key: FLINK-9731
> URL: https://issues.apache.org/jira/browse/FLINK-9731
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.2
>Reporter: Elias Levy
>Priority: Critical
>
> On Jun 30th 2018, at 9:35 am UTC, the Kafka source in subtask 7 in a Flink 
> job instance began consuming records from the earliest offsets available in 
> Kafka for the partitions assigned to it. Other subtasks did not exhibit this 
> behavior and continued operating normally.
> Previous to the event the job exhibited no Kafka lag. The job showed no 
> failed checkpoints and the job did not restore or restart. Flink logs only 
> showed the following message:
> {noformat}
> June 30th 2018, 02:35:01.711  Fetch offset 2340400514 is out of range for 
> partition topic-124, resetting offset
> {noformat}
> The job is configured with checkpoints at 1 minute intervals. The Kafka 
> connector consumer is configured to start from group offsets if it is not 
> started from a savepoint via `setStartFromGroupOffsets()`, and the Kafka 
> consumer is configured to fall back to the earliest offsets if no group 
> offsets are committed by setting `auto.offset.reset` to `earliest` in the 
> Kafka consumer config.
> Right before the event a Kafka broker (kafka-broker-b5-int) lost leadership 
> of its partitions for around 30 seconds as a result of losing its connection 
> to ZooKeeper.
>  
> {noformat}
> [2018-06-30 09:34:54,799] INFO Unable to read additional data from server 
> sessionid 0x161305b7bd81a09, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:54,899] INFO zookeeper state changed (Disconnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2018-06-30 09:34:55,384] ERROR [ReplicaFetcherThread-3-1002]: Error for 
> partition [cloud_ioc_events,32] to broker 
> 1002:org.apache.kafka.common.errors.NotLeaderForPartitionException: This 
> server is not the leader for that topic-partition. 
> (kafka.server.ReplicaFetcherThread)
> {noformat}
> The broker immediately reconnected to ZK after a few tries:
> {noformat}
> [2018-06-30 09:34:55,462] INFO Opening socket connection to server 
> 10.210.48.187/10.210.48.187:2181 (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,462] INFO zookeeper state changed (AuthFailed) 
> (org.I0Itec.zkclient.ZkClient)
> [2018-06-30 09:34:55,463] INFO Socket connection established to 
> 10.210.48.187/10.210.48.187:2181, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,464] WARN Unable to reconnect to ZooKeeper service, 
> session 0x161305b7bd81a09 has expired (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,465] INFO zookeeper state changed (Expired) 
> (org.I0Itec.zkclient.ZkClient)
> [2018-06-30 09:34:55,465] INFO Initiating client connection, 
> connectString=10.210.48.187:2181,10.210.43.200:2181,10.210.16.102:2181/kafka 
> sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5c33f1a9 
> (org.apache.zookeeper.ZooKeeper)
> [2018-06-30 09:34:55,465] INFO Unable to reconnect to ZooKeeper service, 
> session 0x161305b7bd81a09 has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,466] INFO EventThread shut down for session: 
> 0x161305b7bd81a09 (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,467] INFO zookeeper state changed (AuthFailed) 
> (org.I0Itec.zkclient.ZkClient)
> [2018-06-30 09:34:55,468] INFO Opening socket connection to server 
> 10.210.43.200/10.210.43.200:2181 (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,468] INFO Socket connection established to 
> 10.210.43.200/10.210.43.200:2181, initiating session 
> (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,471] INFO Session establishment complete on server 
> 10.210.43.200/10.210.43.200:2181, sessionid = 0x163934fa09d1baa, negotiated 
> timeout = 6000 (org.apache.zookeeper.ClientCnxn)
> [2018-06-30 09:34:55,471] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2018-06-30 09:34:55,472] INFO re-registering broker info in ZK for broker 
> 2005 (kafka.server.KafkaHealthcheck$SessionExpireListener)
> [2018-06-30 09:34:55,472] INFO Creating /brokers/ids/2005 (is it secure? 
> false) (kafka.utils.ZKCheckedEphemeral)
> [2018-06-30 09:34:55,476] INFO Result of znode creation is: OK 
> (kafka.utils.ZKCheckedEphemeral)
> [2018-06-30 09:34:55,476] INFO Registered broker 2005 at path 
> /brokers/ids/2005 with addresses: 
> 

[jira] [Commented] (FLINK-9731) Kafka source subtask begins to consume from earliest offset

2018-07-03 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16532040#comment-16532040
 ] 

Elias Levy commented on FLINK-9731:
---

Closing as I suspect the error is on the Kafka side.  Logs indicate Kafka 
truncated the partition in the process of the broker catching up with a replica 
and regaining leadership of the partition.  But that would imply that somehow 
Kafka allowed Flink to read an uncommitted message, as we are publishing with 
acks=all, and the topic has min.insync.replicas=2, which breaks the consistency 
guarantees of Kafka.

The truncation led to the Flink fetch being considered out-of-range, causing 
the auto.offset.reset logic in the Kafka consumer to kick in, leading to 
consumption from the earliest offset.
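
The sequence can be illustrated with a simplified model of how a consumer resolves an out-of-range fetch. This is a sketch in plain Python, not the Kafka client's code: the function `resolve_fetch_offset` and the log start and end values are assumptions made for illustration; only the requested offset and the `earliest` setting come from the report.

```python
def resolve_fetch_offset(requested, log_start, log_end, auto_offset_reset):
    """Return the offset a consumer would fetch from next.

    A requested offset inside [log_start, log_end] is used as is.
    Otherwise the auto.offset.reset policy decides where to resume.
    """
    if log_start <= requested <= log_end:
        return requested
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    # With the policy set to "none" the client surfaces an error instead.
    raise ValueError(f"offset {requested} out of range and no reset policy")


requested = 2340400514  # offset from the Flink log message

# Hypothetical log bounds: truncation leaves the log end below the request.
after_truncation = resolve_fetch_offset(
    requested, log_start=1_000, log_end=2_340_400_000,
    auto_offset_reset="earliest",
)

# Without truncation the same request would have been served normally.
without_truncation = resolve_fetch_offset(
    requested, log_start=1_000, log_end=2_340_500_000,
    auto_offset_reset="earliest",
)
```

Under these assumed bounds, a truncation of only a few hundred records is enough to send the subtask back to the start of the partition.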

> Kafka source subtask begins to consume from earliest offset
> ---
>
> Key: FLINK-9731
> URL: https://issues.apache.org/jira/browse/FLINK-9731
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.2
>Reporter: Elias Levy
>Priority: Critical
>

[jira] [Updated] (FLINK-9731) Kafka source subtask begins to consume from earliest offset

2018-07-03 Thread Elias Levy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elias Levy updated FLINK-9731:
--
Description: 
On Jun 30th 2018, at 9:35 am UTC, the Kafka source in subtask 7 in a Flink job 
instance began consuming records from the earliest offsets available in Kafka 
for the partitions assigned to it. Other subtasks did not exhibit this behavior 
and continued operating normally.

Previous to the event the job exhibited no Kafka lag. The job showed no failed 
checkpoints and the job did not restore or restart. Flink logs only showed the 
following message:
{noformat}
June 30th 2018, 02:35:01.711  Fetch offset 2340400514 is out of range for 
partition topic-124, resetting offset
{noformat}

The job is configured with checkpoints at 1 minute intervals. The Kafka 
connector consumer is configured to start from group offsets if it is not 
started from a savepoint via `setStartFromGroupOffsets()`, and the Kafka 
consumer is configured to fall back to the earliest offsets if no group offsets 
are committed by setting `auto.offset.reset` to `earliest` in the Kafka 
consumer config.

Right before the event a Kafka broker (kafka-broker-b5-int) lost leadership of 
its partitions for around 30 seconds as a result of losing its connection to 
ZooKeeper.

 
{noformat}
[2018-06-30 09:34:54,799] INFO Unable to read additional data from server 
sessionid 0x161305b7bd81a09, likely server has closed socket, closing socket 
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:54,899] INFO zookeeper state changed (Disconnected) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,384] ERROR [ReplicaFetcherThread-3-1002]: Error for 
partition [cloud_ioc_events,32] to broker 
1002:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server 
is not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)
{noformat}

The broker immediately reconnected to ZK after a few tries:

{noformat}
[2018-06-30 09:34:55,462] INFO Opening socket connection to server 
10.210.48.187/10.210.48.187:2181 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,462] INFO zookeeper state changed (AuthFailed) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,463] INFO Socket connection established to 
10.210.48.187/10.210.48.187:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,464] WARN Unable to reconnect to ZooKeeper service, 
session 0x161305b7bd81a09 has expired (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,465] INFO zookeeper state changed (Expired) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,465] INFO Initiating client connection, 
connectString=10.210.48.187:2181,10.210.43.200:2181,10.210.16.102:2181/kafka 
sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5c33f1a9 
(org.apache.zookeeper.ZooKeeper)
[2018-06-30 09:34:55,465] INFO Unable to reconnect to ZooKeeper service, 
session 0x161305b7bd81a09 has expired, closing socket connection 
(org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,466] INFO EventThread shut down for session: 
0x161305b7bd81a09 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,467] INFO zookeeper state changed (AuthFailed) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,468] INFO Opening socket connection to server 
10.210.43.200/10.210.43.200:2181 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,468] INFO Socket connection established to 
10.210.43.200/10.210.43.200:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,471] INFO Session establishment complete on server 
10.210.43.200/10.210.43.200:2181, sessionid = 0x163934fa09d1baa, negotiated 
timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,471] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,472] INFO re-registering broker info in ZK for broker 2005 
(kafka.server.KafkaHealthcheck$SessionExpireListener)
[2018-06-30 09:34:55,472] INFO Creating /brokers/ids/2005 (is it secure? false) 
(kafka.utils.ZKCheckedEphemeral)
[2018-06-30 09:34:55,476] INFO Result of znode creation is: OK 
(kafka.utils.ZKCheckedEphemeral)
[2018-06-30 09:34:55,476] INFO Registered broker 2005 at path /brokers/ids/2005 
with addresses: 
EndPoint(kafka-broker-b5-int,9092,ListenerName(PLAINTEXT),PLAINTEXT),EndPoint(kafka-broker-b5,19092,ListenerName(PUBLIC),SASL_PLAINTEXT)
 (kafka.utils.ZkUtils)
[2018-06-30 09:34:55,476] INFO done re-registering broker 
(kafka.server.KafkaHealthcheck$SessionExpireListener)
[2018-06-30 09:34:55,476] INFO Subscribing to /brokers/topics path to watch for 
new topics (kafka.server.KafkaHealthcheck$SessionExpireListener)
{noformat}

By 9:35:02 partitions had returned to the broker.

It appears this is the broker that the subtask was consuming from, as outgoing 
network traffic from it spiked after the broker recovered leadership of its 

[jira] [Created] (FLINK-9731) Kafka source subtask begins to consume from earliest offset

2018-07-03 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9731:
-

 Summary: Kafka source subtask begins to consume from earliest 
offset
 Key: FLINK-9731
 URL: https://issues.apache.org/jira/browse/FLINK-9731
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.4.2
Reporter: Elias Levy


On Jun 30th 2018, at 9:35 am UTC, the Kafka source in subtask 7 in a Flink job 
instance began consuming records from the earliest offsets available in Kafka 
for the partitions assigned to it. Other subtasks did not exhibit this behavior 
and continued operating normally.

Previous to the event the job exhibited no Kafka lag. The job showed no failed 
checkpoints and the job did not restore or restart. Flink logs show no 
indication of anything amiss. There were no errors or Kafka related 
messages in the Flink logs.

The job is configured with checkpoints at 1 minute intervals. The Kafka 
connector consumer is configured to start from group offsets if it is not 
started from a savepoint via `setStartFromGroupOffsets()`, and the Kafka 
consumer is configured to fall back to the earliest offsets if no group offsets 
are committed by setting `auto.offset.reset` to `earliest` in the Kafka 
consumer config.

Right before the event a Kafka broker (kafka-broker-b5-int) lost leadership of 
its partitions for around 30 seconds as a result of losing its connection to 
ZooKeeper.

 
{noformat}
[2018-06-30 09:34:54,799] INFO Unable to read additional data from server 
sessionid 0x161305b7bd81a09, likely server has closed socket, closing socket 
connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:54,899] INFO zookeeper state changed (Disconnected) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,384] ERROR [ReplicaFetcherThread-3-1002]: Error for 
partition [cloud_ioc_events,32] to broker 
1002:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server 
is not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)
{noformat}

The broker immediately reconnected to ZK after a few tries:

{noformat}
[2018-06-30 09:34:55,462] INFO Opening socket connection to server 
10.210.48.187/10.210.48.187:2181 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,462] INFO zookeeper state changed (AuthFailed) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,463] INFO Socket connection established to 
10.210.48.187/10.210.48.187:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,464] WARN Unable to reconnect to ZooKeeper service, 
session 0x161305b7bd81a09 has expired (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,465] INFO zookeeper state changed (Expired) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,465] INFO Initiating client connection, 
connectString=10.210.48.187:2181,10.210.43.200:2181,10.210.16.102:2181/kafka 
sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@5c33f1a9 
(org.apache.zookeeper.ZooKeeper)
[2018-06-30 09:34:55,465] INFO Unable to reconnect to ZooKeeper service, 
session 0x161305b7bd81a09 has expired, closing socket connection 
(org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,466] INFO EventThread shut down for session: 
0x161305b7bd81a09 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,467] INFO zookeeper state changed (AuthFailed) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,468] INFO Opening socket connection to server 
10.210.43.200/10.210.43.200:2181 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,468] INFO Socket connection established to 
10.210.43.200/10.210.43.200:2181, initiating session 
(org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,471] INFO Session establishment complete on server 
10.210.43.200/10.210.43.200:2181, sessionid = 0x163934fa09d1baa, negotiated 
timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2018-06-30 09:34:55,471] INFO zookeeper state changed (SyncConnected) 
(org.I0Itec.zkclient.ZkClient)
[2018-06-30 09:34:55,472] INFO re-registering broker info in ZK for broker 2005 
(kafka.server.KafkaHealthcheck$SessionExpireListener)
[2018-06-30 09:34:55,472] INFO Creating /brokers/ids/2005 (is it secure? false) 
(kafka.utils.ZKCheckedEphemeral)
[2018-06-30 09:34:55,476] INFO Result of znode creation is: OK 
(kafka.utils.ZKCheckedEphemeral)
[2018-06-30 09:34:55,476] INFO Registered broker 2005 at path /brokers/ids/2005 
with addresses: 
EndPoint(kafka-broker-b5-int,9092,ListenerName(PLAINTEXT),PLAINTEXT),EndPoint(kafka-broker-b5,19092,ListenerName(PUBLIC),SASL_PLAINTEXT)
 (kafka.utils.ZkUtils)
[2018-06-30 09:34:55,476] INFO done re-registering broker 
(kafka.server.KafkaHealthcheck$SessionExpireListener)
[2018-06-30 09:34:55,476] INFO Subscribing to /brokers/topics path to watch for 
new topics (kafka.server.KafkaHealthcheck$SessionExpireListener)
{noformat}

By 9:35:02 partitions had returned to the broker.

It appears this 

[jira] [Commented] (FLINK-9662) Task manager isolation for jobs

2018-06-27 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16525331#comment-16525331
 ] 

Elias Levy commented on FLINK-9662:
---

I like the direction of this.  And as I mentioned in the other issue, for the 
"any" case, the system could prioritize slots with the most matches, or a third 
type could be introduced: "all", "any" and "most".

> Task manager isolation for jobs
> ---
>
> Key: FLINK-9662
> URL: https://issues.apache.org/jira/browse/FLINK-9662
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Major
> Fix For: 1.6.0
>
>
> Disable task manager sharing for different jobs.





[jira] [Created] (FLINK-9682) Add setDescription to execution environment and display it in the UI

2018-06-27 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9682:
-

 Summary: Add setDescription to execution environment and display 
it in the UI
 Key: FLINK-9682
 URL: https://issues.apache.org/jira/browse/FLINK-9682
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API, Webfrontend
Affects Versions: 1.5.0
Reporter: Elias Levy


Currently you can provide a job name to {{execute}} in the execution 
environment.  In an environment where many versions of a job may be executing, 
such as a development or test environment, identifying which running job is of 
a specific version via the UI can be difficult unless the version is embedded 
into the job name given to {{execute}}.  But the job name is used for other 
purposes, such as for namespacing metrics.  Thus, it is not ideal to modify the 
job name, as that could require modifying metric dashboards and monitors each 
time versions change.

I propose a new method be added to the execution environment, 
{{setDescription}}, that would allow a user to pass in an arbitrary description 
that would be displayed in the dashboard, allowing users to distinguish jobs.
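
To make the intent concrete, here is a toy model of the proposal in plain Python. Nothing in it is an existing Flink API: `EnvironmentSketch`, `set_description`, the returned dictionary, and the sample names are all hypothetical. It only shows the separation being asked for, where the job name keeps driving the metric namespace while the description is purely a display label.

```python
class EnvironmentSketch:
    """Hypothetical stand-in for an execution environment; not Flink's API."""

    def __init__(self):
        self.description = ""

    def set_description(self, description):
        # Free-form text shown to users; never used for metric names.
        self.description = description

    def execute(self, job_name):
        # The metric scope depends only on the job name, so dashboards
        # and monitors keep working when the description changes.
        if self.description:
            ui_label = f"{job_name}: {self.description}"
        else:
            ui_label = job_name
        return {"metric_scope": job_name, "ui_label": ui_label}


env = EnvironmentSketch()
env.set_description("build 1.4.7, feature branch")
job = env.execute("fraud-detector")
```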





[jira] [Commented] (FLINK-9662) Task manager isolation for jobs

2018-06-26 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16524278#comment-16524278
 ] 

Elias Levy commented on FLINK-9662:
---

I took a look at the design doc, but I am not sufficiently familiar with the 
recent resource manager changes to comment on the details.

> Task manager isolation for jobs
> ---
>
> Key: FLINK-9662
> URL: https://issues.apache.org/jira/browse/FLINK-9662
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Major
> Fix For: 1.5.1
>
>
> Disable task manager sharing for different jobs.





[jira] [Created] (FLINK-9600) Add DataStream transformation variants that pass timestamp to the user function

2018-06-15 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9600:
-

 Summary: Add DataStream transformation variants that pass 
timestamp to the user function
 Key: FLINK-9600
 URL: https://issues.apache.org/jira/browse/FLINK-9600
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 1.5.0
Reporter: Elias Levy


It is often necessary to access the timestamp assigned to records within user 
functions.  At the moment this is only possible from {{RichFunction}}. 
Implementing a {{RichFunction}} just to access the timestamp is burdensome, so 
most jobs carry a duplicate of the timestamp within the record.

It would be useful if {{DataStream}} provided transformation methods that 
accepted user functions that could be passed the record's timestamp as an 
additional argument, similar to how there are two variants of {{flatMap}}, one 
with an extra parameter that gives the user function access to the output 
{{Collector}}.

Along similar lines, it may be useful to have variants that pass the record's 
key as an additional parameter.
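
As a sketch of what such a variant might look like, here is a plain Python model in which a stream is a list of (timestamp, value) records. The name `map_with_timestamp` and the record representation are assumptions for illustration and do not correspond to any existing {{DataStream}} method.

```python
def map_with_timestamp(records, fn):
    """Apply fn(value, timestamp) to each (timestamp, value) record.

    The record's timestamp is passed as an extra argument, so the value
    itself does not need to carry a duplicate of it.
    """
    return [(ts, fn(value, ts)) for ts, value in records]


# Hypothetical records: the timestamp lives outside the value.
records = [(1000, "a"), (2000, "b")]
labelled = map_with_timestamp(records, lambda value, ts: f"{value}@{ts}")
```

A key-passing variant would follow the same pattern, with the key supplied as the additional argument.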

 





[jira] [Commented] (FLINK-8886) Job isolation via scheduling in shared cluster

2018-06-06 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503536#comment-16503536
 ] 

Elias Levy commented on FLINK-8886:
---

I am ok with having a separate issue for per job JVM isolation, but I am 
primarily interested in the per job TM isolation via scheduling to matching 
TMs.  In practice that will give us JVM isolation without having to wait for 
anything else.

> Job isolation via scheduling in shared cluster
> --
>
> Key: FLINK-8886
> URL: https://issues.apache.org/jira/browse/FLINK-8886
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, Local Runtime, Scheduler
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Assignee: Renjie Liu
>Priority: Major
>
> Flink's TaskManager executes tasks from different jobs within the same JVM as 
> threads.  We prefer to isolate different jobs on their own JVM.  Thus, we 
> must use different TMs for different jobs.  As currently the scheduler will 
> allocate task slots within a TM to tasks from different jobs, that means we 
> must stand up one cluster per job.  This is wasteful, as it requires at least 
> two JobManagers per cluster for high-availability, and the JMs have low 
> utilization.
> Additionally, different jobs may require different resources.  Some jobs are 
> compute heavy.  Some are IO heavy (lots of state in RocksDB).  At the moment 
> the scheduler treats all TMs as equivalent, except possibly in their number 
> of available task slots.  Thus, one is required to stand up multiple clusters 
> if there is a need for different types of TMs.
> It would be useful if one could specify requirements on jobs, such that they 
> are only scheduled on a subset of TMs.  Properly configured, that would 
> permit isolation of jobs in a shared cluster and scheduling of jobs with 
> specific resource needs.
> One possible implementation is to specify a set of tags on the TM config file 
> which the TMs use when registering with the JM, and another set of tags 
> configured within the job or supplied when submitting the job.  The scheduler 
> could then match the tags in the job with the tags in the TMs.  In a 
> restrictive mode the scheduler would assign a job task to a TM only if all 
> tags match.  In a relaxed mode the scheduler could assign a job task to a TM 
> if there is a partial match, while giving preference to a more accurate match.
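
The tag matching described in the quoted text could work roughly as follows. This is a plain Python sketch, not scheduler code: the function `eligible_task_managers`, the mode names, and the sample tags are hypothetical, and "all tags match" is interpreted here as every job tag being present on the TM, which is an assumption.

```python
def eligible_task_managers(job_tags, task_managers, mode="restrictive"):
    """Return TM names a job may be scheduled on, best match first.

    restrictive: a TM qualifies only if it carries every job tag.
    relaxed:     a TM qualifies on any overlap; more matches rank higher.
    """
    if mode not in ("restrictive", "relaxed"):
        raise ValueError(f"unknown mode: {mode}")
    wanted = set(job_tags)
    scored = []
    for name, tm_tags in task_managers.items():
        matches = len(wanted & set(tm_tags))
        if mode == "restrictive" and matches == len(wanted):
            scored.append((matches, name))
        elif mode == "relaxed" and matches > 0:
            scored.append((matches, name))
    # Most matching tags first; ties broken by name for a stable order.
    scored.sort(key=lambda item: (-item[0], item[1]))
    return [name for _, name in scored]


# Hypothetical cluster: tags each TM registered with.
tms = {
    "tm-1": {"ssd", "team-a"},
    "tm-2": {"team-a"},
    "tm-3": {"gpu"},
}
strict = eligible_task_managers({"ssd", "team-a"}, tms, mode="restrictive")
relaxed = eligible_task_managers({"ssd", "team-a"}, tms, mode="relaxed")
```

Giving each job a unique tag in restrictive mode yields the per job TM isolation discussed in this issue, while the relaxed ranking covers jobs with softer resource preferences.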





[jira] [Created] (FLINK-9495) Implement ResourceManager for Kubernetes

2018-06-01 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9495:
-

 Summary: Implement ResourceManager for Kubernetes
 Key: FLINK-9495
 URL: https://issues.apache.org/jira/browse/FLINK-9495
 Project: Flink
  Issue Type: Improvement
  Components: ResourceManager
Affects Versions: 1.5.0
Reporter: Elias Levy


I noticed there is no issue for developing a Kubernetes specific 
ResourceManager under FLIP-6, so I am creating this issue.





[jira] [Commented] (FLINK-9450) Job hangs if S3 access it denied during checkpoints

2018-05-28 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16492837#comment-16492837
 ] 

Elias Levy commented on FLINK-9450:
---

The logs show no errors.  Alas, we are using the JRE, so these nodes don't have 
jps or jstack installed. 

Re: job failure on checkpoint failure.  My understanding is that is a 1.5 
feature (env.getCheckpointConfig.setFailTasksOnCheckpointingErrors).  We are 
running 1.4.2.

The checkpoint and the job just hang.  There is no failure, at least for the 
amount of time I've waited, which has been several minutes.  I don't know if it 
will fail if I wait longer.  The job continues after I clear the firewall rules.

Looks like the checkpoint may be hanging on the synchronous portion of the 
async checkpoint.

This is from a Flink job running on a cluster with a single TM:

I run:

{noformat}
iptables -A OUTPUT -p tcp --dport 80 -j REJECT  --reject-with tcp-reset
iptables -A OUTPUT -p tcp --dport 443 -j REJECT  --reject-with tcp-reset
{noformat}

The logs show what seems like a normal checkpoint attempt:

{noformat}
2018-05-28 16:44:06.449070500 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend    - 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous 
part) in thread Thread[Async calls on Source: Kafka Control Topic -> Updates 
(1/1),5,Flink Task Threads] took 0 ms.
2018-05-28 16:44:06.449103500 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend    - 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous 
part) in thread Thread[Async calls on Source: Kafka Control Topic -> Updates 
(1/1),5,Flink Task Threads] took 0 ms.
2018-05-28 16:44:06.449388500 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend    - 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous 
part) in thread Thread[Async calls on Source: Kafka Queries Topic -> Filter 
(1/1),5,Flink Task Threads] took 0 ms.
2018-05-28 16:44:06.449390500 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend    - 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous 
part) in thread Thread[Suppress Events -> Assign Timestamp -> (Sink: Kafka 
Event Topic, Sink: Kafka Event Topic) (1/1),5,Flink Task Threads] took 0 ms.
2018-05-28 16:44:06.449400500 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend    - 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous 
part) in thread Thread[Suppress Events -> Assign Timestamp -> (Sink: Kafka 
Event Topic, Sink: Kafka Event Topic) (1/1),5,Flink Task Threads] took 0 ms.
2018-05-28 16:44:06.451695500 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend    - 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous 
part) in thread Thread[Engine (1/1),5,Flink Task Threads] took 2 ms
{noformat}

 After I disable the firewall rules:
{noformat}
iptables -D OUTPUT -p tcp --dport 80 -j REJECT --reject-with tcp-reset
iptables -D OUTPUT -p tcp --dport 443 -j REJECT  --reject-with tcp-reset
{noformat}

{noformat}
2018-05-28 16:47:19.741581500 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend- 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, 
asynchronous part) in thread Thread[pool-58-thread-1,5,Flink Task Threads] took 
193290 ms.
2018-05-28 16:47:23.424434500 INFO  
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Heap backend 
snapshot (File Stream Factory @ 
s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous 
part) in thread Thread[Suppress Events -> Assign Timestamp -> (Sink: Kafka 
Event Topic, Sink: Kafka Event Topic) (1/1),5,Flink Task Threads] took 0 ms.
2018-05-28 16:47:23.424876500 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend- 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous 
part) in thread Thread[Async calls on Source: Kafka Queries Topic -> Agent 
Queries Filter (1/1),5,Flink Task Threads] took 0 ms.
2018-05-28 16:47:23.426263500 INFO  
org.apache.flink.runtime.state.DefaultOperatorStateBackend- 
DefaultOperatorStateBackend snapshot (File Stream Factory @ 
s3://bucket/flink/job/checkpoints/03a1057d5d21dff9745eebd954c6, synchronous 
part) in thread Thread[Async calls on Source: Kafka Queries Topic -> Agent 
Queries Filter 

[jira] [Created] (FLINK-9450) Job hangs if S3 access it denied during checkpoints

2018-05-26 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9450:
-

 Summary: Job hangs if S3 access it denied during checkpoints
 Key: FLINK-9450
 URL: https://issues.apache.org/jira/browse/FLINK-9450
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.4.2
Reporter: Elias Levy


We have a streaming job that consumes from and writes to Kafka.  The job is 
configured to checkpoint to S3.  If we deny access to S3 by using iptables on 
the TM host to deny all outgoing connections to ports 80 and 443, whether using 
DROP or REJECT, and whether using REJECT with --reject-with tcp-reset or 
--reject-with icmp-port-unreachable, the job soon stops publishing to Kafka.

This happens whether or not the Kafka sources have 
{{setCommitOffsetsOnCheckpoints}} set to true or false.

The system is configured to use Presto for the S3 file system.  The job has a 
small amount of state, so it is configured to use {{FsStateBackend}} with 
asynchronous snapshots.

If the iptables rules are removed, the job continues to function.

I would expect the job to either fail or continue running if a checkpoint fails.
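
To make the expectation concrete, here is a minimal Java sketch of bounding a blocking write with a timeout, so that a stalled upload surfaces as a failure rather than hanging indefinitely.  This is not Flink's checkpointing code; BoundedWrite, Outcome, and the timeout values are hypothetical names chosen only for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Flink: runs a blocking write with a deadline.
final class BoundedWrite {
    enum Outcome { COMPLETED, FAILED, TIMED_OUT }

    private BoundedWrite() {}

    static Outcome run(Callable<Void> write, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<Void> pending = pool.submit(write);
        try {
            // Wait for the write, but never longer than the deadline.
            pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return Outcome.COMPLETED;
        } catch (TimeoutException e) {
            // The write is stuck (e.g. the network is blocked): give up and report it.
            pending.cancel(true);
            return Outcome.TIMED_OUT;
        } catch (ExecutionException e) {
            // The write itself threw, e.g. access was denied.
            return Outcome.FAILED;
        } catch (InterruptedException e) {
            // The caller was interrupted while waiting; restore the flag and report failure.
            Thread.currentThread().interrupt();
            pending.cancel(true);
            return Outcome.FAILED;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

With a helper like this, the caller can decide whether a TIMED_OUT or FAILED outcome should fail the job or merely skip the checkpoint, which are the two behaviors described above.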



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9440) Allow cancelation and reset of timers

2018-05-25 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9440:
-

 Summary: Allow cancelation and reset of timers
 Key: FLINK-9440
 URL: https://issues.apache.org/jira/browse/FLINK-9440
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.2
Reporter: Elias Levy


Currently the {{TimerService}} allows one to register timers, but it is not 
possible to delete a timer or to reset a timer to a new value.  If one wishes 
to reset a timer, one must also handle the callbacks of the previously 
registered timers and ignore them.

It would be useful if the API allowed one to remove and reset timers.
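
For reference, the workaround described above can be sketched in plain Java as follows.  This only models the bookkeeping and is not the Flink API: ResettableTimer is a hypothetical class, and its map stands in for whatever keyed state a real job would use.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the workaround; not part of the Flink API.
final class ResettableTimer {
    // Latest valid deadline per key; stands in for keyed state.
    private final Map<String, Long> currentDeadline = new HashMap<>();
    // Record of timers that were acted upon, for inspection.
    private final List<String> fired = new ArrayList<>();

    // "Reset": remember only the newest deadline. Older registrations still fire later.
    void register(String key, long timestamp) {
        currentDeadline.put(key, timestamp);
    }

    // "Cancel": forget the deadline so that any pending callback is ignored.
    void cancel(String key) {
        currentDeadline.remove(key);
    }

    // Invoked for every timer that was ever registered, including stale ones.
    void onTimer(String key, long timestamp) {
        Long live = currentDeadline.get(key);
        if (live == null || live != timestamp) {
            return; // stale or canceled timer: ignore the callback
        }
        currentDeadline.remove(key);
        fired.add(key + "@" + timestamp);
    }

    List<String> fired() {
        return fired;
    }
}
```

Every stale timer still fires and has to be filtered out in the callback, which is the overhead that native delete and reset operations would avoid.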





[jira] [Updated] (FLINK-9403) Documentation continues to refer to removed methods

2018-05-20 Thread Elias Levy (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elias Levy updated FLINK-9403:
--
Description: 
{{org.apache.flink.api.common.ExecutionConfig}} no longer has the 
{{enableTimestamps}}, {{disableTimestamps}}, and {{areTimestampsEnabled}} 
methods.  They were removed in [this 
commit|https://github.com/apache/flink/commit/ceb64248daab04b01977ff02516696e4398d656e].
   They are still referenced in the Execution Parameters section of the Flink 
DataStream API Programming Guide page.

 

  was:{{org.apache.flink.api.common.ExecutionConfig}} no longer has the 
{{enableTimestamps}}, {{disableTimestamps}}, and {{areTimestampsEnabled}} 
methods.  They were removed in [this 
commit|https://github.com/apache/flink/commit/ceb64248daab04b01977ff02516696e4398d656e].
 


> Documentation continues to refer to removed methods
> ---
>
> Key: FLINK-9403
> URL: https://issues.apache.org/jira/browse/FLINK-9403
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.6.0
>Reporter: Elias Levy
>Priority: Minor
>
> {{org.apache.flink.api.common.ExecutionConfig}} no longer has the 
> {{enableTimestamps}}, {{disableTimestamps}}, and {{areTimestampsEnabled}} 
> methods.  They were removed in [this 
> commit|https://github.com/apache/flink/commit/ceb64248daab04b01977ff02516696e4398d656e].
>    They are still referenced in the Execution Parameters section of the Flink 
> DataStream API Programming Guide page.
>  





[jira] [Created] (FLINK-9403) Documentation continues to refer to removed methods

2018-05-20 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9403:
-

 Summary: Documentation continues to refer to removed methods
 Key: FLINK-9403
 URL: https://issues.apache.org/jira/browse/FLINK-9403
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.6.0
Reporter: Elias Levy


{{org.apache.flink.api.common.ExecutionConfig}} no longer has the 
{{enableTimestamps}}, {{disableTimestamps}}, and {{areTimestampsEnabled}} 
methods.  They were removed in [this 
commit|https://github.com/apache/flink/commit/ceb64248daab04b01977ff02516696e4398d656e].
 





[jira] [Commented] (FLINK-9272) DataDog API "counter" metric type is deprecated

2018-04-28 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16457827#comment-16457827
 ] 

Elias Levy commented on FLINK-9272:
---

I tried to find out before opening the issue, but I found no information other 
than a notice in the docs saying {{counter}} was deprecated in favor of 
{{count}}, and the fact that the API docs no longer list {{counter}}.

> DataDog API "counter" metric type is deprecated 
> 
>
> Key: FLINK-9272
> URL: https://issues.apache.org/jira/browse/FLINK-9272
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: Elias Levy
>Priority: Major
>
> It appears to have been replaced by the "count" metric type.
> https://docs.datadoghq.com/developers/metrics/





[jira] [Created] (FLINK-9272) DataDog API "counter" metric type is deprecated

2018-04-28 Thread Elias Levy (JIRA)
Elias Levy created FLINK-9272:
-

 Summary: DataDog API "counter" metric type is deprecated 
 Key: FLINK-9272
 URL: https://issues.apache.org/jira/browse/FLINK-9272
 Project: Flink
  Issue Type: Improvement
  Components: Metrics
Affects Versions: 1.4.2
Reporter: Elias Levy


It appears to have been replaced by the "count" metric type.

https://docs.datadoghq.com/developers/metrics/





[jira] [Commented] (FLINK-6756) Provide RichAsyncFunction to Scala API suite

2018-04-06 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16428676#comment-16428676
 ] 

Elias Levy commented on FLINK-6756:
---

Just ran into this, any progress?

> Provide RichAsyncFunction to Scala API suite
> 
>
> Key: FLINK-6756
> URL: https://issues.apache.org/jira/browse/FLINK-6756
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Andrea Spina
>Assignee: Andrea Spina
>Priority: Major
>
> I can't find any tracking info about the chance to have RichAsyncFunction in 
> the Scala API suite. I think it'd be nice to have this function in order to 
> access open/close methods and the RuntimeContext.
> I was able to retrieve 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/There-is-no-Open-and-Close-method-in-Async-I-O-API-of-Scala-td11591.html#a11593
>  only, so my question is whether there are any blocking issues preventing this 
> feature. [~till.rohrmann]
> If it's possible and nobody has already done it, I can assign the issue to 
> myself in order to implement it.





[jira] [Commented] (FLINK-8886) Job isolation via scheduling in shared cluster

2018-03-19 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16405753#comment-16405753
 ] 

Elias Levy commented on FLINK-8886:
---

Thanks.  I am aware of YARN, but we do not want to manage YARN just to run Flink.  
If we wanted to go down the route of using a generic resource manager, we'd use 
Mesos, but we don't have a need to do so.

As I alluded to, we already run multiple Flink clusters in standalone mode to 
gain isolation, but this is usually wasteful, as the JM is usually lightly 
loaded and you need at least two of them for high-availability.  So it would be 
useful to share JMs while jobs are isolated to TMs, which is what I am 
proposing.

As for the complexity of the proposal, I'd argue that it is relatively 
lightweight in the restrictive mode.  Just permit the TM to register a set of 
tags placed in the config file, then have the scheduler only schedule jobs on 
TMs that have an exact match for the tags. 

> Job isolation via scheduling in shared cluster
> --
>
> Key: FLINK-8886
> URL: https://issues.apache.org/jira/browse/FLINK-8886
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, Local Runtime, Scheduler
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Priority: Major
>
> Flink's TaskManager executes tasks from different jobs within the same JVM as 
> threads.  We prefer to isolate different jobs in their own JVM.  Thus, we must 
> use different TMs for different jobs.  As currently the scheduler will 
> allocate task slots within a TM to tasks from different jobs, that means we 
> must stand up one cluster per job.  This is wasteful, as it requires at least 
> two JobManagers per cluster for high-availability, and the JMs have low 
> utilization.
> Additionally, different jobs may require different resources.  Some jobs are 
> compute heavy.  Some are IO heavy (lots of state in RocksDB).  At the moment 
> the scheduler treats all TMs as equivalent, except possibly in their number 
> of available task slots.  Thus, one is required to stand up multiple clusters 
> if there is a need for different types of TMs.
>  
> It would be useful if one could specify requirements on jobs, such that they 
> are only scheduled on a subset of TMs.  Properly configured, that would 
> permit isolation of jobs in a shared cluster and scheduling of jobs with 
> specific resource needs.
>  
> One possible implementation is to specify a set of tags in the TM config file 
> which the TMs use when registering with the JM, and another set of tags 
> configured within the job or supplied when submitting the job.  The scheduler 
> could then match the tags in the job with the tags in the TMs.  In a 
> restrictive mode the scheduler would assign a job task to a TM only if all 
> tags match.  In a relaxed mode the scheduler could assign a job task to a TM 
> if there is a partial match, while giving preference to a more accurate match.
>  
>  
>  
>  





[jira] [Created] (FLINK-8886) Job isolation via scheduling in shared cluster

2018-03-06 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8886:
-

 Summary: Job isolation via scheduling in shared cluster
 Key: FLINK-8886
 URL: https://issues.apache.org/jira/browse/FLINK-8886
 Project: Flink
  Issue Type: Improvement
  Components: Scheduler
Affects Versions: 1.5.0
Reporter: Elias Levy


Flink's TaskManager executes tasks from different jobs within the same JVM as 
threads.  We prefer to isolate different jobs in their own JVM.  Thus, we must 
use different TMs for different jobs.  As currently the scheduler will allocate 
task slots within a TM to tasks from different jobs, that means we must stand 
up one cluster per job.  This is wasteful, as it requires at least two 
JobManagers per cluster for high-availability, and the JMs have low utilization.

Additionally, different jobs may require different resources.  Some jobs are 
compute heavy.  Some are IO heavy (lots of state in RocksDB).  At the moment 
the scheduler treats all TMs as equivalent, except possibly in their number 
of available task slots.  Thus, one is required to stand up multiple clusters if 
there is a need for different types of TMs.

 

It would be useful if one could specify requirements on jobs, such that they are 
only scheduled on a subset of TMs.  Properly configured, that would permit 
isolation of jobs in a shared cluster and scheduling of jobs with specific 
resource needs.

 

One possible implementation is to specify a set of tags in the TM config file 
which the TMs use when registering with the JM, and another set of tags 
configured within the job or supplied when submitting the job.  The scheduler 
could then match the tags in the job with the tags in the TMs.  In a 
restrictive mode the scheduler would assign a job task to a TM only if all tags 
match.  In a relaxed mode the scheduler could assign a job task to a TM if 
there is a partial match, while giving preference to a more accurate match.
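
The two matching modes can be sketched in a few lines of Java.  This is only an illustration of the proposal, not existing scheduler code: TagMatcher and its methods are hypothetical names, "all tags match" is read here as exact set equality, and the relaxed mode is assumed to score a TM by the number of shared tags.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of the proposed tag matching; not part of Flink.
final class TagMatcher {
    private TagMatcher() {}

    // Restrictive mode: a TM is eligible only if its tags equal the job's tags.
    static boolean matchesRestrictive(Set<String> jobTags, Set<String> tmTags) {
        return jobTags.equals(tmTags);
    }

    // Relaxed mode: score a TM by how many tags it shares with the job.
    // A score of 0 means no match; higher scores would be preferred.
    static int relaxedScore(Set<String> jobTags, Set<String> tmTags) {
        Set<String> common = new HashSet<>(jobTags);
        common.retainAll(tmTags);
        return common.size();
    }
}
```

In the relaxed mode the scheduler would pick, among TMs with free slots and a non-zero score, the one with the highest score.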

 

 

 

 





[jira] [Created] (FLINK-8844) Export job jar file name or job version property via REST API

2018-03-02 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8844:
-

 Summary: Export job jar file name or job version property via REST 
API
 Key: FLINK-8844
 URL: https://issues.apache.org/jira/browse/FLINK-8844
 Project: Flink
  Issue Type: Improvement
  Components: REST
Affects Versions: 1.4.3
Reporter: Elias Levy


To aid automated deployment of jobs, it would be useful if the REST API exposed 
either a running job's jar filename or a version property the job could set, 
similar to how it sets the job name.

As it is now there is no standard mechanism to determine what version of a job 
is running in a cluster.





[jira] [Commented] (FLINK-7641) Loss of JobManager in HA mode should not cause jobs to fail

2018-02-23 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16375179#comment-16375179
 ] 

Elias Levy commented on FLINK-7641:
---

I mean that if you have a standalone cluster in HA mode with multiple JMs, if 
the current master JM fails, any jobs executing in the cluster will be stopped 
and then restored by the new master JM.  Ideally master JM failover should be 
largely invisible to running jobs.  At most, they should be temporarily paused 
and continued, rather than stopped and restarted.

> Loss of JobManager in HA mode should not cause jobs to fail
> ---
>
> Key: FLINK-7641
> URL: https://issues.apache.org/jira/browse/FLINK-7641
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Affects Versions: 1.3.2
>Reporter: Elias Levy
>Assignee: vinoyang
>Priority: Major
>
> Currently if a standalone cluster of JobManagers is configured in 
> high-availability mode and the master JM is lost, the job executing in the 
> cluster will be restarted.  This is less than ideal.  It would be best if the 
> jobs could continue to execute without restarting while one of the spare JMs 
> becomes the new master, or in the worst case, the jobs are paused while the 
> JM election takes place.





[jira] [Commented] (FLINK-8751) Canceling a job results in a InterruptedException in the TM

2018-02-23 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16374525#comment-16374525
 ] 

Elias Levy commented on FLINK-8751:
---

Apologies.  I meant TM.  It was a long day.  I've amended the issue.

> Canceling a job results in a InterruptedException in the TM
> ---
>
> Key: FLINK-8751
> URL: https://issues.apache.org/jira/browse/FLINK-8751
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.4.1
>Reporter: Elias Levy
>Priority: Major
>
> Canceling a job results in the following exception reported by the TM: 
> {code:java}
> ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Could not shut down timer service
> java.lang.InterruptedException
>   at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown Source)
>   at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown Source)
>   at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Unknown Source)
> {code}
>  





[jira] [Updated] (FLINK-8751) Canceling a job results in a InterruptedException in the TM

2018-02-23 Thread Elias Levy (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Elias Levy updated FLINK-8751:
--
Description: 
Canceling a job results in the following exception reported by the TM: 
{code:java}
ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Could not shut down timer service
java.lang.InterruptedException
  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown Source)
  at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown Source)
  at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
  at java.lang.Thread.run(Unknown Source)
{code}
 

  was:
Canceling a job results in the following exception reported by the JM: 
{code:java}
ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Could not shut down timer service
java.lang.InterruptedException
  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown Source)
  at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown Source)
  at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
  at java.lang.Thread.run(Unknown Source)
{code}
 

Component/s: (was: JobManager)
 TaskManager
Summary: Canceling a job results in a InterruptedException in the TM  
(was: Canceling a job results in a InterruptedException in the JM)

> Canceling a job results in a InterruptedException in the TM
> ---
>
> Key: FLINK-8751
> URL: https://issues.apache.org/jira/browse/FLINK-8751
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.4.1
>Reporter: Elias Levy
>Priority: Major
>
> Canceling a job results in the following exception reported by the TM: 
> {code:java}
> ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Could not shut down timer service
> java.lang.InterruptedException
>   at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown Source)
>   at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown Source)
>   at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Unknown Source)
> {code}
>  





[jira] [Commented] (FLINK-8752) ClassNotFoundException when using the user code class loader

2018-02-22 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8752?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16373954#comment-16373954
 ] 

Elias Levy commented on FLINK-8752:
---

Some other information:  Cluster is configured in HA mode with S3 as storage 
for checkpoints and using the Presto S3 jar.  I've confirmed that the artifact 
downloaded by the blob cache in the JM matches the job's jar and that it 
contains the class in question.  Using Java 8 update 152.

> ClassNotFoundException when using the user code class loader
> 
>
> Key: FLINK-8752
> URL: https://issues.apache.org/jira/browse/FLINK-8752
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.4.1
>Reporter: Elias Levy
>Priority: Major
>
> Attempting to submit a job results in the job failing while it is being 
> started in the JMs with a ClassNotFoundException error: 
> {code:java}
> java.lang.ClassNotFoundException: com.foo.flink.common.util.TimeAssigner
>   at java.net.URLClassLoader.findClass(Unknown Source)
>   at java.lang.ClassLoader.loadClass(Unknown Source)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
>   at java.lang.ClassLoader.loadClass(Unknown Source)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Unknown Source)
>   at 
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
>   at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>   at java.io.ObjectInputStream.readClassDesc(Unknown Source)
>   at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>   at java.io.ObjectInputStream.readObject0(Unknown Source)
>   at java.io.ObjectInputStream.readObject(Unknown Source)
>   at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:393)
>   at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:380)
>   at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:368)
>   at 
> org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.createPartitionStateHolders(AbstractFetcher.java:542)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.<init>(AbstractFetcher.java:167)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:89)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.<init>(Kafka010Fetcher.java:62)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:203)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:564)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Unknown Source)
> {code}
> If I drop the job's jar into the lib folder in the JM and set 
> classloader.resolve-order to parent-first in the JM configuration, the job 
> starts successfully.





[jira] [Created] (FLINK-8752) ClassNotFoundException when using the user code class loader

2018-02-22 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8752:
-

 Summary: ClassNotFoundException when using the user code class 
loader
 Key: FLINK-8752
 URL: https://issues.apache.org/jira/browse/FLINK-8752
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.4.1
Reporter: Elias Levy


Attempting to submit a job results in the job failing while it is being started 
in the JMs with a ClassNotFoundException error: 
{code:java}
java.lang.ClassNotFoundException: com.foo.flink.common.util.TimeAssigner
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Unknown Source)
at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
at java.io.ObjectInputStream.readClassDesc(Unknown Source)
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.io.ObjectInputStream.readObject0(Unknown Source)
at java.io.ObjectInputStream.readObject(Unknown Source)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:393)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:380)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:368)
at 
org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.createPartitionStateHolders(AbstractFetcher.java:542)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.<init>(AbstractFetcher.java:167)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.<init>(Kafka09Fetcher.java:89)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.<init>(Kafka010Fetcher.java:62)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010.createFetcher(FlinkKafkaConsumer010.java:203)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:564)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Unknown Source)
{code}

If I drop the job's jar into the lib folder in the JM and set 
classloader.resolve-order to parent-first in the JM configuration, the job 
starts successfully.
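
For readers unfamiliar with the pattern visible in the trace (a resolveClass call that ends up in the application class loader), here is a general Java sketch of an ObjectInputStream that resolves classes against an explicitly supplied class loader.  It is not Flink's InstantiationUtil and makes no claim about the cause of this bug; LoaderAwareObjectInputStream and roundTrip are hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical example class; resolves deserialized classes via a given loader.
final class LoaderAwareObjectInputStream extends ObjectInputStream {
    private final ClassLoader loader;

    LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
        super(in);
        this.loader = loader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        try {
            // Look the class up in the supplied loader (e.g. a user code loader).
            return Class.forName(desc.getName(), false, loader);
        } catch (ClassNotFoundException e) {
            // Fall back to the default resolution, which also handles primitives.
            return super.resolveClass(desc);
        }
    }

    // Serializes a value and reads it back through the loader-aware stream.
    static Object roundTrip(Serializable value, ClassLoader loader) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new LoaderAwareObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()), loader)) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

If the loader handed to such a stream cannot see the job's jar, a class that exists only in that jar fails with ClassNotFoundException, which would be consistent with the workaround of placing the jar in the lib folder.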





[jira] [Created] (FLINK-8751) Canceling a job results in a InterruptedException in the JM

2018-02-22 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8751:
-

 Summary: Canceling a job results in a InterruptedException in the 
JM
 Key: FLINK-8751
 URL: https://issues.apache.org/jira/browse/FLINK-8751
 Project: Flink
  Issue Type: Bug
  Components: JobManager
Affects Versions: 1.4.1
Reporter: Elias Levy


Canceling a job results in the following exception reported by the JM: 
{code:java}
ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Could not shut down timer service
java.lang.InterruptedException
  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown Source)
  at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown Source)
  at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
  at java.lang.Thread.run(Unknown Source)
{code}
 





[jira] [Commented] (FLINK-7935) Metrics with user supplied scope variables

2018-01-11 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322789#comment-16322789
 ] 

Elias Levy commented on FLINK-7935:
---

So it seems the DD reporter needs to switch from 
{{AbstractMetricGroup#getLogicalScope(CharacterFilter)}} to 
{{MetricGroup#getMetricIdentifier(String)}}.  That would be sufficient for my 
immediate use case, as I am only looking to add a single user supplied 
scope/tag.

That said, I can see {{AbstractMetricGroup#getLogicalScope(CharacterFilter)}} 
becoming cumbersome if a user wishes to use multiple key-values/tags.  E.g. 

{code}
getRuntimeContext()
  .getMetricGroup()
  .addGroup("messages")
  .addGroup("type", messageType)
  .addGroup("source", messageSource)
  .addGroup("priority", messagePriority)
  .counter("count")
{code}

would be named {{.messages.type.source.priority.count}} 
instead of just {{.messages.count}} with variables/tags 
{{type}}, {{source}}, and {{priority}}.
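
The difference between the two naming schemes can be modeled in plain Java.  This is a simplified sketch and not the Flink metrics API: MetricNaming and its methods are hypothetical, the scope prefix is omitted, and the tag values ("click", "web", "high") are made up for the example.

```java
import java.util.Map;

// Hypothetical model of the two naming schemes; not the Flink metrics API.
final class MetricNaming {
    private MetricNaming() {}

    // Keys become part of the name, as in the cumbersome case described above.
    static String nameWithKeysInScope(String group, Map<String, String> keyValues, String metric) {
        StringBuilder name = new StringBuilder(group);
        for (String key : keyValues.keySet()) {
            name.append('.').append(key);
        }
        return name.append('.').append(metric).toString();
    }

    // Keys and values travel separately as tags, so the name stays generic.
    static String nameWithTags(String group, String metric) {
        return group + "." + metric;
    }
}
```

With the keys type, source, and priority, the first method yields messages.type.source.priority.count, while the second yields messages.count and leaves the key-value pairs to be reported as tags.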

> Metrics with user supplied scope variables
> --
>
> Key: FLINK-7935
> URL: https://issues.apache.org/jira/browse/FLINK-7935
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.3.2
>Reporter: Elias Levy
>
> We use DataDog for metrics.  DD and Flink differ somewhat in how they track 
> metrics.
> Flink names and scopes metrics together, at least by default. E.g. by default 
>  the System scope for operator metrics is 
> {{.taskmanager}}.  
> The scope variables become part of the metric's full name.
> In DD the metric would be named something generic, e.g. 
> {{taskmanager.job.operator}}, and they would be distinguished by their tag 
> values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}.
> Flink allows you to configure the format string for system scopes, so it is 
> possible to set the operator scope format to {{taskmanager.job.operator}}.  
> We do this for all scopes:
> {code}
> metrics.scope.jm: jobmanager
> metrics.scope.jm.job: jobmanager.job
> metrics.scope.tm: taskmanager
> metrics.scope.tm.job: taskmanager.job
> metrics.scope.task: taskmanager.job.task
> metrics.scope.operator: taskmanager.job.operator
> {code}
> This seems to work.  The DataDog Flink metrics plugin submits all scope 
> variables as tags, even if they are not used within the scope format.  And it 
> appears internally this does not lead to metrics conflicting with each other.
> We would like to extend this to user defined metrics, but there is no way to 
> define variables/scopes when adding a metric group or metric with the user 
> API.  Being able to do so would give us a single metric in DD with a tag that 
> takes many different values, rather than hundreds of metrics for the one 
> value we want to measure across different event types.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5627) Allow job specific externalized checkpoint dir

2018-01-10 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16321239#comment-16321239
 ] 

Elias Levy commented on FLINK-5627:
---

It would be nice to fix this issue.

> Allow job specific externalized checkpoint dir
> --
>
> Key: FLINK-5627
> URL: https://issues.apache.org/jira/browse/FLINK-5627
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0, 1.3.0
>Reporter: Gyula Fora
>
> Currently the externalized checkpoint directory can only be configured on a 
> cluster level. This is not in sync with the way checkpoint directories are 
> generally configured on a job specific level.





[jira] [Commented] (FLINK-7935) Metrics with user supplied scope variables

2018-01-05 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16314122#comment-16314122
 ] 

Elias Levy commented on FLINK-7935:
---

 [~Zentol]  thoughts?

> Metrics with user supplied scope variables
> --
>
> Key: FLINK-7935
> URL: https://issues.apache.org/jira/browse/FLINK-7935
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.3.2
>Reporter: Elias Levy
>
> We use DataDog for metrics.  DD and Flink differ somewhat in how they track 
> metrics.
> Flink names and scopes metrics together, at least by default. E.g. by default 
>  the System scope for operator metrics is 
> {{.taskmanager}}.  
> The scope variables become part of the metric's full name.
> In DD the metric would be named something generic, e.g. 
> {{taskmanager.job.operator}}, and they would be distinguished by their tag 
> values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}.
> Flink allows you to configure the format string for system scopes, so it is 
> possible to set the operator scope format to {{taskmanager.job.operator}}.  
> We do this for all scopes:
> {code}
> metrics.scope.jm: jobmanager
> metrics.scope.jm.job: jobmanager.job
> metrics.scope.tm: taskmanager
> metrics.scope.tm.job: taskmanager.job
> metrics.scope.task: taskmanager.job.task
> metrics.scope.operator: taskmanager.job.operator
> {code}
> This seems to work.  The DataDog Flink metrics plugin submits all scope 
> variables as tags, even if they are not used within the scope format.  And it 
> appears internally this does not lead to metrics conflicting with each other.
> We would like to extend this to user defined metrics, but there is no way to 
> define variables/scopes when adding a metric group or metric with the user 
> API.  Being able to do so would give us a single metric in DD with a tag that 
> takes many different values, rather than hundreds of metrics for the one 
> value we want to measure across different event types.





[jira] [Created] (FLINK-8358) Hostname used by DataDog metric reporter is not configurable

2018-01-03 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8358:
-

 Summary: Hostname used by DataDog metric reporter is not 
configurable
 Key: FLINK-8358
 URL: https://issues.apache.org/jira/browse/FLINK-8358
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.4.0
Reporter: Elias Levy


The hostname used by the DataDog metric reporter to report metrics is not 
configurable.  This can be problematic if the hostname that Flink uses is 
different from the hostname used by the system's DataDog agent.  

For instance, in our environment we use Chef, and using the DataDog Chef 
Handler, certain metadata such as host roles is associated with the hostname in 
the DataDog service.  The hostname used to submit this metadata is the name we 
have given the host.  But as Flink picks up the default name given by EC2 to 
the instance, metrics submitted by Flink to DataDog using that hostname are not 
associated with the tags derived from Chef.

In the Job Manager we can avoid this issue by explicitly setting the config 
{{jobmanager.rpc.address}} to the hostname we desire.  I attempted to do the 
same on the Task Manager by setting the {{taskmanager.hostname}} config, but 
DataDog does not seem to pick up that value.

Digging through the code, it seems the DD metric reporter gets the hostname from 
the {{TaskManagerMetricGroup}} host variable, which seems to be set from 
{{taskManagerLocation.getHostname}}.  That in turn seems to be set by calling 
{{this.inetAddress.getCanonicalHostName()}}, which merely performs a reverse 
lookup on the IP address, and then calling {{NetUtils.getHostnameFromFQDN}} on 
the result.  The latter is further problematic because its result is not a 
fully qualified hostname.
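
The two steps described above can be approximated outside Flink with the following sketch.  {{InetAddress.getCanonicalHostName()}} is the real JDK call; {{HostnameSketch}} and {{shortHostname}} are hypothetical names, and the shortening rule (keep everything before the first dot) is an assumption about what the Flink helper does, not a copy of its code.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical sketch; not Flink code.
class HostnameSketch {

    // Assumed shortening rule: keep only the part before the first dot.
    static String shortHostname(String fqdn) {
        int dot = fqdn.indexOf('.');
        return dot < 0 ? fqdn : fqdn.substring(0, dot);
    }

    public static void main(String[] args) throws UnknownHostException {
        InetAddress addr = InetAddress.getLocalHost();
        // The canonical name comes from a lookup on the address, so it
        // reflects what DNS returns rather than any name set in a config file.
        String canonical = addr.getCanonicalHostName();
        System.out.println("canonical: " + canonical);
        System.out.println("shortened: " + shortHostname(canonical));
    }
}
```

On an EC2 instance the canonical name would typically be the EC2-assigned one, which is why a configured hostname never reaches the reporter under this model.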

More generally, there seems to be a need to specify the hostname of a JM or TM 
node that can be reused across Flink components.





[jira] [Commented] (FLINK-7935) Metrics with user supplied scope variables

2018-01-03 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16310189#comment-16310189
 ] 

Elias Levy commented on FLINK-7935:
---

It appears the Metric documentation has not been updated in the 1.5 snapshot, 
but if I understand the changes, it means that, for instance, if I have a job 
that processes a bounded but not predefined set of message types, and publishes 
a metric per message type that counts the number of processed messages per 
type, I could do:

{code}
getRuntimeContext()
  .getMetricGroup()
  .addGroup("messages")
  .addGroup("type", messageType)
  .counter("count")
{code}

And the DataDog reporter would report a metric named {{messages.count}} with 
tags of {{type:messageType}}.  If that is correct, then it may be sufficient.

Does FLINK-7692 work as I described?

> Metrics with user supplied scope variables
> --
>
> Key: FLINK-7935
> URL: https://issues.apache.org/jira/browse/FLINK-7935
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.3.2
>Reporter: Elias Levy
>
> We use DataDog for metrics.  DD and Flink differ somewhat in how they track 
> metrics.
> Flink names and scopes metrics together, at least by default. E.g. by default 
>  the System scope for operator metrics is 
> {{.taskmanager}}.  
> The scope variables become part of the metric's full name.
> In DD the metric would be named something generic, e.g. 
> {{taskmanager.job.operator}}, and they would be distinguished by their tag 
> values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}.
> Flink allows you to configure the format string for system scopes, so it is 
> possible to set the operator scope format to {{taskmanager.job.operator}}.  
> We do this for all scopes:
> {code}
> metrics.scope.jm: jobmanager
> metrics.scope.jm.job: jobmanager.job
> metrics.scope.tm: taskmanager
> metrics.scope.tm.job: taskmanager.job
> metrics.scope.task: taskmanager.job.task
> metrics.scope.operator: taskmanager.job.operator
> {code}
> This seems to work.  The DataDog Flink metrics plugin submits all scope 
> variables as tags, even if they are not used within the scope format.  And it 
> appears internally this does not lead to metrics conflicting with each other.
> We would like to extend this to user defined metrics, but there is no way to 
> define variables/scopes when adding a metric group or metric with the user 
> API.  Being able to do so would give us a single metric in DD with a tag that 
> takes many different values, rather than hundreds of metrics for the one 
> value we want to measure across different event types.





[jira] [Created] (FLINK-8352) Flink UI Reports No Error on Job Submission Failures

2018-01-02 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8352:
-

 Summary: Flink UI Reports No Error on Job Submission Failures
 Key: FLINK-8352
 URL: https://issues.apache.org/jira/browse/FLINK-8352
 Project: Flink
  Issue Type: Bug
  Components: Web Client
Affects Versions: 1.4.0
Reporter: Elias Levy


If you submit a job jar via the web UI and it raises an exception when started, 
the UI will report no error and will continue to show the animated image that 
makes it seem as if it is working.  In addition, no error is printed in the 
logs, unless the level is increased to at least DEBUG:

{noformat}
@40005a4c399202b87ebc DEBUG 
org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler - Error while 
handling request.
@40005a4c399202b8868c java.util.concurrent.CompletionException: 
org.apache.flink.client.program.ProgramInvocationException: The program caused 
an error: 
@40005a4c399202b88a74   at 
org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleJsonRequest$0(JarPlanHandler.java:68)
@40005a4c399202b88e5c   at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
@40005a4c399202b8e44c   at 
java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
@40005a4c399202b8e44c   at java.util.concurrent.FutureTask.run(Unknown 
Source)
@40005a4c399202b8e834   at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown
 Source)
@40005a4c399202b8e834   at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
 Source)
@40005a4c399202b8f3ec   at 
java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
@40005a4c399202b8f7d4   at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
@40005a4c399202b8f7d4   at java.lang.Thread.run(Unknown Source)
@40005a4c399202b8fbbc Caused by: 
org.apache.flink.client.program.ProgramInvocationException: The program caused 
an error: 
@40005a4c399202b90b5c   at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
@40005a4c399202b90f44   at 
org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
@40005a4c399202b90f44   at 
org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:76)
@40005a4c399202b91afc   at 
org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler.lambda$handleJsonRequest$0(JarPlanHandler.java:57)
@40005a4c399202b91afc   ... 8 more
@40005a4c399202b91ee4 Caused by: java.lang.ExceptionInInitializerError
@40005a4c399202b91ee4   at 
com.cisco.sbg.amp.flink.ioc_engine.IocEngine.main(IocEngine.scala)
@40005a4c399202b922cc   at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
@40005a4c399202b92a9c   at 
sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
@40005a4c399202b92a9c   at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
@40005a4c399202b92e84   at java.lang.reflect.Method.invoke(Unknown 
Source)
@40005a4c399202b92e84   at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
@40005a4c399202b9326c   at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
@40005a4c399202b93a3c   at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
@40005a4c399202b949dc   ... 11 more
@40005a4c399202b949dc Caused by: java.io.FileNotFoundException: 
/data/jenkins/jobs/XXX/workspace/target/scala-2.11/scoverage-data/scoverage.measurements.55
 (No such file or directory)
@40005a4c399202b951ac   at java.io.FileOutputStream.open0(Native Method)
@40005a4c399202b951ac   at java.io.FileOutputStream.open(Unknown Source)
@40005a4c399202b9597c   at java.io.FileOutputStream.<init>(Unknown 
Source)
@40005a4c399202b9597c   at java.io.FileWriter.<init>(Unknown Source)
@40005a4c399202b95d64   at 
scoverage.Invoker$$anonfun$1.apply(Invoker.scala:42)
@40005a4c399202b95d64   at 
scoverage.Invoker$$anonfun$1.apply(Invoker.scala:42)
@40005a4c399202b9614c   at 
scala.collection.concurrent.TrieMap.getOrElseUpdate(TrieMap.scala:901)
@40005a4c399202b9614c   at scoverage.Invoker$.invoked(Invoker.scala:42)
@40005a4c399202b9691c   at com.XXX$.(IocEngine.scala:28)
@40005a4c399202b9691c   at com.XXX$.(IocEngine.scala)
{noformat}





[jira] [Created] (FLINK-8311) Flink needs documentation for network access control

2017-12-22 Thread Elias Levy (JIRA)
Elias Levy created FLINK-8311:
-

 Summary: Flink needs documentation for network access control
 Key: FLINK-8311
 URL: https://issues.apache.org/jira/browse/FLINK-8311
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.4.0
Reporter: Elias Levy


There is a need for better documentation on what connects to what over which 
ports in a Flink cluster to allow users to configure network access control 
rules.

E.g. I was under the impression that in a ZK HA configuration the Job Managers 
were essentially independent and only coordinated via ZK.  But starting 
multiple JMs in HA with the JM RPC port blocked between JMs shows that the 
second JM's Akka subsystem is trying to connect to the leading JM:

INFO  akka.remote.transport.ProtocolStateActor  - No 
response from remote for outbound association. Associate timed out after [2 
ms].
WARN  akka.remote.ReliableDeliverySupervisor- 
Association with remote system [akka.tcp://flink@10.210.210.127:6123] has 
failed, address is now gated for [5000] ms. Reason: [Association failed with 
[akka.tcp://flink@10.210.210.127:6123]] Caused by: [No response from remote for 
outbound association. Associate timed out after [2 ms].]
WARN  akka.remote.transport.netty.NettyTransport- Remote 
connection to [null] failed with 
org.apache.flink.shaded.akka.org.jboss.netty.channel.ConnectTimeoutException: 
connection timed out: /10.210.210.127:6123



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5880) Add documentation for object reuse for DataStream API

2017-12-19 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16297798#comment-16297798
 ] 

Elias Levy commented on FLINK-5880:
---

I came here to open this issue after reading the latest [blog 
post|https://data-artisans.com/blog/curious-case-broken-benchmark-revisiting-apache-flink-vs-databricks-runtime].
  Hard to fault Databricks if the documentation about object reuse is not there.

> Add documentation for object reuse for DataStream API
> -
>
> Key: FLINK-5880
> URL: https://issues.apache.org/jira/browse/FLINK-5880
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Aljoscha Krettek
>
> The batch documentation has this section: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html#operating-on-data-objects-in-functions





[jira] [Commented] (FLINK-6243) Continuous Joins: True Sliding Window Joins

2017-11-02 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236751#comment-16236751
 ] 

Elias Levy commented on FLINK-6243:
---

[~StephanEwen] anything to review?

> Continuous Joins:  True Sliding Window Joins
> 
>
> Key: FLINK-6243
> URL: https://issues.apache.org/jira/browse/FLINK-6243
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Affects Versions: 1.1.4
>Reporter: Elias Levy
>Priority: Major
>
> Flink defines sliding window joins as the join of elements of two streams 
> that share a window of time, where the windows are defined by advancing them 
> forward some amount of time that is less than the window time span.  More 
> generally, such windows are just overlapping hopping windows. 
> Other systems, such as Kafka Streams, support a different notion of sliding 
> window joins.  In these systems, two elements of a stream are joined if the 
> absolute time difference between them is less than or equal to the time 
> window length.
> This alternate notion of sliding window joins has some advantages in some 
> applications over the current implementation.  
> Elements to be joined may both fall within multiple overlapping sliding 
> windows, leading them to be joined multiple times, when we only wish them to 
> be joined once.
> The implementation need not instantiate window objects to keep track of 
> stream elements, which becomes problematic in the current implementation if 
> the window size is very large and the slide is very small.
> It allows for asymmetric time joins.  E.g. join if elements from stream A are 
> no more than X time behind and Y time ahead of an element from stream B.
> It is currently possible to implement a join with these semantics using 
> {{CoProcessFunction}}, but the capability should be a first class feature, 
> as it is in Kafka Streams.
> To perform the join, elements of each stream must be buffered for at least 
> the window time length.  To allow for large window sizes and a high volume of 
> elements, the state should, possibly optionally, be buffered such that it can 
> spill to disk (e.g. by using RocksDB).
> The same stream may be joined multiple times in a complex topology.  As an 
> optimization, it may be wise to reuse any element buffer among colocated join 
> operators.  Otherwise, there may be write amplification and increased state 
> that must be snapshotted.
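
The join condition in the description can be sketched with plain timestamps.  This is a minimal model under stated assumptions, not a Flink operator: {{TimeBoundedJoinSketch}}, {{join}}, {{maxBehind}} and {{maxAhead}} are hypothetical names, the inputs are in-memory arrays rather than streams, and no keys, state buffering, or expiry are modelled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a time-difference join; not Flink code.
class TimeBoundedJoinSketch {

    // Returns {aTimestamp, bTimestamp} pairs for which
    // b - maxBehind <= a <= b + maxAhead.
    // Brute force over both inputs; a streaming operator would instead
    // buffer each side for the bound and drop elements once they expire.
    static List<long[]> join(long[] a, long[] b, long maxBehind, long maxAhead) {
        List<long[]> out = new ArrayList<>();
        for (long ta : a) {
            for (long tb : b) {
                if (ta >= tb - maxBehind && ta <= tb + maxAhead) {
                    out.add(new long[] {ta, tb});
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] streamA = {10, 20, 31};
        long[] streamB = {12, 30};
        for (long[] p : join(streamA, streamB, 5, 3)) {
            System.out.println("A@" + p[0] + " joins B@" + p[1]);
        }
    }
}
```

With the sample values, 10 pairs with 12 and 31 pairs with 30, while 20 matches nothing.  Each qualifying pair is produced exactly once, and setting both bounds to the same value gives the symmetric case.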



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7935) Metrics with user supplied scope variables

2017-10-27 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222972#comment-16222972
 ] 

Elias Levy commented on FLINK-7935:
---

Possibly.  It depends on whether you could add multiple metrics or metric 
groups that differ in their variables, but that could be formatted the same.  
E.g. the TaskManagerJobMetricGroup creates and tracks a distinct TaskMetricGroup 
for each task in the portion of a job the task manager is executing.  The 
metrics for each task are tracked separately, but I can format the scope so all 
of them are reported with the same name ("taskmanager.job.task") but with 
different variables/DD tags.



> Metrics with user supplied scope variables
> --
>
> Key: FLINK-7935
> URL: https://issues.apache.org/jira/browse/FLINK-7935
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.3.2
>Reporter: Elias Levy
>
> We use DataDog for metrics.  DD and Flink differ somewhat in how they track 
> metrics.
> Flink names and scopes metrics together, at least by default. E.g. by default 
>  the System scope for operator metrics is 
> {{.taskmanager}}.  
> The scope variables become part of the metric's full name.
> In DD the metric would be named something generic, e.g. 
> {{taskmanager.job.operator}}, and they would be distinguished by their tag 
> values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}.
> Flink allows you to configure the format string for system scopes, so it is 
> possible to set the operator scope format to {{taskmanager.job.operator}}.  
> We do this for all scopes:
> {code}
> metrics.scope.jm: jobmanager
> metrics.scope.jm.job: jobmanager.job
> metrics.scope.tm: taskmanager
> metrics.scope.tm.job: taskmanager.job
> metrics.scope.task: taskmanager.job.task
> metrics.scope.operator: taskmanager.job.operator
> {code}
> This seems to work.  The DataDog Flink metrics plugin submits all scope 
> variables as tags, even if they are not used within the scope format.  And it 
> appears internally this does not lead to metrics conflicting with each other.
> We would like to extend this to user defined metrics, but there is no way to 
> define variables/scopes when adding a metric group or metric with the user 
> API.  Being able to do so would give us a single metric in DD with a tag that 
> takes many different values, rather than hundreds of metrics for the one 
> value we want to measure across different event types.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7935) Metrics with user supplied scope variables

2017-10-26 Thread Elias Levy (JIRA)
Elias Levy created FLINK-7935:
-

 Summary: Metrics with user supplied scope variables
 Key: FLINK-7935
 URL: https://issues.apache.org/jira/browse/FLINK-7935
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.3.2
Reporter: Elias Levy


We use DataDog for metrics.  DD and Flink differ somewhat in how they track 
metrics.

Flink names and scopes metrics together, at least by default. E.g. by default  
the System scope for operator metrics is 
{{.taskmanager}}.  The 
scope variables become part of the metric's full name.

In DD the metric would be named something generic, e.g. 
{{taskmanager.job.operator}}, and they would be distinguished by their tag 
values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}.

Flink allows you to configure the format string for system scopes, so it is 
possible to set the operator scope format to {{taskmanager.job.operator}}.  We 
do this for all scopes:

{code}
metrics.scope.jm: jobmanager
metrics.scope.jm.job: jobmanager.job
metrics.scope.tm: taskmanager
metrics.scope.tm.job: taskmanager.job
metrics.scope.task: taskmanager.job.task
metrics.scope.operator: taskmanager.job.operator
{code}

This seems to work.  The DataDog Flink metrics plugin submits all scope 
variables as tags, even if they are not used within the scope format.  And it 
appears internally this does not lead to metrics conflicting with each other.

We would like to extend this to user defined metrics, but there is no way to 
define variables/scopes when adding a metric group or metric with the user API.  
Being able to do so would give us a single metric in DD with a tag that takes 
many different values, rather than hundreds of metrics for the one value we 
want to measure across different event types.
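
As an illustration of the two naming styles, the sketch below expands a scope format string against a set of scope variables.  It is a simplified model rather than Flink's implementation: {{ScopeFormatSketch}} and {{expand}} are hypothetical names, the variable values are made up, and the angle-bracket placeholder syntax is assumed to match what the Flink metrics documentation uses.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; not Flink code.
class ScopeFormatSketch {

    // Replaces each <variable> placeholder that has a value.
    // A format made only of constants passes through unchanged.
    static String expand(String format, Map<String, String> vars) {
        String out = format;
        for (Map.Entry<String, String> e : vars.entrySet()) {
            out = out.replace("<" + e.getKey() + ">", e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> vars = new LinkedHashMap<>();
        vars.put("host", "host1");
        vars.put("tm_id", "tm42");
        vars.put("job_name", "myjob");

        // Variables folded into the name: one distinct name per host/TM/job.
        System.out.println(expand("<host>.taskmanager.<tm_id>.<job_name>", vars));

        // Constant scope: one generic name, with the variables sent as tags.
        System.out.println(expand("taskmanager.job", vars) + " tags=" + vars);
    }
}
```

The second form is the one the configuration above produces: the name stays generic and the variables remain available to a reporter that forwards them as tags.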





