[jira] [Commented] (FLINK-12595) KinesisDataFetcherTest.testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown deadlocks

2019-07-20 Thread Shannon Carey (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889581#comment-16889581
 ] 

Shannon Carey commented on FLINK-12595:
---

Created [https://github.com/apache/flink/pull/9187]

> KinesisDataFetcherTest.testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown
>  deadlocks
> --
>
> Key: FLINK-12595
> URL: https://issues.apache.org/jira/browse/FLINK-12595
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis, Tests
>Affects Versions: 1.9.0
>Reporter: Dawid Wysakowicz
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.9.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> https://api.travis-ci.org/v3/job/535738122/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (FLINK-12595) KinesisDataFetcherTest.testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown deadlocks

2019-07-20 Thread Shannon Carey (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889564#comment-16889564
 ] 

Shannon Carey edited comment on FLINK-12595 at 7/20/19 6:49 PM:


Sorry about that! Had to draw out a big sequence diagram, but I have a 
hypothesis about the issue. It looks like KinesisDataFetcher (on the 
FlinkKinesisConsumer thread) could by chance pause inside the while(running) 
loop, before Thread.sleep(). Then, the KinesisShardConsumer thread might 
interrupt that thread via KinesisDataFetcher#mainThread.interrupt() and then 
trigger shutdownWaiter, allowing the test's fetcher.waitUntilShutdown() to 
complete, after which the test would also interrupt the FlinkKinesisConsumer 
thread. Then, when the KinesisDataFetcher code resumes, it calls Thread.sleep() 
and catches/ignores the interrupted state (caused by both interrupts), then it 
exits the while(running) loop due to running == false, and gets to our 
awaitTermination() code which waits forever because it has already absorbed the 
test's interrupt.
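
To make the race concrete, here is a condensed sketch of the loop shape I'm 
describing (illustrative only, my reading of the flow, not the actual 
KinesisDataFetcher source):
{code:java}
while (running) {
    // <-- the thread can be paused right here when the shard consumer calls
    // mainThread.interrupt(), and the test's interrupt can then arrive too
    try {
        Thread.sleep(discoveryIntervalMillis);
    } catch (InterruptedException e) {
        // swallows the pending interrupt status, from BOTH interrupts
    }
}
// running is now false, so the loop exits normally...
awaitTermination(); // ...and this waits forever: the interrupts were absorbed
{code}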

This can be reproduced by adding code like this to the KinesisDataFetcher 
beneath the if (running && discoveryIntervalMillis != 0) line in order to force 
a longer delay in that thread:
{code:java}
boolean wasInterrupted = false;
int interruptionCount = 0;
for (int i = 0; i < 4; i++) {
    try {
        Thread.sleep(4000);
    } catch (InterruptedException ie) {
        wasInterrupted = true;
        interruptionCount++;
    }
}
if (wasInterrupted) {
    // Restore the interrupted state
    Thread.currentThread().interrupt();
}
System.out.println("KinesisDataFetcher was interrupted " + interruptionCount
    + " times during the while(running) loop.");
System.out.flush();
{code}
You'll likely see that it gets interrupted twice, and the test deadlocks with 
the same stacks as the logs provided above.

I have 02a0cf3d4e checked out to reproduce the issue matching the provided 
logs. I assume it's best to write this patch against HEAD of master, and let 
you handle backporting it? Let me know if that's not the case. I'll post again 
once I have a PR to address my hypothesis.


was (Author: rehevkor5):
Sorry about that! Had to draw out a big sequence diagram, but I have a 
hypothesis about the issue. It looks like KinesisDataFetcher (on the 
FlinkKinesisConsumer thread) could by chance pause inside the while(running) 
loop, before Thread.sleep(). Then, the KinesisShardConsumer thread might 
interrupt that thread via KinesisDataFetcher#mainThread.interrupt() and then 
trigger shutdownWaiter, allowing the test's fetcher.waitUntilShutdown() to 
complete, after which the test would also interrupt the FlinkKinesisConsumer 
thread. Then, when the KinesisDataFetcher code resumes, it calls Thread.sleep() 
and catches/ignores the interrupted state (caused by both interrupts), then it 
exits the while(running) loop due to running == false, and gets to our 
awaitTermination() code which waits forever because it has already absorbed the 
test's interrupt.

This can be reproduced by adding code like this to the KinesisDataFetcher 
beneath the if (running && discoveryIntervalMillis != 0) line in order to force 
a longer delay in that thread:
{code:java}
boolean wasInterrupted = false;
int interruptionCount = 0;
for (int i = 0; i < 4; i++) {
    try {
        Thread.sleep(4000);
    } catch (InterruptedException ie) {
        wasInterrupted = true;
        interruptionCount++;
    }
}
if (wasInterrupted) {
    // Restore the interrupted state
    Thread.currentThread().interrupt();
}
System.out.println("KinesisDataFetcher was interrupted " + interruptionCount
    + " times during the while(running) loop.");
System.out.flush();
{code}
You'll likely see that it gets interrupted twice, and the test deadlocks with 
the same stacks as the logs provided above.

I have 02a0cf3d4e checked out to reproduce the issue matching the provided 
logs. I assume it's best to write this patch against HEAD of master, and let 
you handle backporting it? Let me know if that's not the case. I'll post again 
once I have a PR to address my hypothesis.

> KinesisDataFetcherTest.testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown
>  deadlocks
> --
>
> Key: FLINK-12595
> URL: https://issues.apache.org/jira/browse/FLINK-12595
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis, Tests
>Affects Versions: 1.9.0
>Reporter: Dawid Wysakowicz
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.9.0
>
>
> https://api.travis-ci.org/v3/job/535738122/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-12595) KinesisDataFetcherTest.testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown deadlocks

2019-07-20 Thread Shannon Carey (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889565#comment-16889565
 ] 

Shannon Carey commented on FLINK-12595:
---

Another thing I noticed: FlinkKinesisConsumer#sourceContext.close() is not 
called if fetcher.runFetcher() throws an exception. This seems like it might be 
a problem? Should that be moved into a "finally" block? Do you think I should 
submit a separate issue for that??
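
For illustration, a minimal sketch of the "finally" structure I have in mind 
(hypothetical, not the current FlinkKinesisConsumer code):
{code:java}
try {
    fetcher.runFetcher();
} finally {
    // runs even when runFetcher() throws, so the context is always closed
    sourceContext.close();
}
{code}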

> KinesisDataFetcherTest.testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown
>  deadlocks
> --
>
> Key: FLINK-12595
> URL: https://issues.apache.org/jira/browse/FLINK-12595
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis, Tests
>Affects Versions: 1.9.0
>Reporter: Dawid Wysakowicz
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.9.0
>
>
> https://api.travis-ci.org/v3/job/535738122/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Comment Edited] (FLINK-12595) KinesisDataFetcherTest.testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown deadlocks

2019-07-20 Thread Shannon Carey (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889565#comment-16889565
 ] 

Shannon Carey edited comment on FLINK-12595 at 7/20/19 6:26 PM:


Another thing I noticed: FlinkKinesisConsumer#sourceContext.close() is not 
called if fetcher.runFetcher() throws an exception. This seems like it might be 
a problem? Should that be moved into a "finally" block? Do you think I should 
submit a separate issue for that?


was (Author: rehevkor5):
Another thing I noticed: FlinkKinesisConsumer#sourceContext.close() is not 
called if fetcher.runFetcher() throws an exception. This seems like it might be 
a problem? Should that be moved into a "finally" block? Do you think I should 
submit a separate issue for that??

> KinesisDataFetcherTest.testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown
>  deadlocks
> --
>
> Key: FLINK-12595
> URL: https://issues.apache.org/jira/browse/FLINK-12595
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis, Tests
>Affects Versions: 1.9.0
>Reporter: Dawid Wysakowicz
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.9.0
>
>
> https://api.travis-ci.org/v3/job/535738122/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (FLINK-12595) KinesisDataFetcherTest.testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown deadlocks

2019-07-20 Thread Shannon Carey (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16889564#comment-16889564
 ] 

Shannon Carey commented on FLINK-12595:
---

Sorry about that! Had to draw out a big sequence diagram, but I have a 
hypothesis about the issue. It looks like KinesisDataFetcher (on the 
FlinkKinesisConsumer thread) could by chance pause inside the while(running) 
loop, before Thread.sleep(). Then, the KinesisShardConsumer thread might 
interrupt that thread via KinesisDataFetcher#mainThread.interrupt() and then 
trigger shutdownWaiter, allowing the test's fetcher.waitUntilShutdown() to 
complete, after which the test would also interrupt the FlinkKinesisConsumer 
thread. Then, when the KinesisDataFetcher code resumes, it calls Thread.sleep() 
and catches/ignores the interrupted state (caused by both interrupts), then it 
exits the while(running) loop due to running == false, and gets to our 
awaitTermination() code which waits forever because it has already absorbed the 
test's interrupt.

This can be reproduced by adding code like this to the KinesisDataFetcher 
beneath the if (running && discoveryIntervalMillis != 0) line in order to force 
a longer delay in that thread:
{code:java}
boolean wasInterrupted = false;
int interruptionCount = 0;
for (int i = 0; i < 4; i++) {
    try {
        Thread.sleep(4000);
    } catch (InterruptedException ie) {
        wasInterrupted = true;
        interruptionCount++;
    }
}
if (wasInterrupted) {
    // Restore the interrupted state
    Thread.currentThread().interrupt();
}
System.out.println("KinesisDataFetcher was interrupted " + interruptionCount
    + " times during the while(running) loop.");
System.out.flush();
{code}
You'll likely see that it gets interrupted twice, and the test deadlocks with 
the same stacks as the logs provided above.

I have 02a0cf3d4e checked out to reproduce the issue matching the provided 
logs. I assume it's best to write this patch against HEAD of master, and let 
you handle backporting it? Let me know if that's not the case. I'll post again 
once I have a PR to address my hypothesis.

> KinesisDataFetcherTest.testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown
>  deadlocks
> --
>
> Key: FLINK-12595
> URL: https://issues.apache.org/jira/browse/FLINK-12595
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis, Tests
>Affects Versions: 1.9.0
>Reporter: Dawid Wysakowicz
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.9.0
>
>
> https://api.travis-ci.org/v3/job/535738122/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (FLINK-11568) Exception in Kinesis ShardConsumer hidden by InterruptedException

2019-02-08 Thread Shannon Carey (JIRA)
Shannon Carey created FLINK-11568:
-

 Summary: Exception in Kinesis ShardConsumer hidden by 
InterruptedException 
 Key: FLINK-11568
 URL: https://issues.apache.org/jira/browse/FLINK-11568
 Project: Flink
  Issue Type: Improvement
  Components: Kinesis Connector
Affects Versions: 1.6.2
Reporter: Shannon Carey
Assignee: Shannon Carey


When the Kinesis ShardConsumer encounters an exception, for example due to a 
problem in the Deserializer, the root cause exception is often hidden by a 
non-informative InterruptedException caused by the FlinkKinesisConsumer thread 
being interrupted.

Ideally, the root cause exception would be preserved and thrown so that the 
logs contain enough information to diagnose the issue.

This probably affects all versions.
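
As a rough sketch of what preserving the root cause could look like 
(hypothetical shape; "error" stands for the KinesisDataFetcher#error reference 
mentioned below):
{code:java}
try {
    Thread.sleep(awaitIntervalMillis); // stand-in for the shutdown wait
} catch (InterruptedException ie) {
    Throwable rootCause = error.get();
    if (rootCause != null) {
        // Surface the shard consumer's failure and keep the interrupt attached.
        RuntimeException wrapped = new RuntimeException(rootCause);
        wrapped.addSuppressed(ie);
        throw wrapped;
    }
    throw ie;
}
{code}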

Here's an example of a log message with the unhelpful InterruptedException:
{code:java}
2019-02-05 13:29:31:383 thread=Source: Custom Source -> Filter -> Map -> Sink: Unnamed (1/8), level=WARN, logger=org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer, message="Error while closing Kinesis data fetcher"
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.awaitTermination(KinesisDataFetcher.java:450)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.cancel(FlinkKinesisConsumer.java:314)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:323)
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
{code}
And here's an example of the real exception that we're actually interested in, 
which is stored inside KinesisDataFetcher#error, but is not thrown or logged:
{code:java}
org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:416)
org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240)
org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:135)
org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper.deserialize(KinesisDeserializationSchemaWrapper.java:44)
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:332)
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:231)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
java.util.concurrent.FutureTask.run(FutureTask.java)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-6805) Flink Cassandra connector dependency on Netty disagrees with Flink

2017-06-01 Thread Shannon Carey (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shannon Carey updated FLINK-6805:
-
Description: 
The Flink Cassandra connector has a dependency on Netty libraries (via 
promotion of transitive dependencies by the Maven shade plugin) at version 
4.0.33.Final, which disagrees with the version included in Flink of 
4.0.27.Final which is included & managed by the parent POM via dependency on 
netty-all.

Due to use of netty-all, the dependency management doesn't take effect on the 
individual libraries such as netty-handler, netty-codec, etc.

I suggest that dependency management of Netty should be added for all Netty 
libraries individually (netty-handler, etc.) so that all Flink modules use the 
same version, and similarly I suggest that exclusions be added to the 
quickstart example POM for the individual Netty libraries so that fat JARs 
don't include conflicting versions of Netty.

It seems like this problem started when FLINK-6084 was implemented: transitive 
dependencies of the flink-connector-cassandra were previously omitted, and now 
that they are included we must make sure that they agree with the Flink 
distribution.

  was:
The Flink Cassandra connector has a dependency on Netty libraries (via 
promotion of transitive dependencies by the Maven shade plugin) at version 
4.0.33.Final, which disagrees with the version included in Flink of 
4.0.27.Final which is included & managed by the parent POM via dependency on 
netty-all.

Due to use of netty-all, the dependency management doesn't take effect on the 
individual libraries such as netty-handler, netty-codec, etc.

I suggest that dependency management of Netty should be added for all Netty 
libraries individually (netty-handler, etc.) so that all Flink modules use the 
same version, and similarly I suggest that exclusions be added to the 
quickstart example POM for the individual Netty libraries so that fat JARs 
don't include conflicting versions of Netty.


> Flink Cassandra connector dependency on Netty disagrees with Flink
> --
>
> Key: FLINK-6805
> URL: https://issues.apache.org/jira/browse/FLINK-6805
> Project: Flink
>  Issue Type: Bug
>  Components: Cassandra Connector
>Affects Versions: 1.3.0, 1.2.1
>Reporter: Shannon Carey
>
> The Flink Cassandra connector has a dependency on Netty libraries (via 
> promotion of transitive dependencies by the Maven shade plugin) at version 
> 4.0.33.Final, which disagrees with the version included in Flink of 
> 4.0.27.Final which is included & managed by the parent POM via dependency on 
> netty-all.
> Due to use of netty-all, the dependency management doesn't take effect on the 
> individual libraries such as netty-handler, netty-codec, etc.
> I suggest that dependency management of Netty should be added for all Netty 
> libraries individually (netty-handler, etc.) so that all Flink modules use 
> the same version, and similarly I suggest that exclusions be added to the 
> quickstart example POM for the individual Netty libraries so that fat JARs 
> don't include conflicting versions of Netty.
> It seems like this problem started when FLINK-6084 was implemented: 
> transitive dependencies of the flink-connector-cassandra were previously 
> omitted, and now that they are included we must make sure that they agree 
> with the Flink distribution.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6805) Flink Cassandra connector dependency on Netty disagrees with Flink

2017-06-01 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16033571#comment-16033571
 ] 

Shannon Carey commented on FLINK-6805:
--

A better solution may be to add exclusions for the Netty libraries in the 
flink-connector-cassandra POM, then the other changes wouldn't necessarily be 
required (although they might still be desirable).
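
A hypothetical sketch of what those exclusions could look like in the 
flink-connector-cassandra POM (the wildcard exclusion form requires Maven 3):
{code:xml}
<dependency>
  <groupId>com.datastax.cassandra</groupId>
  <artifactId>cassandra-driver-core</artifactId>
  <exclusions>
    <exclusion>
      <groupId>io.netty</groupId>
      <artifactId>*</artifactId>
    </exclusion>
  </exclusions>
</dependency>
{code}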

> Flink Cassandra connector dependency on Netty disagrees with Flink
> --
>
> Key: FLINK-6805
> URL: https://issues.apache.org/jira/browse/FLINK-6805
> Project: Flink
>  Issue Type: Bug
>  Components: Cassandra Connector
>Affects Versions: 1.3.0, 1.2.1
>Reporter: Shannon Carey
>
> The Flink Cassandra connector has a dependency on Netty libraries (via 
> promotion of transitive dependencies by the Maven shade plugin) at version 
> 4.0.33.Final, which disagrees with the version included in Flink of 
> 4.0.27.Final which is included & managed by the parent POM via dependency on 
> netty-all.
> Due to use of netty-all, the dependency management doesn't take effect on the 
> individual libraries such as netty-handler, netty-codec, etc.
> I suggest that dependency management of Netty should be added for all Netty 
> libraries individually (netty-handler, etc.) so that all Flink modules use 
> the same version, and similarly I suggest that exclusions be added to the 
> quickstart example POM for the individual Netty libraries so that fat JARs 
> don't include conflicting versions of Netty.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-6805) Flink Cassandra connector dependency on Netty disagrees with Flink

2017-06-01 Thread Shannon Carey (JIRA)
Shannon Carey created FLINK-6805:


 Summary: Flink Cassandra connector dependency on Netty disagrees 
with Flink
 Key: FLINK-6805
 URL: https://issues.apache.org/jira/browse/FLINK-6805
 Project: Flink
  Issue Type: Bug
  Components: Cassandra Connector
Affects Versions: 1.2.1, 1.3.0
Reporter: Shannon Carey


The Flink Cassandra connector has a dependency on Netty libraries (via 
promotion of transitive dependencies by the Maven shade plugin) at version 
4.0.33.Final, which disagrees with the version included in Flink of 
4.0.27.Final which is included & managed by the parent POM via dependency on 
netty-all.

Due to use of netty-all, the dependency management doesn't take effect on the 
individual libraries such as netty-handler, netty-codec, etc.

I suggest that dependency management of Netty should be added for all Netty 
libraries individually (netty-handler, etc.) so that all Flink modules use the 
same version, and similarly I suggest that exclusions be added to the 
quickstart example POM for the individual Netty libraries so that fat JARs 
don't include conflicting versions of Netty.
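
For concreteness, a hypothetical sketch of the per-artifact dependency 
management suggested above (artifact list abbreviated; 4.0.27.Final is the 
version the parent POM manages via netty-all):
{code:xml}
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-handler</artifactId>
      <version>4.0.27.Final</version>
    </dependency>
    <dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-codec</artifactId>
      <version>4.0.27.Final</version>
    </dependency>
    <!-- ...and likewise for the other individual Netty artifacts -->
  </dependencies>
</dependencyManagement>
{code}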



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5690) protobuf is not shaded properly

2017-05-08 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16001826#comment-16001826
 ] 

Shannon Carey commented on FLINK-5690:
--

Regarding the comment:

{quote}
Mh, that's weird. According to this answer 
(http://stackoverflow.com/questions/7076414/java-lang-illegalaccesserror-tried-to-access-method)
 the IllegalAccessError is caused by compile time / runtime version mismatches, 
not by different classloaders.
{quote}

You might get IllegalAccessError if the class or method is, e.g., protected or 
package-private (default) scope rather than public, and the class trying to 
access it is in the same package but from a different classloader. We're having 
this problem when we try to use a more recent version of Typesafe Config, and 
I'm not sure why it's not using the user code classloader provided via 
Thread#setContextClassLoader().

> protobuf is not shaded properly
> ---
>
> Key: FLINK-5690
> URL: https://issues.apache.org/jira/browse/FLINK-5690
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.1.4, 1.3.0
>Reporter: Andrey
>Assignee: Robert Metzger
>
> Currently the distribution contains the com/google/protobuf package. Without 
> proper shading, client code could fail with:
> {code}
> Caused by: java.lang.IllegalAccessError: tried to access method 
> com.google.protobuf.
> {code}
> Steps to reproduce:
> * create a job class "com.google.protobuf.TestClass"
> * call the com.google.protobuf.TextFormat.escapeText(String) method from this 
> class
> * deploy the job to a flink cluster (using the web console, for example)
> * run the job. An IllegalAccessError appears in the logs.
> The issue is a package-protected method combined with different classloaders: 
> TestClass is loaded by FlinkUserCodeClassLoader, but the TextFormat class is 
> loaded by sun.misc.Launcher$AppClassLoader



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-02-28 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15889297#comment-15889297
 ] 

Shannon Carey commented on FLINK-5929:
--

[~aljoscha] as far as I am aware, the state does get cleared out by our 
Trigger: in Trigger#clear() we have 
{{ctx.getPartitionedState(fireTimestampStateDescriptor).clear()}}. We could have 
done it in the WindowFunction instead, if we wanted to, given our hack.
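
For reference, that clear() hook looks roughly like this (a sketch; the 
signature follows Flink's Trigger API and the descriptor name follows my 
comment above):
{code:java}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
    // Drop the per-pane firing timestamp when the window is purged.
    ctx.getPartitionedState(fireTimestampStateDescriptor).clear();
}
{code}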

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-02-27 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886419#comment-15886419
 ] 

Shannon Carey commented on FLINK-5929:
--

If I understand correctly, I agree this would be useful. Currently, we are 
working around this limitation in order to achieve communication between the 
Trigger (per-pane state) and the WindowFunction (per-operator state) via a hack 
within the WindowFunction that looks like this (we're not on 1.2 yet, so we 
haven't looked at newer ways to do this):

{code}
def apply(key: String, window: TimeWindow, input, out): Unit = {
  val fireTimestampState: ValueState[java.lang.Long] =
    getRuntimeContext.getState[java.lang.Long](fireTimestampStateDescriptor)

  if (fireTimestampState.isInstanceOf[MemValueState[String, TimeWindow, java.lang.Long]]) {
    fireTimestampState.asInstanceOf[MemValueState[String, TimeWindow, java.lang.Long]].setCurrentNamespace(window)
  } else if (fireTimestampState.isInstanceOf[RocksDBValueState[String, TimeWindow, java.lang.Long]]) {
    fireTimestampState.asInstanceOf[RocksDBValueState[String, TimeWindow, java.lang.Long]].setCurrentNamespace(window)
  } else if (fireTimestampState.isInstanceOf[FsValueState[String, TimeWindow, java.lang.Long]]) {
    fireTimestampState.asInstanceOf[FsValueState[String, TimeWindow, java.lang.Long]].setCurrentNamespace(window)
  }
  fireTimestampState.value()
  ...
{code}

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-02-27 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886378#comment-15886378
 ] 

Shannon Carey edited comment on FLINK-5929 at 2/27/17 7:40 PM:
---

Just to clarify, you're referring to the state from 
AbstractRichFunction#getRuntimeContext.getState(), right? That's the state 
keyed by the combination of event key and window operator (that's what you mean 
by "global"). What you're suggesting is adding state for the individual window 
panes, right?


was (Author: rehevkor5):
Just to clarify, you're referring to the state from 
AbstractRichFunction#getRuntimeContext.getState(), right? That's the state 
keyed by the combination of event key and window operator. What you're 
suggesting is adding state for the individual window panes, right?

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-02-27 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886378#comment-15886378
 ] 

Shannon Carey edited comment on FLINK-5929 at 2/27/17 7:38 PM:
---

Just to clarify, you're referring to the state from 
AbstractRichFunction#getRuntimeContext.getState(), right? That's the state 
keyed by the combination of event key and window operator. What you're 
suggesting is adding state for the individual window panes, right?


was (Author: rehevkor5):
Just to clarify, you're referring to the state from 
AbstractRichFunction#getRuntimeContext.getState(), right? That's the state 
keyed by the combination of event key and window operator. What you're 
suggesting is adding state for the individual panes within the window, right?

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5929) Allow Access to Per-Window State in ProcessWindowFunction

2017-02-27 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15886378#comment-15886378
 ] 

Shannon Carey commented on FLINK-5929:
--

Just to clarify, you're referring to the state from 
AbstractRichFunction#getRuntimeContext.getState(), right? That's the state 
keyed by the combination of event key and window operator. What you're 
suggesting is adding state for the individual panes within the window, right?

> Allow Access to Per-Window State in ProcessWindowFunction
> -
>
> Key: FLINK-5929
> URL: https://issues.apache.org/jira/browse/FLINK-5929
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>
> Right now, the state that a {{WindowFunction}} or {{ProcessWindowFunction}} 
> can access is scoped to the key of the window but not the window itself. That 
> is, state is global across all windows for a given key.
> For some use cases it is beneficial to keep state scoped to a window. For 
> example, if you expect to have several {{Trigger}} firings (due to early and 
> late firings) a user can keep state per window to keep some information 
> between those firings.
> The per-window state has to be cleaned up in some way. For this I see two 
> options:
>  - Keep track of all state that a user uses and clean up when we reach the 
> window GC horizon.
>  - Add a method {{cleanup()}} to {{ProcessWindowFunction}} which is called 
> when we reach the window GC horizon that users can/should use to clean up 
> their state.
> On the API side, we can add a method {{windowState()}} on 
> {{ProcessWindowFunction.Context}} that retrieves the per-window state and 
> {{globalState()}} that would allow access to the (already available) global 
> state. The {{Context}} would then look like this:
> {code}
> /**
>  * The context holding window metadata
>  */
> public abstract class Context {
> /**
>  * @return The window that is being evaluated.
>  */
> public abstract W window();
> /**
>  * State accessor for per-key and per-window state.
>  */
> KeyedStateStore windowState();
> /**
>  * State accessor for per-key global state.
>  */
> KeyedStateStore globalState();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (FLINK-5608) Cancel button not always visible

2017-01-22 Thread Shannon Carey (JIRA)
Shannon Carey created FLINK-5608:


 Summary: Cancel button not always visible
 Key: FLINK-5608
 URL: https://issues.apache.org/jira/browse/FLINK-5608
 Project: Flink
  Issue Type: Bug
  Components: Webfrontend
Affects Versions: 1.1.4
Reporter: Shannon Carey
Assignee: Shannon Carey
Priority: Minor


When the window is not wide enough, or when the job name is too long, the 
"Cancel" button in the Job view of the web UI is not visible because it is the 
first element that gets wrapped down and gets covered by the secondary navbar 
(the tabs). This causes us to often need to resize the browser wider than our 
monitor in order to use the cancel button.

In general, the use of Bootstrap's ".navbar-fixed-top" is problematic if the 
content may wrap, especially if the content's horizontal width is not known and 
fixed. The ".navbar-fixed-top" uses fixed positioning, and therefore any 
unexpected change in height will result in overlap with the rest of the 
normal-flow content in the page. The Bootstrap docs explain this in their 
"Overflowing content" callout.

I am submitting a PR which does not attempt to resolve all issues with the 
fixed navbar approach, but attempts to improve the situation by using less 
horizontal space and by altering the layout approach of the Cancel button.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5542) YARN client incorrectly uses local YARN config to check vcore capacity

2017-01-17 Thread Shannon Carey (JIRA)
Shannon Carey created FLINK-5542:


 Summary: YARN client incorrectly uses local YARN config to check 
vcore capacity
 Key: FLINK-5542
 URL: https://issues.apache.org/jira/browse/FLINK-5542
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.1.4
Reporter: Shannon Carey


See 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/1-1-4-on-YARN-vcores-change-td11016.html

When using bin/yarn-session.sh, AbstractYarnClusterDescriptor line 271 in 1.1.4 
is comparing the user's selected number of vcores to the vcores configured in 
the local node's YARN config (from YarnConfiguration eg. yarn-site.xml and 
yarn-default.xml). It incorrectly prevents Flink from launching even if there 
is sufficient vcore capacity on the cluster.
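
Condensed, the problematic check looks roughly like this (a sketch, not the 
literal AbstractYarnClusterDescriptor code; the point is that the limit comes 
from the client-local YarnConfiguration):
{code:java}
int maxVcores = yarnConfiguration.getInt(
    YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
if (requestedVcores > maxVcores) {
    // Fails on the client even when the cluster's worker nodes have capacity.
    throw new IllegalConfigurationException("Requested " + requestedVcores
        + " vcores, but the local YARN config only allows " + maxVcores);
}
{code}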

That is not correct, because the application will not necessarily run on the 
local node. For example, if running the yarn-session.sh client from the AWS EMR 
master node, the vcore count there may be different from the vcore count on the 
core nodes where Flink will actually run.

A reasonable way to fix this would probably be to reuse the logic from 
"yarn-session.sh -q" (FlinkYarnSessionCli line 550) which knows how to get 
vcore information from the real worker nodes.  Alternatively, perhaps we could 
remove the check entirely and rely on YARN's Scheduler to determine whether 
sufficient resources exist.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5425) JobManager <host> replaced by IP in metrics

2017-01-08 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5425?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15810263#comment-15810263
 ] 

Shannon Carey commented on FLINK-5425:
--

I am using the statsd reporter and the data thereby flows to Graphite. You can 
see that the filter character method of the statsd reporter does not filter 
".": 
https://github.com/apache/flink/blob/master/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java#L199

So, yes, this could be fixed in the reporter, and I could make a PR with that 
change... but it would impact dots in every part of the name, as you mention. 
While that might make sense for people like me who are using the Graphite 
backend (though it would change how our job names appear in the metrics since 
those contain periods), I'm not sure it makes sense for people who use other 
backends. Given the uncertainty, perhaps it would be better to add a 
configuration parameter which allows the user to control what characters get 
filtered out? A regex perhaps?
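
As a sketch of that configurable-filtering idea (the config key and the 
filterRegexFromConfig field are hypothetical, invented for illustration):
{code:java}
// e.g. metrics.reporter.<name>.filter-regex: [^a-zA-Z0-9_-]
private String filterCharacters(String input) {
    // Replace every character matching the user-configured regex, so Graphite
    // users could also strip '.' without forcing that on other backends.
    return input.replaceAll(filterRegexFromConfig, "-");
}
{code}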

The reporter isn't really broken; it's just that the metric naming is 
inconsistent. To me, the simplest solution is to eliminate the behavioral 
difference between identifying the jobmanager by IP in the metrics vs. 
identifying the taskmanager by hostname.

This problem is definitely present in 1.1.3 (that's where I'm seeing it in my 
live systems), but you're right the link I put in the description was to the 
then-current master.

I'm happy to submit a PR once the implementation approach has been decided... 
although I may need a little guidance about how to go about adjusting the 
TaskManagerLocation logic so JobManagerRunner can share it, if that's what gets 
decided.

> JobManager <host> replaced by IP in metrics
> ---
>
> Key: FLINK-5425
> URL: https://issues.apache.org/jira/browse/FLINK-5425
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.3
>Reporter: Shannon Carey
>Priority: Minor
>
> In metrics at the jobmanager level and below, the "<host>" scope variable is 
> being replaced by the IP rather than the hostname. The taskmanager metrics, 
> meanwhile, use the host name.
> You can see the job manager behavior at 
> https://github.com/apache/flink/blob/a1934255421b97eefd579183e9c7199c43ad1a2c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java#L147
>  compared to TaskManagerLocation#getHostname().
> The problem with this is mainly that due to the presence of "." (period) 
> characters in the IP address and thereby the metric name, the metric names 
> show up strangely in Graphite/Grafana, where "." is the metric group 
> separator.
> If it's not possible to make jobmanager metrics use the hostname, I suggest 
> replacing "." with "-" in the <host> section.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5425) JobManager <host> replaced by IP in metrics

2017-01-06 Thread Shannon Carey (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5425?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shannon Carey updated FLINK-5425:
-
Description: 
In metrics at the jobmanager level and below, the "<host>" scope variable is 
being replaced by the IP rather than the hostname. The taskmanager metrics, 
meanwhile, use the host name.

You can see the job manager behavior at 
https://github.com/apache/flink/blob/a1934255421b97eefd579183e9c7199c43ad1a2c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java#L147
 compared to TaskManagerLocation#getHostname().

The problem with this is mainly that due to the presence of "." (period) 
characters in the IP address and thereby the metric name, the metric names show 
up strangely in Graphite/Grafana, where "." is the metric group separator.

If it's not possible to make jobmanager metrics use the hostname, I suggest 
replacing "." with "-" in the <host> section.

  was:
In metrics at the jobmanager level and below, the "<host>" scope variable is 
being replaced by the IP rather than the hostname. The taskmanager metrics, 
meanwhile, use the host name.

You can see the job manager behavior at 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java#L147
 compared to TaskManagerLocation#getHostname().

The problem with this is mainly that due to the presence of "." (period) 
characters in the IP address and thereby the metric name, the metric names show 
up strangely in Graphite/Grafana, where "." is the metric group separator.

If it's not possible to make jobmanager metrics use the hostname, I suggest 
replacing "." with "-" in the <host> section.


> JobManager <host> replaced by IP in metrics
> ---
>
> Key: FLINK-5425
> URL: https://issues.apache.org/jira/browse/FLINK-5425
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.1.3
>Reporter: Shannon Carey
>Priority: Minor
>
> In metrics at the jobmanager level and below, the "<host>" scope variable is 
> being replaced by the IP rather than the hostname. The taskmanager metrics, 
> meanwhile, use the host name.
> You can see the job manager behavior at 
> https://github.com/apache/flink/blob/a1934255421b97eefd579183e9c7199c43ad1a2c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java#L147
>  compared to TaskManagerLocation#getHostname().
> The problem with this is mainly that due to the presence of "." (period) 
> characters in the IP address and thereby the metric name, the metric names 
> show up strangely in Graphite/Grafana, where "." is the metric group 
> separator.
> If it's not possible to make jobmanager metrics use the hostname, I suggest 
> replacing "." with "-" in the <host> section.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5425) JobManager <host> replaced by IP in metrics

2017-01-06 Thread Shannon Carey (JIRA)
Shannon Carey created FLINK-5425:


 Summary: JobManager <host> replaced by IP in metrics
 Key: FLINK-5425
 URL: https://issues.apache.org/jira/browse/FLINK-5425
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.1.3
Reporter: Shannon Carey
Priority: Minor


In metrics at the jobmanager level and below, the "<host>" scope variable is 
being replaced by the IP rather than the hostname. The taskmanager metrics, 
meanwhile, use the host name.

You can see the job manager behavior at 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java#L147
 compared to TaskManagerLocation#getHostname().

The problem with this is mainly that due to the presence of "." (period) 
characters in the IP address and thereby the metric name, the metric names show 
up strangely in Graphite/Grafana, where "." is the metric group separator.

If it's not possible to make jobmanager metrics use the hostname, I suggest 
replacing "." with "-" in the <host> section.
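
If we go the replacement route, the change itself would be tiny; a minimal 
sketch (assuming it is applied wherever the jobmanager's <host> value is 
resolved):
{code:java}
// "10.0.1.5" -> "10-0-1-5": keeps the IP from being split into metric groups
String hostScope = hostnameOrIp.replace('.', '-');
{code}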



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-5322) Clean up yarn configuration documentation

2017-01-06 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15806496#comment-15806496
 ] 

Shannon Carey edited comment on FLINK-5322 at 1/7/17 2:11 AM:
--

Another important note: quoting strings doesn't work the way it should in YAML 
either.

Looks to be due to GlobalConfiguration#loadYAMLResource(). This really should 
be fixed to use a compliant YAML parser.
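
For comparison, a sketch of what loading via a compliant parser could look like 
(using SnakeYAML as an assumed example library, not the current implementation):
{code:java}
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Map;
import org.yaml.snakeyaml.Yaml;

public class YamlLoadSketch {
    public static void main(String[] args) throws Exception {
        try (InputStream in = new FileInputStream("flink-conf.yaml")) {
            // Quoted strings and nested mappings are interpreted per the YAML spec.
            Map<String, Object> conf = new Yaml().load(in);
            System.out.println(conf.get("yarn.taskmanager.env"));
        }
    }
}
{code}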


was (Author: rehevkor5):
Another important note: quoting strings doesn't work the way it should in YAML 
either.

> Clean up yarn configuration documentation
> -
>
> Key: FLINK-5322
> URL: https://issues.apache.org/jira/browse/FLINK-5322
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, YARN
>Affects Versions: 1.2.0, 1.1.3
> Environment: Flink 1.1.3 on AWS EMR emr-5.2.0 (Hadoop "Amazon 2.7.3")
>Reporter: Shannon Carey
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.2.0, 1.1.5
>
>
> The value I specified in flink-conf.yaml
> {code}
> yarn.taskmanager.env:
>   MY_ENV: test
> {code}
> is not available in {{System.getenv("MY_ENV")}} from the plan execution 
> (execution flow of main method) nor from within execution of a streaming 
> operator.
> Interestingly, it does appear within the Flink JobManager Web UI under Job 
> Manager -> Configuration.
> --
> The yarn section of the configuration page should be cleaned up a bit. The 
> "yarn.containers.vcores" parameter is listed twice, the example for 
> "yarn.application-master.env" is listed as a separate parameter and the 
> "yarn.taskmanager.env" description indirectly references another parameter 
> ("same as the above") which just isn't maintainable; instead it should be 
> described similarly as the application-master entry.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5322) Clean up yarn configuration documentation

2017-01-06 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15806496#comment-15806496
 ] 

Shannon Carey commented on FLINK-5322:
--

Another important note: quoting strings doesn't work the way it should in YAML 
either.

> Clean up yarn configuration documentation
> -
>
> Key: FLINK-5322
> URL: https://issues.apache.org/jira/browse/FLINK-5322
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, YARN
>Affects Versions: 1.2.0, 1.1.3
> Environment: Flink 1.1.3 on AWS EMR emr-5.2.0 (Hadoop "Amazon 2.7.3")
>Reporter: Shannon Carey
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.2.0, 1.1.5
>
>
> The value I specified in flink-conf.yaml
> {code}
> yarn.taskmanager.env:
>   MY_ENV: test
> {code}
> is not available in {{System.getenv("MY_ENV")}} from the plan execution 
> (execution flow of main method) nor from within execution of a streaming 
> operator.
> Interestingly, it does appear within the Flink JobManager Web UI under Job 
> Manager -> Configuration.
> --
> The yarn section of the configuration page should be cleaned up a bit. The 
> "yarn.containers.vcores" parameter is listed twice, the example for 
> "yarn.application-master.env" is listed as a separate parameter and the 
> "yarn.taskmanager.env" description indirectly references another parameter 
> ("same as the above") which just isn't maintainable; instead it should be 
> described similarly as the application-master entry.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5424) Improve Restart Strategy Logging

2017-01-06 Thread Shannon Carey (JIRA)
Shannon Carey created FLINK-5424:


 Summary: Improve Restart Strategy Logging
 Key: FLINK-5424
 URL: https://issues.apache.org/jira/browse/FLINK-5424
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Shannon Carey
Assignee: Shannon Carey
Priority: Minor


I'll be submitting a PR which includes some minor improvements to logging 
related to restart strategies.

Specifically, I added a toString so that the log contains better info about 
failure-rate restart strategy, and I added an explanation in the log when the 
restart strategy is responsible for preventing job restart (currently, there's 
no indication that the restart strategy had anything to do with it).
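
For the failure-rate strategy, the added toString looks roughly like this (a 
sketch; the field names are assumptions rather than the exact 
FailureRateRestartStrategy fields):
{code:java}
@Override
public String toString() {
    return "FailureRateRestartStrategy(failuresInterval=" + failuresInterval
        + ", delayInterval=" + delayInterval
        + ", maxFailuresPerInterval=" + maxFailuresPerInterval + ")";
}
{code}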



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5322) yarn.taskmanager.env value does not appear in System.getenv

2016-12-13 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15746008#comment-15746008
 ] 

Shannon Carey commented on FLINK-5322:
--

Yes, that worked! Thanks! It becomes available from within UDF (streaming 
operator) code. Perhaps this ticket can be adapted to just clarify the 
documentation, explaining that the YAML isn't really interpreted the way people 
may expect, and to provide an example?
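
For anyone who finds this later: my assumption (the working form isn't shown in 
this thread) is that Flink's line-based parser wants a flattened single-line 
key rather than a nested YAML mapping, e.g.:
{code}
yarn.taskmanager.env.MY_ENV: test
{code}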

> yarn.taskmanager.env value does not appear in System.getenv
> ---
>
> Key: FLINK-5322
> URL: https://issues.apache.org/jira/browse/FLINK-5322
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
> Environment: Flink 1.1.3 on AWS EMR emr-5.2.0 (Hadoop "Amazon 2.7.3")
>Reporter: Shannon Carey
>Priority: Trivial
> Fix For: 1.1.3
>
>
> The value I specified in flink-conf.yaml
> {code}
> yarn.taskmanager.env:
>   MY_ENV: test
> {code}
> is not available in {{System.getenv("MY_ENV")}} from the plan execution 
> (execution flow of main method) nor from within execution of a streaming 
> operator.
> Interestingly, it does appear within the Flink JobManager Web UI under Job 
> Manager -> Configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-4091) flink-connector-cassandra has conflicting guava version

2016-12-12 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15634445#comment-15634445
 ] 

Shannon Carey edited comment on FLINK-4091 at 12/12/16 9:44 PM:


Flink's inclusion of altered Cassandra classes (though probably unavoidable) 
was causing a lot of problems for us when using a library with dependencies on 
cassandra-driver-core and cassandra-driver-mapping. When launching a Flink job 
locally from the IDE, the two different versions of Cassandra included on the 
classpath would cause runtime errors. Excluding the two dependencies with Maven 
seems to have fixed the issue, allowing us to run our Flink jobs from the IDE 
again. Just figured I'd mention it here in case it helps anyone else. However, 
we do have to be careful in the library not to use methods that return Guava 
classes (such as asyncPrepare) because they'll be written against the 
non-shaded Guava. Relocating the Guava package with the shade plugin works when 
using the jar, but not when running Flink jobs from within the IDE (IntelliJ 
doesn't have full integration with the shade plugin).
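
For reference, a hypothetical sketch of the shade-plugin relocation described 
above (the shadedPattern is illustrative):
{code:xml}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>com.google.common</pattern>
            <shadedPattern>myorg.shaded.com.google.common</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
{code}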


was (Author: rehevkor5):
Flink's inclusion of altered Cassandra classes (though probably unavoidable) 
was causing a lot of problems for us when using a library with dependencies on 
cassandra-driver-core and cassandra-driver-mapping. When launching a Flink job 
locally from the IDE, the two different versions of Cassandra classed on the 
classpath would cause runtime errors. Excluding the two dependencies with Maven 
seems to have fixed the issue, allowing us to run our Flink jobs from the IDE 
again. Just figured I'd mention it here in case it helps anyone else.

> flink-connector-cassandra has conflicting guava version
> ---
>
> Key: FLINK-4091
> URL: https://issues.apache.org/jira/browse/FLINK-4091
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
> Environment: MacOSX, 1.10-SNAPSHOT (head is 
> 1a6bab3ef76805685044cf4521e32315169f9033)
>Reporter: Dominik Bruhn
>Assignee: Chesnay Schepler
>
> The newly merged cassandra streaming connector has an issue with its guava 
> dependency.
> The build process for flink-connector-cassandra creates a shaded JAR file which 
> contains the connector, the datastax cassandra driver, plus a shaded copy of 
> guava in org.apache.flink.shaded. 
> The datastax cassandra driver calls into Futures.withFallback ([1]) which is 
> present in this guava version. This also works inside the 
> flink-connector-cassandra jar.
> Now the actual build-process for Flink happens and builds another shaded JAR 
> and creates the flink-dist.jar. Inside this JAR, there is also a shaded 
> version of guava inside org.apache.flink.shaded.
> Now the issue: The guava version which is in the flink-dist.jar is not 
> compatible and doesn't contain the Futures.withFallback which the datastax 
> driver is using.
> This leads into the following issue: You can without any problems launch a 
> flink task which uses the casandra driver locally (so through the 
> mini-cluster) because that is never using the flink-dist.jar. 
> BUT: As soon as you are trying to start this job on a flink cluster (which 
> uses the flink-dist.jar), the job breaks with the following exception:
> https://gist.github.com/theomega/5ab9b14ffb516b15814de28e499b040d
> You can inspect this by opening the 
> flink-connector-cassandra_2.11-1.1-SNAPSHOT.jar and the 
> flink-dist_2.11-1.1-SNAPSHOT.jar in a java decompiler.
> I don't know a good solution here: Perhaps one option would be to shade the 
> guava for the cassandra-driver somewhere other than org.apache.flink.shaded.
> [1]: 
> https://google.github.io/guava/releases/19.0/api/docs/com/google/common/util/concurrent/Futures.html#withFallback(com.google.common.util.concurrent.ListenableFuture,
>  com.google.common.util.concurrent.FutureFallback, 
> java.util.concurrent.Executor)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-5322) yarn.taskmanager.env value does not appear in System.getenv

2016-12-12 Thread Shannon Carey (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shannon Carey updated FLINK-5322:
-
Description: 
The value I specified in flink-conf.yaml

{code}
yarn.taskmanager.env:
  MY_ENV: test
{code}

is not available in {{System.getenv("MY_ENV")}} from the plan execution 
(execution flow of main method) nor from within execution of a streaming 
operator.

Interestingly, it does appear within the Flink JobManager Web UI under Job 
Manager -> Configuration.

  was:
The value I specified in flink-conf.yaml

{code}
yarn.taskmanager.env:
  MY_ENV: test
{code}

is not available in {{System.getenv("MY_ENV")}} from the plan execution 
(execution flow of main method) nor from within execution of a streaming 
operator.



> yarn.taskmanager.env value does not appear in System.getenv
> ---
>
> Key: FLINK-5322
> URL: https://issues.apache.org/jira/browse/FLINK-5322
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
> Environment: Flink 1.1.3 on AWS EMR emr-5.2.0 (Hadoop "Amazon 2.7.3")
>Reporter: Shannon Carey
>Priority: Trivial
> Fix For: 1.1.3
>
>
> The value I specified in flink-conf.yaml
> {code}
> yarn.taskmanager.env:
>   MY_ENV: test
> {code}
> is not available in {{System.getenv("MY_ENV")}} from the plan execution 
> (execution flow of main method) nor from within execution of a streaming 
> operator.
> Interestingly, it does appear within the Flink JobManager Web UI under Job 
> Manager -> Configuration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5322) yarn.taskmanager.env value does not appear in System.getenv

2016-12-12 Thread Shannon Carey (JIRA)
Shannon Carey created FLINK-5322:


 Summary: yarn.taskmanager.env value does not appear in 
System.getenv
 Key: FLINK-5322
 URL: https://issues.apache.org/jira/browse/FLINK-5322
 Project: Flink
  Issue Type: Bug
  Components: YARN
 Environment: Flink 1.1.3 on AWS EMR emr-5.2.0 (Hadoop "Amazon 2.7.3")
Reporter: Shannon Carey
Priority: Trivial
 Fix For: 1.1.3


The value I specified in flink-conf.yaml

{code}
yarn.taskmanager.env:
  MY_ENV: test
{code}

is not available in {{System.getenv("MY_ENV")}} from the plan execution 
(execution flow of main method) nor from within execution of a streaming 
operator.
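
For reference, a minimal reproduction sketch (class and job names are 
hypothetical) that checks the variable both during plan execution and inside a 
streaming operator:

{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvVarCheckJob {
    public static void main(String[] args) throws Exception {
        // Plan execution (main method): prints MY_ENV=null
        System.out.println("main: MY_ENV=" + System.getenv("MY_ENV"));

        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b")
            .map(new RichMapFunction<String, String>() {
                @Override
                public String map(String value) {
                    // Operator execution: also prints MY_ENV=null
                    System.out.println("operator: MY_ENV=" + System.getenv("MY_ENV"));
                    return value;
                }
            })
            .print();
        env.execute("env-var-check");
    }
}
{code}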




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4091) flink-connector-cassandra has conflicting guava version

2016-11-03 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15634445#comment-15634445
 ] 

Shannon Carey commented on FLINK-4091:
--

Flink's inclusion of altered Cassandra classes (though probably unavoidable) 
was causing a lot of problems for us when using a library with dependencies on 
cassandra-driver-core and cassandra-driver-mapping. When launching a Flink job 
locally from the IDE, the two different versions of Cassandra classes on the 
classpath would cause runtime errors. Excluding the two dependencies with Maven 
seems to have fixed the issue, allowing us to run our Flink jobs from the IDE 
again. Just figured I'd mention it here in case it helps anyone else.

> flink-connector-cassandra has conflicting guava version
> ---
>
> Key: FLINK-4091
> URL: https://issues.apache.org/jira/browse/FLINK-4091
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0
> Environment: MacOSX, 1.10-SNAPSHOT (head is 
> 1a6bab3ef76805685044cf4521e32315169f9033)
>Reporter: Dominik Bruhn
>Assignee: Chesnay Schepler
>
> The newly merged cassandra streaming connector has an issue with its guava 
> dependency.
> The build-process for flink-connector-cassandra creates a shaded JAR file which 
> contains the connector, the datastax cassandra driver, plus a shaded copy of 
> guava in org.apache.flink.shaded. 
> The datastax cassandra driver calls into Futures.withFallback ([1]) which is 
> present in this guava version. This also works inside the 
> flink-connector-cassandra jar.
> Now the actual build-process for Flink happens and builds another shaded JAR 
> and creates the flink-dist.jar. Inside this JAR, there is also a shaded 
> version of guava inside org.apache.flink.shaded.
> Now the issue: The guava version in the flink-dist.jar is not compatible and 
> doesn't contain the Futures.withFallback method which the datastax driver uses.
> This leads to the following issue: You can launch a flink task which uses the 
> cassandra driver locally (so through the mini-cluster) without any problems, 
> because that never uses the flink-dist.jar. 
> BUT: As soon as you try to start this job on a flink cluster (which 
> uses the flink-dist.jar), the job breaks with the following exception:
> https://gist.github.com/theomega/5ab9b14ffb516b15814de28e499b040d
> You can inspect this by opening the 
> flink-connector-cassandra_2.11-1.1-SNAPSHOT.jar and the 
> flink-dist_2.11-1.1-SNAPSHOT.jar in a java decompiler.
> I don't know a good solution here: Perhaps one option would be to shade the 
> guava for the cassandra-driver somewhere other than org.apache.flink.shaded.
> [1]: 
> https://google.github.io/guava/releases/19.0/api/docs/com/google/common/util/concurrent/Futures.html#withFallback(com.google.common.util.concurrent.ListenableFuture,
>  com.google.common.util.concurrent.FutureFallback, 
> java.util.concurrent.Executor)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4803) Job Cancel can hang forever waiting for OutputFormat.close()

2016-10-12 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15569025#comment-15569025
 ] 

Shannon Carey commented on FLINK-4803:
--

Yes, that's right. cancel() blocks on close(), and therefore if close() 
misbehaves the thread is never interrupted and cancel() blocks forever.

In the issue description, I suggested your option #2. I think you'll want #1 no 
matter what. However, #2 allows for at least one message and/or exception to be 
logged that tells the user what went wrong (why their job is taking a long time 
to cancel, or why it did not cancel gracefully). I'm not sure what your 
DataSink-specific option would look like. Maybe it is similar to my workaround, 
where I wrapped my HadoopOutputFormat in a subclass that calls super.close() 
from a separate thread with a timeout? That workaround is ok, but I had to 
expend a fair amount of effort to figure out what the problem was, and also 
there was nothing I could do but restart Flink in order to get my job to 
terminate (not a desirable solution). You'll want Flink to function smoothly 
regardless of what data sink the user chooses.
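
For illustration, a delegating variant of that workaround, sketched against the 
generic OutputFormat interface (the wrapper name and the 60-second timeout are 
assumptions, not the code we actually ran):

{code:java}
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;

// Delegating wrapper that bounds how long close() may block.
public class TimeoutCloseOutputFormat<T> implements OutputFormat<T> {
    private final OutputFormat<T> delegate;

    public TimeoutCloseOutputFormat(OutputFormat<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void configure(Configuration parameters) {
        delegate.configure(parameters);
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        delegate.open(taskNumber, numTasks);
    }

    @Override
    public void writeRecord(T record) throws IOException {
        delegate.writeRecord(record);
    }

    @Override
    public void close() throws IOException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> closing = executor.submit(() -> {
            try {
                delegate.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        try {
            closing.get(60, TimeUnit.SECONDS); // assumed timeout
        } catch (TimeoutException e) {
            closing.cancel(true); // interrupts the blocked close()
            throw new IOException("OutputFormat.close() timed out", e);
        } catch (InterruptedException | ExecutionException e) {
            throw new IOException("OutputFormat.close() failed", e);
        } finally {
            executor.shutdownNow();
        }
    }
}
{code}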

> Job Cancel can hang forever waiting for OutputFormat.close()
> 
>
> Key: FLINK-4803
> URL: https://issues.apache.org/jira/browse/FLINK-4803
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.1.1
>Reporter: Shannon Carey
>
> If the Flink job uses a badly-behaved OutputFormat (in this example, a 
> HadoopOutputFormat containing a CqlBulkOutputFormat), where the close() 
> method blocks forever, it is impossible to cancel the Flink job even though 
> the blocked thread would respond to an interrupt. The stack traces below show 
> the state of the important threads when a job is canceled and the 
> OutputFormat is blocking forever inside of close().
> I suggest that the `DataSinkTask.cancel()` method be updated to add a timeout on 
> `this.format.close()`. When the timeout is reached, the Task thread should be 
> interrupted.
> {code}
> "Canceler for DataSink 
> (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" 
> #6422 daemon prio=5 os_prio=0 tid=0x7fb7e42f nid=0x34f3 waiting for 
> monitor entry [0x7fb7be079000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at 
> org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
> - waiting to lock <0x0006bae5f788> (a java.lang.Object)
> at 
> org.apache.flink.runtime.operators.DataSinkTask.cancel(DataSinkTask.java:268)
> at 
> org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1149)
> at java.lang.Thread.run(Thread.java:745)
> "DataSink 
> (org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" 
> #6410 daemon prio=5 os_prio=0 tid=0x7fb7e79a4800 nid=0x2ad8 waiting on 
> condition [0x7fb7bdf78000]
>java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  <0x0006c5ab5e20> (a 
> java.util.concurrent.SynchronousQueue$TransferStack)
> at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> at 
> java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
> at 
> java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
> at 
> java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:895)
> at 
> org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.put(SSTableSimpleUnsortedWriter.java:194)
> at 
> org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.sync(SSTableSimpleUnsortedWriter.java:180)
> at 
> org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.close(SSTableSimpleUnsortedWriter.java:156)
> at 
> org.apache.cassandra.io.sstable.CQLSSTableWriter.close(CQLSSTableWriter.java:275)
> at 
> org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:133)
> at 
> org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:126)
> at 
> org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
> - locked <0x0006bae5f788> (a java.lang.Object)
> at 
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:234)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4803) Job Cancel can hang forever waiting for OutputFormat.close()

2016-10-11 Thread Shannon Carey (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shannon Carey updated FLINK-4803:
-
Description: 
If the Flink job uses a badly-behaved OutputFormat (in this example, a 
HadoopOutputFormat containing a CqlBulkOutputFormat), where the close() method 
blocks forever, it is impossible to cancel the Flink job even though the 
blocked thread would respond to an interrupt. The stack traces below show the 
state of the important threads when a job is canceled and the OutputFormat is 
blocking forever inside of close().

I suggest that the `DataSinkTask.cancel()` method be updated to add a timeout on 
`this.format.close()`. When the timeout is reached, the Task thread should be 
interrupted.

{code}
"Canceler for DataSink 
(org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" 
#6422 daemon prio=5 os_prio=0 tid=0x7fb7e42f nid=0x34f3 waiting for 
monitor entry [0x7fb7be079000]
   java.lang.Thread.State: BLOCKED (on object monitor)
at 
org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
- waiting to lock <0x0006bae5f788> (a java.lang.Object)
at 
org.apache.flink.runtime.operators.DataSinkTask.cancel(DataSinkTask.java:268)
at 
org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1149)
at java.lang.Thread.run(Thread.java:745)

"DataSink 
(org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" 
#6410 daemon prio=5 os_prio=0 tid=0x7fb7e79a4800 nid=0x2ad8 waiting on 
condition [0x7fb7bdf78000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0006c5ab5e20> (a 
java.util.concurrent.SynchronousQueue$TransferStack)
at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at 
java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
at 
java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
at 
java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:895)
at 
org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.put(SSTableSimpleUnsortedWriter.java:194)
at 
org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.sync(SSTableSimpleUnsortedWriter.java:180)
at 
org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.close(SSTableSimpleUnsortedWriter.java:156)
at 
org.apache.cassandra.io.sstable.CQLSSTableWriter.close(CQLSSTableWriter.java:275)
at 
org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:133)
at 
org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:126)
at 
org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
- locked <0x0006bae5f788> (a java.lang.Object)
at 
org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:234)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
{code}

  was:
If the Flink job uses a badly-behaved OutputFormat (in this example, a 
HadoopOutputFormat containing a CqlBulkOutputFormat), where the close() method 
blocks forever, it is impossible to cancel the Flink job even though the 
blocked thread would respond to an interrupt. The stack traces below show the 
state of the important threads when a job is canceled and the OutputFormat is 
blocking forever inside of close().

I suggest that the `DataSinkTask.cancel()` method be updated to add a timeout on 
`this.format.close()`. When the timeout is reached, the Task thread should be 
interrupted.

{code}
   java.lang.Thread.State: BLOCKED (on object monitor)
at 
org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
- waiting to lock <0x0006bae5f788> (a java.lang.Object)
at 
org.apache.flink.runtime.operators.DataSinkTask.cancel(DataSinkTask.java:268)
at 
org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1149)
at java.lang.Thread.run(Thread.java:745)

"DataSink 
(org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" 
#6410 daemon prio=5 os_prio=0 tid=0x7fb7e79a4800 nid=0x2ad8 waiting on 
condition [0x7fb7bdf78000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0006c5ab5e20> (a 
java.util.concurrent.SynchronousQueue$TransferStack)
at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at 
java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
at 

[jira] [Created] (FLINK-4803) Job Cancel can hang forever waiting for OutputFormat.close()

2016-10-11 Thread Shannon Carey (JIRA)
Shannon Carey created FLINK-4803:


 Summary: Job Cancel can hang forever waiting for 
OutputFormat.close()
 Key: FLINK-4803
 URL: https://issues.apache.org/jira/browse/FLINK-4803
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.1.1
Reporter: Shannon Carey


If the Flink job uses a badly-behaved OutputFormat (in this example, a 
HadoopOutputFormat containing a CqlBulkOutputFormat), where the close() method 
blocks forever, it is impossible to cancel the Flink job even though the 
blocked thread would respond to an interrupt. The stack traces below show the 
state of the important threads when a job is canceled and the OutputFormat is 
blocking forever inside of close().

I suggest that the `DataSinkTask.cancel()` method be updated to add a timeout on 
`this.format.close()`. When the timeout is reached, the Task thread should be 
interrupted.

{code}
   java.lang.Thread.State: BLOCKED (on object monitor)
at 
org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
- waiting to lock <0x0006bae5f788> (a java.lang.Object)
at 
org.apache.flink.runtime.operators.DataSinkTask.cancel(DataSinkTask.java:268)
at 
org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1149)
at java.lang.Thread.run(Thread.java:745)

"DataSink 
(org.apache.flink.api.scala.hadoop.mapred.HadoopOutputFormat@13bf17a2) (2/5)" 
#6410 daemon prio=5 os_prio=0 tid=0x7fb7e79a4800 nid=0x2ad8 waiting on 
condition [0x7fb7bdf78000]
   java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0006c5ab5e20> (a 
java.util.concurrent.SynchronousQueue$TransferStack)
at 
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at 
java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
at 
java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
at 
java.util.concurrent.SynchronousQueue.offer(SynchronousQueue.java:895)
at 
org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.put(SSTableSimpleUnsortedWriter.java:194)
at 
org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.sync(SSTableSimpleUnsortedWriter.java:180)
at 
org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter.close(SSTableSimpleUnsortedWriter.java:156)
at 
org.apache.cassandra.io.sstable.CQLSSTableWriter.close(CQLSSTableWriter.java:275)
at 
org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:133)
at 
org.apache.cassandra.hadoop.AbstractBulkRecordWriter.close(AbstractBulkRecordWriter.java:126)
at 
org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase.close(HadoopOutputFormatBase.java:158)
- locked <0x0006bae5f788> (a java.lang.Object)
at 
org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:234)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
{code}
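
For illustration, the suggested change might look roughly like the following 
inside DataSinkTask.cancel(); the 30-second timeout and the taskThread reference 
are assumptions, not the actual Flink code:

{code:java}
// Hypothetical sketch of the suggested fix, run from the canceler thread.
Thread closer = new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            format.close();
        } catch (Exception e) {
            LOG.warn("OutputFormat.close() failed during cancel.", e);
        }
    }
}, "OutputFormat closer");
closer.setDaemon(true);
closer.start();
closer.join(30000); // assumed timeout; could be made configurable
if (closer.isAlive()) {
    // close() is stuck: interrupt the task thread (assumed field) so the
    // blocked call can abort and cancellation can complete.
    taskThread.interrupt();
}
{code}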



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3190) Retry rate limits for DataStream API

2016-08-31 Thread Shannon Carey (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shannon Carey updated FLINK-3190:
-
Fix Version/s: 1.1.0

> Retry rate limits for DataStream API
> 
>
> Key: FLINK-3190
> URL: https://issues.apache.org/jira/browse/FLINK-3190
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Michał Fijołek
>Priority: Minor
> Fix For: 1.1.0
>
>
> For a long-running stream processing job, absolute numbers of retries don't 
> make much sense: The job will accumulate transient errors over time and will 
> die eventually when thresholds are exceeded. Rate limits are better suited in 
> this scenario: A job should only die, if it fails too often in a given time 
> frame. To better overcome transient errors, retry delays could be used, as 
> suggested in other issues.
> Absolute numbers of retries can still make sense, if failing operators don't 
> make any progress at all. We can measure progress by OperatorState changes 
> and by observing output, as long as the operator in question is not a sink. 
> If operator state changes and/or operator produces output, we can assume it 
> makes progress.
> As an example, let's say we configured a retry rate limit of 10 retries per 
> hour and a non-sink operator A. If the operator fails once every 10 minutes 
> and produces output between failures, it should not lead to job termination. 
> But if the operator fails 11 times in an hour or does not produce output 
> between 11 consecutive failures, job should be terminated.
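
To make the proposal concrete, a sliding-window check along these lines (an 
illustrative sketch, not an existing Flink API) could decide whether a failure 
may still be retried:

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative sliding-window limiter: allows at most maxRetries
// restarts within windowMillis; older failures age out of the window.
public class RetryRateLimiter {
    private final int maxRetries;
    private final long windowMillis;
    private final Deque<Long> failureTimes = new ArrayDeque<>();

    public RetryRateLimiter(int maxRetries, long windowMillis) {
        this.maxRetries = maxRetries;
        this.windowMillis = windowMillis;
    }

    /** Returns true if a restart is still permitted after this failure. */
    public synchronized boolean recordFailureAndCheck(long nowMillis) {
        failureTimes.addLast(nowMillis);
        // Drop failures that fell out of the window.
        while (!failureTimes.isEmpty()
                && nowMillis - failureTimes.peekFirst() > windowMillis) {
            failureTimes.pollFirst();
        }
        return failureTimes.size() <= maxRetries;
    }
}
{code}

With maxRetries = 10 and windowMillis = 3600000, the 11th failure inside one 
hour returns false, matching the example above.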



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4418) ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if InetAddress.getLocalHost throws exception

2016-08-23 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15433839#comment-15433839
 ] 

Shannon Carey commented on FLINK-4418:
--

Thanks [~rmetzger]. Should I mark the issue as "resolved" since there is a PR 
available? I'm not sure what your JIRA workflow looks like.

> ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if 
> InetAddress.getLocalHost throws exception
> --
>
> Key: FLINK-4418
> URL: https://issues.apache.org/jira/browse/FLINK-4418
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.1.0
>Reporter: Shannon Carey
>Assignee: Shannon Carey
>
> When attempting to connect to a cluster with a ClusterClient, if the 
> machine's hostname is not resolvable to an IP, an exception is thrown 
> preventing success.
> This is the case if, for example, the hostname is not present & mapped to a 
> local IP in /etc/hosts.
> The exception is below. I suggest that findAddressUsingStrategy() should 
> catch java.net.UnknownHostException thrown by InetAddress.getLocalHost() and 
> return null, allowing alternative strategies to be attempted by 
> findConnectingAddress(). I will open a PR to this effect. Ideally this could 
> be included in both 1.2 and 1.1.2.
> In the stack trace below, "ip-10-2-64-47" is the internal host name of an AWS 
> EC2 instance.
> {code}
> 21:11:35 org.apache.flink.client.program.ProgramInvocationException: Failed 
> to retrieve the JobManager gateway.
> 21:11:35  at 
> org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:430)
> 21:11:35  at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:90)
> 21:11:35  at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
> 21:11:35  at 
> org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75)
> 21:11:35  at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:334)
> 21:11:35  at 
> com.expedia.www.flink.job.scheduler.FlinkJobSubmitter.get(FlinkJobSubmitter.java:81)
> 21:11:35  at 
> com.expedia.www.flink.job.scheduler.streaming.StreamingJobManager.run(StreamingJobManager.java:105)
> 21:11:35  at 
> com.expedia.www.flink.job.scheduler.JobScheduler.runStreamingApp(JobScheduler.java:69)
> 21:11:35  at 
> com.expedia.www.flink.job.scheduler.JobScheduler.main(JobScheduler.java:34)
> 21:11:35 Caused by: java.lang.RuntimeException: Failed to resolve JobManager 
> address at /10.2.89.80:43126
> 21:11:35  at 
> org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:189)
> 21:11:35  at 
> org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:649)
> 21:11:35  at 
> org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:428)
> 21:11:35  ... 8 more
> 21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: 
> ip-10-2-64-47: unknown error
> 21:11:35  at java.net.InetAddress.getLocalHost(InetAddress.java:1505)
> 21:11:35  at 
> org.apache.flink.runtime.net.ConnectionUtils.findAddressUsingStrategy(ConnectionUtils.java:232)
> 21:11:35  at 
> org.apache.flink.runtime.net.ConnectionUtils.findConnectingAddress(ConnectionUtils.java:123)
> 21:11:35  at 
> org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:187)
> 21:11:35  ... 10 more
> 21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: unknown 
> error
> 21:11:35  at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
> 21:11:35  at 
> java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
> 21:11:35  at 
> java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
> 21:11:35  at java.net.InetAddress.getLocalHost(InetAddress.java:1500)
> 21:11:35  ... 13 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4418) ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if InetAddress.getLocalHost throws exception

2016-08-17 Thread Shannon Carey (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shannon Carey updated FLINK-4418:
-
Description: 
When attempting to connect to a cluster with a ClusterClient, if the machine's 
hostname is not resolvable to an IP, an exception is thrown preventing success.

This is the case if, for example, the hostname is not present & mapped to a 
local IP in /etc/hosts.

The exception is below. I suggest that findAddressUsingStrategy() should catch 
java.net.UnknownHostException thrown by InetAddress.getLocalHost() and return 
null, allowing alternative strategies to be attempted by 
findConnectingAddress(). I will open a PR to this effect. Ideally this could be 
included in both 1.2 and 1.1.2.


In the stack trace below, "ip-10-2-64-47" is the internal host name of an AWS 
EC2 instance.
{code}
21:11:35 org.apache.flink.client.program.ProgramInvocationException: Failed to 
retrieve the JobManager gateway.
21:11:35at 
org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:430)
21:11:35at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:90)
21:11:35at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
21:11:35at 
org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75)
21:11:35at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:334)
21:11:35at 
com.expedia.www.flink.job.scheduler.FlinkJobSubmitter.get(FlinkJobSubmitter.java:81)
21:11:35at 
com.expedia.www.flink.job.scheduler.streaming.StreamingJobManager.run(StreamingJobManager.java:105)
21:11:35at 
com.expedia.www.flink.job.scheduler.JobScheduler.runStreamingApp(JobScheduler.java:69)
21:11:35at 
com.expedia.www.flink.job.scheduler.JobScheduler.main(JobScheduler.java:34)
21:11:35 Caused by: java.lang.RuntimeException: Failed to resolve JobManager 
address at /10.2.89.80:43126
21:11:35at 
org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:189)
21:11:35at 
org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:649)
21:11:35at 
org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:428)
21:11:35... 8 more
21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: 
ip-10-2-64-47: unknown error
21:11:35at java.net.InetAddress.getLocalHost(InetAddress.java:1505)
21:11:35at 
org.apache.flink.runtime.net.ConnectionUtils.findAddressUsingStrategy(ConnectionUtils.java:232)
21:11:35at 
org.apache.flink.runtime.net.ConnectionUtils.findConnectingAddress(ConnectionUtils.java:123)
21:11:35at 
org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:187)
21:11:35... 10 more
21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: unknown error
21:11:35at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
21:11:35at 
java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
21:11:35at 
java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
21:11:35at java.net.InetAddress.getLocalHost(InetAddress.java:1500)
21:11:35... 13 more
{code}

  was:
When attempting to connect to a cluster with a ClusterClient, if the machine's 
hostname is not resolvable to an IP, an exception is thrown preventing success.

This is the case if, for example, the hostname is not present & mapped to a 
local IP in /etc/hosts.

The exception is below. I suggest that findAddressUsingStrategy() should catch 
java.net.UnknownHostException thrown by InetAddress.getLocalHost() and return 
null, allowing alternative strategies to be attempted by 
findConnectingAddress(). I will open a PR to this effect. Ideally this could be 
included in both 1.2 and 1.1.2.

{code}
21:11:35 org.apache.flink.client.program.ProgramInvocationException: Failed to 
retrieve the JobManager gateway.
21:11:35at 
org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:430)
21:11:35at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:90)
21:11:35at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
21:11:35at 
org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75)
21:11:35at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:334)
21:11:35at 
com.expedia.www.flink.job.scheduler.FlinkJobSubmitter.get(FlinkJobSubmitter.java:81)
21:11:35at 
com.expedia.www.flink.job.scheduler.streaming.StreamingJobManager.run(StreamingJobManager.java:105)
21:11:35at 

[jira] [Created] (FLINK-4418) ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if InetAddress.getLocalHost throws exception

2016-08-17 Thread Shannon Carey (JIRA)
Shannon Carey created FLINK-4418:


 Summary: ClusterClient/ConnectionUtils#findConnectingAddress fails 
immediately if InetAddress.getLocalHost throws exception
 Key: FLINK-4418
 URL: https://issues.apache.org/jira/browse/FLINK-4418
 Project: Flink
  Issue Type: Bug
  Components: Client
Affects Versions: 1.1.0
Reporter: Shannon Carey


When attempting to connect to a cluster with a ClusterClient, if the machine's 
hostname is not resolvable to an IP, an exception is thrown preventing success.

This is the case if, for example, the hostname is not present & mapped to a 
local IP in /etc/hosts.

The exception is below. I suggest that findAddressUsingStrategy() should catch 
java.net.UnknownHostException thrown by InetAddress.getLocalHost() and return 
null, allowing alternative strategies to be attempted by 
findConnectingAddress(). I will open a PR to this effect. Ideally this could be 
included in both 1.2 and 1.1.2.

{code}
21:11:35 org.apache.flink.client.program.ProgramInvocationException: Failed to 
retrieve the JobManager gateway.
21:11:35at 
org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:430)
21:11:35at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:90)
21:11:35at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
21:11:35at 
org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75)
21:11:35at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:334)
21:11:35at 
com.expedia.www.flink.job.scheduler.FlinkJobSubmitter.get(FlinkJobSubmitter.java:81)
21:11:35at 
com.expedia.www.flink.job.scheduler.streaming.StreamingJobManager.run(StreamingJobManager.java:105)
21:11:35at 
com.expedia.www.flink.job.scheduler.JobScheduler.runStreamingApp(JobScheduler.java:69)
21:11:35at 
com.expedia.www.flink.job.scheduler.JobScheduler.main(JobScheduler.java:34)
21:11:35 Caused by: java.lang.RuntimeException: Failed to resolve JobManager 
address at /10.2.89.80:43126
21:11:35at 
org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:189)
21:11:35at 
org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:649)
21:11:35at 
org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:428)
21:11:35... 8 more
21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: 
ip-10-2-64-47: unknown error
21:11:35at java.net.InetAddress.getLocalHost(InetAddress.java:1505)
21:11:35at 
org.apache.flink.runtime.net.ConnectionUtils.findAddressUsingStrategy(ConnectionUtils.java:232)
21:11:35at 
org.apache.flink.runtime.net.ConnectionUtils.findConnectingAddress(ConnectionUtils.java:123)
21:11:35at 
org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:187)
21:11:35... 10 more
21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: unknown error
21:11:35at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
21:11:35at 
java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
21:11:35at 
java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
21:11:35at java.net.InetAddress.getLocalHost(InetAddress.java:1500)
21:11:35... 13 more
{code}
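
The proposed change is small. A simplified sketch of the affected branch in 
findAddressUsingStrategy() (not the exact Flink source; the tryToConnect 
parameters are abbreviated):

{code:java}
// Simplified: treat an unresolvable local hostname as "no address found"
// instead of failing the entire connection attempt.
try {
    InetAddress localAddress = InetAddress.getLocalHost();
    if (tryToConnect(localAddress, targetAddress, timeout, logFailed)) {
        return localAddress;
    }
} catch (UnknownHostException e) {
    LOG.warn("Could not resolve local hostname; skipping this strategy.", e);
    return null; // lets findConnectingAddress() fall through to other strategies
}
{code}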



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4334) Shaded Hadoop1 jar not fully excluded in Quickstart

2016-08-08 Thread Shannon Carey (JIRA)
Shannon Carey created FLINK-4334:


 Summary: Shaded Hadoop1 jar not fully excluded in Quickstart
 Key: FLINK-4334
 URL: https://issues.apache.org/jira/browse/FLINK-4334
 Project: Flink
  Issue Type: Bug
  Components: Quickstarts
Affects Versions: 1.0.3, 1.0.2, 1.0.1, 1.1.0
Reporter: Shannon Carey


The Shaded Hadoop1 jar has artifactId flink-shaded-hadoop1_2.10 since Flink 
1.0.0 (see 
https://github.com/apache/flink/commit/2c4e4d1ffaf4107fb802c90858184fc10af66837),
 but the quickstart POMs both refer to it as flink-shaded-hadoop1.

If using "-Pbuild-jar", the problem is not encountered.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4069) Kafka Consumer should not initialize on construction

2016-06-14 Thread Shannon Carey (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shannon Carey closed FLINK-4069.

Resolution: Duplicate

> Kafka Consumer should not initialize on construction
> 
>
> Key: FLINK-4069
> URL: https://issues.apache.org/jira/browse/FLINK-4069
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Shannon Carey
>
> The Kafka Consumer connector currently interacts over the network with Kafka 
> in order to get partition metadata when the class is constructed. Instead, it 
> should do that work when the job actually begins to run (for example, in 
> AbstractRichFunction#open() of FlinkKafkaConsumer0?).
> The main weakness of broker querying in the constructor is that if there are 
> network problems, Flink might take a long time (e.g. ~1 hr) inside the 
> user-supplied main() method while it attempts to contact each broker and 
> perform retries. In general, setting up the Kafka partitions does not seem 
> strictly necessary as part of execution of main() in order to set up the job 
> plan/topology.
> However, as Robert Metzger mentions, there are important concerns with how 
> Kafka partitions are handled:
> "The main reason why we do the querying centrally is:
> a) avoid overloading the brokers
> b) send the same list of partitions (in the same order) to all parallel 
> consumers to do fixed partition assignments (also across restarts). When we 
> do the querying in the open() method, we need to make sure that all 
> partitions are assigned, without duplicates (also after restarts in case of 
> failures)."
> See also the mailing list discussion: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/API-request-to-submit-job-takes-over-1hr-td7319.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4069) Kafka Consumer should not initialize on construction

2016-06-14 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15330837#comment-15330837
 ] 

Shannon Carey commented on FLINK-4069:
--

Oh, I'm sorry! I looked for an existing issue but didn't notice that one. I 
will close this one as a duplicate.

> Kafka Consumer should not initialize on construction
> 
>
> Key: FLINK-4069
> URL: https://issues.apache.org/jira/browse/FLINK-4069
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Shannon Carey
>
> The Kafka Consumer connector currently interacts over the network with Kafka 
> in order to get partition metadata when the class is constructed. Instead, it 
> should do that work when the job actually begins to run (for example, in 
> AbstractRichFunction#open() of FlinkKafkaConsumer0?).
> The main weakness of broker querying in the constructor is that if there are 
> network problems, Flink might take a long time (e.g. ~1 hr) inside the 
> user-supplied main() method while it attempts to contact each broker and 
> perform retries. In general, setting up the Kafka partitions does not seem 
> strictly necessary as part of execution of main() in order to set up the job 
> plan/topology.
> However, as Robert Metzger mentions, there are important concerns with how 
> Kafka partitions are handled:
> "The main reason why we do the querying centrally is:
> a) avoid overloading the brokers
> b) send the same list of partitions (in the same order) to all parallel 
> consumers to do fixed partition assignments (also across restarts). When we 
> do the querying in the open() method, we need to make sure that all 
> partitions are assigned, without duplicates (also after restarts in case of 
> failures)."
> See also the mailing list discussion: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/API-request-to-submit-job-takes-over-1hr-td7319.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4069) Kafka Consumer should not initialize on construction

2016-06-13 Thread Shannon Carey (JIRA)
Shannon Carey created FLINK-4069:


 Summary: Kafka Consumer should not initialize on construction
 Key: FLINK-4069
 URL: https://issues.apache.org/jira/browse/FLINK-4069
 Project: Flink
  Issue Type: Improvement
  Components: Kafka Connector
Affects Versions: 1.0.3
Reporter: Shannon Carey


The Kafka Consumer connector currently interacts over the network with Kafka in 
order to get partition metadata when the class is constructed. Instead, it 
should do that work when the job actually begins to run (for example, in 
AbstractRichFunction#open() of FlinkKafkaConsumer0?).

The main weakness of broker querying in the constructor is that if there are 
network problems, Flink might take a long time (e.g. ~1 hr) inside the 
user-supplied main() method while it attempts to contact each broker and 
perform retries. In general, setting up the Kafka partitions does not seem 
strictly necessary as part of execution of main() in order to set up the job 
plan/topology.

However, as Robert Metzger mentions, there are important concerns with how 
Kafka partitions are handled:

"The main reason why we do the querying centrally is:
a) avoid overloading the brokers
b) send the same list of partitions (in the same order) to all parallel 
consumers to do fixed partition assignments (also across restarts). When we 
do the querying in the open() method, we need to make sure that all partitions 
are assigned, without duplicates (also after restarts in case of failures)."

See also the mailing list discussion: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/API-request-to-submit-job-takes-over-1hr-td7319.html
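
A minimal sketch of the proposed shape (type and helper names are hypothetical; 
a real consumer must also solve the deterministic-assignment concerns above):

{code:java}
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

// Hypothetical skeleton: the constructor only records settings; the
// broker round-trip happens in open(), once the task actually runs.
public abstract class LazyInitKafkaConsumer<T, P> extends RichSourceFunction<T> {
    private final String topic;
    private transient List<P> assignedPartitions;

    protected LazyInitKafkaConsumer(String topic) {
        this.topic = topic; // no network access here
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Partition metadata is queried only when the job starts running,
        // so broker retries no longer stall the user's main() method.
        List<P> all = fetchPartitionsFromBrokers(topic); // hypothetical helper
        assignedPartitions = assignDeterministically(    // hypothetical helper
            all,
            getRuntimeContext().getIndexOfThisSubtask(),
            getRuntimeContext().getNumberOfParallelSubtasks());
    }

    // Subclasses supply the actual metadata query and the fixed,
    // duplicate-free assignment that the quoted comment requires.
    protected abstract List<P> fetchPartitionsFromBrokers(String topic);
    protected abstract List<P> assignDeterministically(
        List<P> all, int subtask, int parallelism);
}
{code}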



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3711) Scala fold() example syntax incorrect

2016-04-06 Thread Shannon Carey (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15229638#comment-15229638
 ] 

Shannon Carey commented on FLINK-3711:
--

I will have a Github PR for you on this momentarily.

> Scala fold() example syntax incorrect
> -
>
> Key: FLINK-3711
> URL: https://issues.apache.org/jira/browse/FLINK-3711
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.0.0, 1.0.1
>Reporter: Shannon Carey
>Priority: Minor
>
> Scala's KeyedStream#fold which accepts scala.Function2 is defined as a 
> partially applicable (curried) function. The documentation, however, is written 
> as if it were a non-partial function.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3711) Scala fold() example syntax incorrect

2016-04-06 Thread Shannon Carey (JIRA)
Shannon Carey created FLINK-3711:


 Summary: Scala fold() example syntax incorrect
 Key: FLINK-3711
 URL: https://issues.apache.org/jira/browse/FLINK-3711
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.0.1, 1.0.0
Reporter: Shannon Carey
Priority: Minor


Scala's KeyedStream#fold which accepts scala.Function2 is defined as a 
partially applicable (curried) function. The documentation, however, is written 
as if it were a non-partial function.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)