[jira] [Resolved] (FLINK-5788) Document assumptions about File Systems and persistence

2017-02-14 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-5788.
-
Resolution: Fixed

Fixed via f7af3b01681592787db16a555b55d6b11d35f869

> Document assumptions about File Systems and persistence
> ---
>
> Key: FLINK-5788
> URL: https://issues.apache.org/jira/browse/FLINK-5788
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> We should add some description about the assumptions we make for the behavior 
> of {{FileSystem}} implementations to support proper checkpointing and 
> recovery operations.
> This is especially critical for file systems like {{S3}} with a somewhat 
> tricky contract.
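For illustration, a minimal sketch of the write/recover pattern whose correctness rests on the assumptions to be documented (not from the issue; class and method names follow Flink's org.apache.flink.core.fs API in spirit but are not verified against a specific release, and the state bytes are a placeholder):

{code}
Path checkpointFile = new Path("s3://bucket/checkpoints/chk-42/state-0");
FileSystem fs = checkpointFile.getFileSystem();
byte[] serializedState = new byte[]{1, 2, 3}; // placeholder for operator state

try (FSDataOutputStream out = fs.create(checkpointFile, WriteMode.NO_OVERWRITE)) {
    out.write(serializedState);
    // Assumption being documented: once close() returns, the bytes are durable
    // and visible to every node that may later read them during recovery.
}
// Recovery on another TaskManager assumes it can immediately open and read the
// complete file -- exactly the part of the contract that is subtle on S3.
{code}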



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865886#comment-15865886
 ] 

ASF GitHub Bot commented on FLINK-4280:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r101051346
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 ---
@@ -229,6 +231,8 @@ public void prepare(int numKafkaServers, Properties 
additionalServerProperties,
standardProps.setProperty("zookeeper.connect", 
zookeeperConnectionString);
standardProps.setProperty("bootstrap.servers", 
brokerConnectionString);
standardProps.setProperty("group.id", "flink-tests");
+   standardProps.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+   standardProps.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
--- End diff --

Somehow, without these settings, the new tests that test `setStartFrom` 
methods will fail, complaining that the property config does not specify settings 
for the deserializers.

I guess it is because in those tests, we have Kafka clients that are used 
*only* for offset committing and fetching, in which case the client cannot 
infer the types to use for the serializers?
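For readers following along, a small standalone sketch of this failure mode (hedged: `brokerConnectionString` and `topicName` are assumed from the test setup; the property keys are the standard Kafka client settings):

{code}
Properties props = new Properties();
props.setProperty("bootstrap.servers", brokerConnectionString);
props.setProperty("group.id", "flink-tests");
// Without the next two lines, new KafkaConsumer<>(props) fails at construction
// time with a ConfigException about missing deserializer settings, even if the
// client is only ever used for committing and fetching offsets.
props.setProperty("key.deserializer",
    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.setProperty("value.deserializer",
    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

KafkaConsumer<byte[], byte[]> offsetClient = new KafkaConsumer<>(props);
offsetClient.committed(new TopicPartition(topicName, 0)); // offset fetch only
offsetClient.close();
{code}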


> New Flink-specific option to set starting position of Kafka consumer without 
> respecting external offsets in ZK / Broker
> ---
>
> Key: FLINK-4280
> URL: https://issues.apache.org/jira/browse/FLINK-4280
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> Currently, to start reading from the "earliest" and "latest" position in 
> topics for the Flink Kafka consumer, users set the Kafka config 
> {{auto.offset.reset}} in the provided properties configuration.
> However, the way this config actually works might be a bit misleading if 
> users were trying to find a way to "read topics from a starting position". 
> The way the {{auto.offset.reset}} config works in the Flink Kafka consumer 
> resembles Kafka's original intent for the setting: first, existing external 
> offsets committed to the ZK / brokers will be checked; only if none exist 
> will {{auto.offset.reset}} be respected.
> I propose to add Flink-specific ways to define the starting position, without 
> taking into account the external offsets. The original behaviour (reference 
> external offsets first) can be changed to be a user option, so that the 
> behaviour can be retained for frequent Kafka users that may need some 
> collaboration with existing non-Flink Kafka consumer applications.
> How users will interact with the Flink Kafka consumer after this is added, 
> with a newly introduced {{flink.starting-position}} config:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "earliest/latest");
> props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a 
> warning)
> props.setProperty("group.id", "...") // this won't have effect on the 
> starting position anymore (may still be used in external offset committing)
> ...
> {code}
> Or, reference external offsets in ZK / broker:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "external-offsets");
> props.setProperty("auto.offset.reset", "earliest/latest"); // default will be 
> latest
> props.setProperty("group.id", "..."); // will be used to lookup external 
> offsets in ZK / broker on startup
> ...
> {code}
> A thing we would need to decide on is what the default value for 
> {{flink.starting-position}} should be.
> Two merits I see in adding this:
> 1. This accommodates the way users generally interpret "read from a starting 
> position". As the Flink Kafka connector is essentially a 
> "high-level" Kafka consumer for Flink users, I think it is reasonable to add 
> Flink-specific functionality that users will find useful, even though it wasn't 
> supported in Kafka's original consumer design.
> 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is 
> used only to expose progress to the outside world, and not used to manipulate 
> how Kafka topics are read in Flink (unless users opt to do so)" is even more 
> definite and solid. There was some discussion in this PR 
> (https://github.com/apache/flink/pull/1690, FLINK-3398) on this aspect. I 
> think adding this "decouples" more Flink's internal offset checkpointing 

[jira] [Commented] (FLINK-5796) broken links in the docs

2017-02-14 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865881#comment-15865881
 ] 

Ufuk Celebi commented on FLINK-5796:


[~rmetzger] Can we integrate this check with the build bot?

> broken links in the docs
> 
>
> Key: FLINK-5796
> URL: https://issues.apache.org/jira/browse/FLINK-5796
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.3.0
>Reporter: Nico Kruber
>
> running a link checker on the locally-served flink docs yields the following 
> broken links (same for online docs):
> {code}
> 15:21:55  Error:  "Not Found " (404) at link 0.0.0.0:4000/dev/state (from 
> 0.0.0.0:4000/dev/datastream_api.html)
> 15:21:55  Error:  "Not Found " (404) at link 
> 0.0.0.0:4000/monitoring/best_practices.html (from 0.0.0.0:4000/dev/batch/)
> 15:21:55  Error:  "Not Found " (404) at link 
> 0.0.0.0:4000/api/java/org/apache/flink/table/api/Table.html (from 
> 0.0.0.0:4000/dev/table_api.html)
> 15:21:55  Error:  "Not Found " (404) at link 0.0.0.0:4000/dev/state.html 
> (from 0.0.0.0:4000/ops/state_backends.html)
> 15:21:55  Error:  "Not Found " (404) at link 
> 0.0.0.0:4000/internals/state_backends.html (from 
> 0.0.0.0:4000/internals/stream_checkpointing.html)
> {code}
> FYI: command to replay:
> {{httrack http://0.0.0.0:4000/  -O "$PWD/flink-docs" --testlinks  -%v 
> --depth= --ext-depth=0}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-5788) Document assumptions about File Systems and persistence

2017-02-14 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-5788.
---

> Document assumptions about File Systems and persistence
> ---
>
> Key: FLINK-5788
> URL: https://issues.apache.org/jira/browse/FLINK-5788
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> We should add some description about the assumptions we make for the behavior 
> of {{FileSystem}} implementations to support proper checkpointing and 
> recovery operations.
> This is especially critical for file systems like {{S3}} with a somewhat 
> tricky contract.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2017-02-14 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r101051346
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 ---
@@ -229,6 +231,8 @@ public void prepare(int numKafkaServers, Properties 
additionalServerProperties,
standardProps.setProperty("zookeeper.connect", 
zookeeperConnectionString);
standardProps.setProperty("bootstrap.servers", 
brokerConnectionString);
standardProps.setProperty("group.id", "flink-tests");
+   standardProps.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+   standardProps.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
--- End diff --

Somehow, without these settings, the new tests that test `setStartFrom` 
methods will fail, complaining that the property config does not specify settings 
for the deserializers.

I guess it is because in those tests, we have Kafka clients that are used 
*only* for offset committing and fetching, in which case the client cannot 
infer the types to use for the serializers?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2017-02-14 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r101051496
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 ---
@@ -229,6 +231,8 @@ public void prepare(int numKafkaServers, Properties 
additionalServerProperties,
standardProps.setProperty("zookeeper.connect", 
zookeeperConnectionString);
standardProps.setProperty("bootstrap.servers", 
brokerConnectionString);
standardProps.setProperty("group.id", "flink-tests");
+   standardProps.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+   standardProps.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
--- End diff --

Perhaps I should move these out of the `standardProps` and set them in those 
tests only.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-5553) Job fails during deployment with IllegalStateException from subpartition request

2017-02-14 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-5553.
-
   Resolution: Fixed
Fix Version/s: 1.3.0

Fixed via af81bebd0fabc6390930689df131e72edab6995b

> Job fails during deployment with IllegalStateException from subpartition 
> request
> 
>
> Key: FLINK-5553
> URL: https://issues.apache.org/jira/browse/FLINK-5553
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.3.0
>Reporter: Robert Metzger
>Assignee: Nico Kruber
> Fix For: 1.3.0
>
> Attachments: application-1484132267957-0076
>
>
> While running a test job with Flink 1.3-SNAPSHOT 
> (6fb6967b9f9a31f034bd09fcf76aaf147bc8e9a0) the job failed with this exception:
> {code}
> 2017-01-18 14:56:27,043 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed 
> (9/10) (befc06d0e792c2ce39dde74b365dd3cf) switched from DEPLOYING to RUNNING.
> 2017-01-18 14:56:27,059 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Flat Map 
> (9/10) (e94a01ec283e5dce7f79b02cf51654c4) switched from DEPLOYING to RUNNING.
> 2017-01-18 14:56:27,817 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Flat Map 
> (10/10) (cbb61c9a2f72c282877eb383e111f7cd) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: There has been an error in the channel.
> at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.addInputChannel(PartitionRequestClientHandler.java:77)
> at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClient.requestSubpartition(PartitionRequestClient.java:104)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:115)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:419)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:441)
> at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:153)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:192)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:270)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:666)
> at java.lang.Thread.run(Thread.java:745)
> 2017-01-18 14:56:27,819 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job 
> Misbehaved Job (b1d985d11984df57400fdff2bb656c59) switched from state RUNNING 
> to FAILING.
> java.lang.IllegalStateException: There has been an error in the channel.
> at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.addInputChannel(PartitionRequestClientHandler.java:77)
> at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClient.requestSubpartition(PartitionRequestClient.java:104)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:115)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:419)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:441)
> at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:153)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:192)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:270)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:666)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> This is the first exception that is reported to the jobmanager.
> I think this is related to missing network buffers. You see that from the 
> next deployment after the restart, where the deployment fails with the 
> insufficient number of buffers exception.
> I'll add logs to the JIRA.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2017-02-14 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r101051598
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -438,6 +439,215 @@ public void run() {
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
}
+
+   /**
+* This test ensures that when explicitly set to start from earliest 
record, the consumer
+* ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
+*/
+   public void runStartFromEarliestOffsets() throws Exception {
+   // 3 partitions with 50 records each (0-49, so the expected 
commit offset of each partition should be 50)
+   final int parallelism = 3;
+   final int recordsInEachPartition = 50;
+
+   final String topicName = 
writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+   env.getConfig().disableSysoutLogging();
+   env.setParallelism(parallelism);
+
+   Properties readProps = new Properties();
+   readProps.putAll(standardProps);
+   readProps.setProperty("auto.offset.reset", "latest"); // this 
should be ignored
+
+   // the committed offsets should be ignored
+   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+   readSequence(env, StartupMode.EARLIEST, readProps, parallelism, 
topicName, recordsInEachPartition, 0);
+
+   kafkaOffsetHandler.close();
+   deleteTestTopic(topicName);
+   }
+
+   /**
+* This test ensures that when explicitly set to start from latest 
record, the consumer
+* ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
+*/
+   public void runStartFromLatestOffsets() throws Exception {
+   // 50 records written to each of 3 partitions before launching 
a latest-starting consuming job
+   final int parallelism = 3;
+   final int recordsInEachPartition = 50;
+
+   // each partition will be written an extra 200 records
+   final int extraRecordsInEachPartition = 200;
+
+   // all already existing data in the topic, before the consuming 
topology has started, should be ignored
+   final String topicName = 
writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
+
+   // the committed offsets should be ignored
+   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+   // job names for the topologies for writing and consuming the 
extra records
+   final String consumeExtraRecordsJobName = "Consume Extra 
Records Job";
+   final String writeExtraRecordsJobName = "Write Extra Records 
Job";
+
+   // serialization / deserialization schemas for writing and consuming the extra records
+   final TypeInformation<Tuple2<Integer, Integer>> resultType =
+   TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
+
+   final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
+   new KeyedSerializationSchemaWrapper<>(
+   new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+
+   final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
+   new KeyedDeserializationSchemaWrapper<>(
+   new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+
+   // setup and run the latest-consuming job
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+   env.getConfig().disableSysoutLogging();
+   env.setParallelism(parallelism);
+
+   final Properties readProps = new Properties();
+   

[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865888#comment-15865888
 ] 

ASF GitHub Bot commented on FLINK-4280:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r101051496
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 ---
@@ -229,6 +231,8 @@ public void prepare(int numKafkaServers, Properties 
additionalServerProperties,
standardProps.setProperty("zookeeper.connect", 
zookeeperConnectionString);
standardProps.setProperty("bootstrap.servers", 
brokerConnectionString);
standardProps.setProperty("group.id", "flink-tests");
+   standardProps.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+   standardProps.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
--- End diff --

Perhaps I should move these out of the `standardProps` and set them in those 
tests only.


> New Flink-specific option to set starting position of Kafka consumer without 
> respecting external offsets in ZK / Broker
> ---
>
> Key: FLINK-4280
> URL: https://issues.apache.org/jira/browse/FLINK-4280
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
>
> Currently, to start reading from the "earliest" and "latest" position in 
> topics for the Flink Kafka consumer, users set the Kafka config 
> {{auto.offset.reset}} in the provided properties configuration.
> However, the way this config actually works might be a bit misleading if 
> users were trying to find a way to "read topics from a starting position". 
> The way the {{auto.offset.reset}} config works in the Flink Kafka consumer 
> resembles Kafka's original intent for the setting: first, existing external 
> offsets committed to the ZK / brokers will be checked; only if none exist 
> will {{auto.offset.reset}} be respected.
> I propose to add Flink-specific ways to define the starting position, without 
> taking into account the external offsets. The original behaviour (reference 
> external offsets first) can be changed to be a user option, so that the 
> behaviour can be retained for frequent Kafka users that may need some 
> collaboration with existing non-Flink Kafka consumer applications.
> How users will interact with the Flink Kafka consumer after this is added, 
> with a newly introduced {{flink.starting-position}} config:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "earliest/latest");
> props.setProperty("auto.offset.reset", "..."); // this will be ignored (log a 
> warning)
> props.setProperty("group.id", "...") // this won't have effect on the 
> starting position anymore (may still be used in external offset committing)
> ...
> {code}
> Or, reference external offsets in ZK / broker:
> {code}
> Properties props = new Properties();
> props.setProperty("flink.starting-position", "external-offsets");
> props.setProperty("auto.offset.reset", "earliest/latest"); // default will be 
> latest
> props.setProperty("group.id", "..."); // will be used to lookup external 
> offsets in ZK / broker on startup
> ...
> {code}
> A thing we would need to decide on is what the default value for 
> {{flink.starting-position}} should be.
> Two merits I see in adding this:
> 1. This accommodates the way users generally interpret "read from a starting 
> position". As the Flink Kafka connector is essentially a 
> "high-level" Kafka consumer for Flink users, I think it is reasonable to add 
> Flink-specific functionality that users will find useful, even though it wasn't 
> supported in Kafka's original consumer design.
> 2. By adding this, the idea that "the Kafka offset store (ZK / brokers) is 
> used only to expose progress to the outside world, and not used to manipulate 
> how Kafka topics are read in Flink (unless users opt to do so)" is even more 
> definite and solid. There was some discussion in this PR 
> (https://github.com/apache/flink/pull/1690, FLINK-3398) on this aspect. I 
> think adding this "decouples" more Flink's internal offset checkpointing from 
> the external Kafka's offset store.
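A hedged sketch of the setter-style alternative that the `setStartFrom` tests in the review comments above exercise (class and method names are inferred from those test names and may not match the merged API exactly; `env` is an assumed StreamExecutionEnvironment):

{code}
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "my-group"); // still used when committing offsets back

FlinkKafkaConsumer09<String> consumer =
    new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props);

consumer.setStartFromEarliest();        // ignore committed offsets and auto.offset.reset
// consumer.setStartFromLatest();       // or: start from the latest records only
// consumer.setStartFromGroupOffsets(); // or: keep the original ZK / broker behaviour

env.addSource(consumer);
{code}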



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-4280) New Flink-specific option to set starting position of Kafka consumer without respecting external offsets in ZK / Broker

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865891#comment-15865891
 ] 

ASF GitHub Bot commented on FLINK-4280:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r101051598
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
@@ -438,6 +439,215 @@ public void run() {
kafkaOffsetHandler.close();
deleteTestTopic(topicName);
}
+
+   /**
+* This test ensures that when explicitly set to start from earliest 
record, the consumer
+* ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
+*/
+   public void runStartFromEarliestOffsets() throws Exception {
+   // 3 partitions with 50 records each (0-49, so the expected 
commit offset of each partition should be 50)
+   final int parallelism = 3;
+   final int recordsInEachPartition = 50;
+
+   final String topicName = 
writeSequence("testStartFromEarliestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+   env.getConfig().disableSysoutLogging();
+   env.setParallelism(parallelism);
+
+   Properties readProps = new Properties();
+   readProps.putAll(standardProps);
+   readProps.setProperty("auto.offset.reset", "latest"); // this 
should be ignored
+
+   // the committed offsets should be ignored
+   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+   readSequence(env, StartupMode.EARLIEST, readProps, parallelism, 
topicName, recordsInEachPartition, 0);
+
+   kafkaOffsetHandler.close();
+   deleteTestTopic(topicName);
+   }
+
+   /**
+* This test ensures that when explicitly set to start from latest 
record, the consumer
+* ignores the "auto.offset.reset" behaviour as well as any committed 
group offsets in Kafka.
+*/
+   public void runStartFromLatestOffsets() throws Exception {
+   // 50 records written to each of 3 partitions before launching 
a latest-starting consuming job
+   final int parallelism = 3;
+   final int recordsInEachPartition = 50;
+
+   // each partition will be written an extra 200 records
+   final int extraRecordsInEachPartition = 200;
+
+   // all already existing data in the topic, before the consuming 
topology has started, should be ignored
+   final String topicName = 
writeSequence("testStartFromLatestOffsetsTopic", recordsInEachPartition, 
parallelism, 1);
+
+   // the committed offsets should be ignored
+   KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = 
kafkaServer.createOffsetHandler(standardProps);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 0, 23);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 1, 31);
+   kafkaOffsetHandler.setCommittedOffset(topicName, 2, 43);
+
+   // job names for the topologies for writing and consuming the 
extra records
+   final String consumeExtraRecordsJobName = "Consume Extra 
Records Job";
+   final String writeExtraRecordsJobName = "Write Extra Records 
Job";
+
+   // serialization / deserialization schemas for writing and consuming the extra records
+   final TypeInformation<Tuple2<Integer, Integer>> resultType =
+   TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
+
+   final KeyedSerializationSchema<Tuple2<Integer, Integer>> serSchema =
+   new KeyedSerializationSchemaWrapper<>(
+   new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+
+   final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
+   new KeyedDeserializationSchemaWrapper<>(
+   new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
+
+   // setup and run the latest-consuming job
+   final StreamExecutionEnvironment env = 

[jira] [Closed] (FLINK-5553) Job fails during deployment with IllegalStateException from subpartition request

2017-02-14 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-5553.
---

> Job fails during deployment with IllegalStateException from subpartition 
> request
> 
>
> Key: FLINK-5553
> URL: https://issues.apache.org/jira/browse/FLINK-5553
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.3.0
>Reporter: Robert Metzger
>Assignee: Nico Kruber
> Fix For: 1.3.0
>
> Attachments: application-1484132267957-0076
>
>
> While running a test job with Flink 1.3-SNAPSHOT 
> (6fb6967b9f9a31f034bd09fcf76aaf147bc8e9a0) the job failed with this exception:
> {code}
> 2017-01-18 14:56:27,043 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed 
> (9/10) (befc06d0e792c2ce39dde74b365dd3cf) switched from DEPLOYING to RUNNING.
> 2017-01-18 14:56:27,059 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Flat Map 
> (9/10) (e94a01ec283e5dce7f79b02cf51654c4) switched from DEPLOYING to RUNNING.
> 2017-01-18 14:56:27,817 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Flat Map 
> (10/10) (cbb61c9a2f72c282877eb383e111f7cd) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: There has been an error in the channel.
> at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.addInputChannel(PartitionRequestClientHandler.java:77)
> at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClient.requestSubpartition(PartitionRequestClient.java:104)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:115)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:419)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:441)
> at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:153)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:192)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:270)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:666)
> at java.lang.Thread.run(Thread.java:745)
> 2017-01-18 14:56:27,819 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job 
> Misbehaved Job (b1d985d11984df57400fdff2bb656c59) switched from state RUNNING 
> to FAILING.
> java.lang.IllegalStateException: There has been an error in the channel.
> at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.addInputChannel(PartitionRequestClientHandler.java:77)
> at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClient.requestSubpartition(PartitionRequestClient.java:104)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:115)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:419)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:441)
> at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:153)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:192)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:270)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:666)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> This is the first exception that is reported to the jobmanager.
> I think this is related to missing network buffers. You see that from the 
> next deployment after the restart, where the deployment fails with the 
> insufficient number of buffers exception.
> I'll add logs to the JIRA.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-5762) Protect initializeState() and open() by the same lock.

2017-02-14 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-5762.
---

> Protect initializeState() and open() by the same lock.
> --
>
> Key: FLINK-5762
> URL: https://issues.apache.org/jira/browse/FLINK-5762
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> Currently the initializeState() of all operators in a task is called without 
> the checkpoint lock, and before the open(). This may lead to problematic 
> situations such as the following:
> In the case that we retrieve timers from a checkpoint, e.g. WindowOperator 
> and (future) CEP, if we re-register them in the initializeState(), then if 
> they fire before the open() of the downstream operators is called, we will 
> have a task failure, as the downstream channels are not open.
> To avoid this, we can put the initializeState() in the same lock as the 
> open(), and the two operations will happen while being protected by the same 
> lock, which also keeps timers from firing.
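A minimal, self-contained sketch of the locking idea described above (names are illustrative, not Flink's actual StreamTask code):

{code}
import java.util.List;

class TaskStartupSketch {

    interface Operator {
        void initializeState(); // may re-register timers from a restored checkpoint
        void open();            // connects the operator to the now-open downstream channels
    }

    void initializeAndOpen(Object checkpointLock, List<Operator> operators) {
        // Both phases run while holding the checkpoint lock. A timer service that
        // also synchronizes on this lock cannot fire a restored timer in the
        // window between initializeState() and open().
        synchronized (checkpointLock) {
            for (Operator op : operators) {
                op.initializeState();
            }
            for (Operator op : operators) {
                op.open();
            }
        }
    }
}
{code}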



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (FLINK-5762) Protect initializeState() and open() by the same lock.

2017-02-14 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5762?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-5762.
-
Resolution: Fixed

Fixed via a91b6ff05d8af870ad076f9bf0fc17886787bc46

> Protect initializeState() and open() by the same lock.
> --
>
> Key: FLINK-5762
> URL: https://issues.apache.org/jira/browse/FLINK-5762
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> Currently the initializeState() of all operators in a task is called without 
> the checkpoint lock, and before the open(). This may lead to problematic 
> situations such as the following:
> In the case that we retrieve timers from a checkpoint, e.g. WindowOperator 
> and (future) CEP, if we re-register them in the initializeState(), then if 
> they fire before the open() of the downstream operators is called, we will 
> have a task failure, as the downstream channels are not open.
> To avoid this, we can put the initializeState() in the same lock as the 
> open(), and the two operations will happen while being protected by the same 
> lock, which also keeps timers from firing.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5796) broken links in the docs

2017-02-14 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865894#comment-15865894
 ] 

Robert Metzger commented on FLINK-5796:
---

Sorry, I don't have time for this right now. 

I think we can integrate this check into the build docs script. This way, we 
don't need to work on buildbot. (Ideally this would be put into a separate script 
that is called from both the build docs script and buildbot; I can do the 
call in buildbot.)

> broken links in the docs
> 
>
> Key: FLINK-5796
> URL: https://issues.apache.org/jira/browse/FLINK-5796
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.3.0
>Reporter: Nico Kruber
>
> running a link checker on the locally-served flink docs yields the following 
> broken links (same for online docs):
> {code}
> 15:21:55  Error:  "Not Found " (404) at link 0.0.0.0:4000/dev/state (from 
> 0.0.0.0:4000/dev/datastream_api.html)
> 15:21:55  Error:  "Not Found " (404) at link 
> 0.0.0.0:4000/monitoring/best_practices.html (from 0.0.0.0:4000/dev/batch/)
> 15:21:55  Error:  "Not Found " (404) at link 
> 0.0.0.0:4000/api/java/org/apache/flink/table/api/Table.html (from 
> 0.0.0.0:4000/dev/table_api.html)
> 15:21:55  Error:  "Not Found " (404) at link 0.0.0.0:4000/dev/state.html 
> (from 0.0.0.0:4000/ops/state_backends.html)
> 15:21:55  Error:  "Not Found " (404) at link 
> 0.0.0.0:4000/internals/state_backends.html (from 
> 0.0.0.0:4000/internals/stream_checkpointing.html)
> {code}
> FYI: command to replay:
> {{httrack http://0.0.0.0:4000/  -O "$PWD/flink-docs" --testlinks  -%v 
> --depth= --ext-depth=0}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3308: [FLINK-5796] fix some broken links in the docs

2017-02-14 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/3308

[FLINK-5796] fix some broken links in the docs

this probably also applies to the release-1.2 docs

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-5796

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3308.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3308


commit 39410e8e1819f142480daf0ae0d9f8acbd7dcf9d
Author: Nico Kruber 
Date:   2017-02-14T14:56:11Z

[FLINK-5796] fix some broken links in the docs




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3297: [FLINK-5431] Add configurable timePattern for clie...

2017-02-14 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3297#discussion_r101052582
  
--- Diff: docs/setup/config.md ---
@@ -151,6 +151,8 @@ one of the values specified in 
`security.kerberos.login.contexts`.
 
 ### Other
 
+- `client.logging.time-pattern`: (DEFAULT: `yyyy-MM-dd'T'HH:mm:ss`) The 
date time pattern applied to client akctor logger.
--- End diff --

`acktor` -> `actor`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5431) time format for akka status

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865897#comment-15865897
 ] 

ASF GitHub Bot commented on FLINK-5431:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3297#discussion_r101052582
  
--- Diff: docs/setup/config.md ---
@@ -151,6 +151,8 @@ one of the values specified in 
`security.kerberos.login.contexts`.
 
 ### Other
 
+- `client.logging.time-pattern`: (DEFAULT: `yyyy-MM-dd'T'HH:mm:ss`) The 
date time pattern applied to client akctor logger.
--- End diff --

`acktor` -> `actor`


> time format for akka status
> ---
>
> Key: FLINK-5431
> URL: https://issues.apache.org/jira/browse/FLINK-5431
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: Alexey Diomin
>Assignee: Anton Solovev
>Priority: Minor
>
> In ExecutionGraphMessages we have code
> {code}
> private val DATE_FORMATTER: SimpleDateFormat = new 
> SimpleDateFormat("MM/dd/yyyy HH:mm:ss")
> {code}
> But sometimes it causes confusion when the main logger is configured with 
> "dd/MM/yyyy".
> We should make this format configurable, or maybe keep only "HH:mm:ss", to 
> prevent misunderstanding of the output date-time.
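A hedged sketch of the configurable variant discussed in the review above, using the `client.logging.time-pattern` key and default from the PR's documentation snippet (how the client obtains its Flink Configuration is assumed):

{code}
Configuration config = GlobalConfiguration.loadConfiguration();
String pattern = config.getString("client.logging.time-pattern", "yyyy-MM-dd'T'HH:mm:ss");
SimpleDateFormat dateFormatter = new SimpleDateFormat(pattern);
{code}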



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5796) broken links in the docs

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865900#comment-15865900
 ] 

ASF GitHub Bot commented on FLINK-5796:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/3308

[FLINK-5796] fix some broken links in the docs

this probably also applies to the release-1.2 docs

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-5796

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3308.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3308


commit 39410e8e1819f142480daf0ae0d9f8acbd7dcf9d
Author: Nico Kruber 
Date:   2017-02-14T14:56:11Z

[FLINK-5796] fix some broken links in the docs




> broken links in the docs
> 
>
> Key: FLINK-5796
> URL: https://issues.apache.org/jira/browse/FLINK-5796
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.3.0
>Reporter: Nico Kruber
>
> running a link checker on the locally-served flink docs yields the following 
> broken links (same for online docs):
> {code}
> 15:21:55  Error:  "Not Found " (404) at link 0.0.0.0:4000/dev/state (from 
> 0.0.0.0:4000/dev/datastream_api.html)
> 15:21:55  Error:  "Not Found " (404) at link 
> 0.0.0.0:4000/monitoring/best_practices.html (from 0.0.0.0:4000/dev/batch/)
> 15:21:55  Error:  "Not Found " (404) at link 
> 0.0.0.0:4000/api/java/org/apache/flink/table/api/Table.html (from 
> 0.0.0.0:4000/dev/table_api.html)
> 15:21:55  Error:  "Not Found " (404) at link 0.0.0.0:4000/dev/state.html 
> (from 0.0.0.0:4000/ops/state_backends.html)
> 15:21:55  Error:  "Not Found " (404) at link 
> 0.0.0.0:4000/internals/state_backends.html (from 
> 0.0.0.0:4000/internals/stream_checkpointing.html)
> {code}
> FYI: command to replay:
> {{httrack http://0.0.0.0:4000/  -O "$PWD/flink-docs" --testlinks  -%v 
> --depth= --ext-depth=0}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5796) broken links in the docs

2017-02-14 Thread Nico Kruber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865902#comment-15865902
 ] 

Nico Kruber commented on FLINK-5796:


{{/api/java/org/apache/flink/table/api/Table.html}} is actually available on 
the server (javadocs are served separately)

> broken links in the docs
> 
>
> Key: FLINK-5796
> URL: https://issues.apache.org/jira/browse/FLINK-5796
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.3.0
>Reporter: Nico Kruber
>
> running a link checker on the locally-served flink docs yields the following 
> broken links (same for online docs):
> {code}
> 15:21:55  Error:  "Not Found " (404) at link 0.0.0.0:4000/dev/state (from 
> 0.0.0.0:4000/dev/datastream_api.html)
> 15:21:55  Error:  "Not Found " (404) at link 
> 0.0.0.0:4000/monitoring/best_practices.html (from 0.0.0.0:4000/dev/batch/)
> 15:21:55  Error:  "Not Found " (404) at link 
> 0.0.0.0:4000/api/java/org/apache/flink/table/api/Table.html (from 
> 0.0.0.0:4000/dev/table_api.html)
> 15:21:55  Error:  "Not Found " (404) at link 0.0.0.0:4000/dev/state.html 
> (from 0.0.0.0:4000/ops/state_backends.html)
> 15:21:55  Error:  "Not Found " (404) at link 
> 0.0.0.0:4000/internals/state_backends.html (from 
> 0.0.0.0:4000/internals/stream_checkpointing.html)
> {code}
> FYI: command to replay:
> {{httrack http://0.0.0.0:4000/  -O "$PWD/flink-docs" --testlinks  -%v 
> --depth= --ext-depth=0}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5778) Split FileStateHandle into fileName and basePath

2017-02-14 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15866466#comment-15866466
 ] 

Stephan Ewen commented on FLINK-5778:
-

I am not sure about this design. State handle immutability is something I would 
really like to preserve.

How about making all FileStateHandles consist of a base path and a file path?
  - When serializing, the SavepointSerializer stores only the file path
  - When deserializing, the new state handles are constructed from the root 
path of the checkpoint, and the name in the savepoint
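A minimal sketch of that immutable variant (class and method names assumed, not taken from the actual Flink code):

{code}
import java.io.Serializable;

public final class RelativeFileStateHandle implements Serializable {

    private final String basePath;   // e.g. the checkpoint / savepoint directory
    private final String fileName;   // relative name, the only part written into the savepoint

    public RelativeFileStateHandle(String basePath, String fileName) {
        this.basePath = basePath;
        this.fileName = fileName;
    }

    public String getFilePath() {
        return basePath + "/" + fileName;
    }
}

// Serializer side (sketch): write only fileName; on deserialization construct a
// fresh, still immutable handle from the restored checkpoint's root path:
//     new RelativeFileStateHandle(restoredBasePath, readFileName)
{code}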



> Split FileStateHandle into fileName and basePath
> 
>
> Key: FLINK-5778
> URL: https://issues.apache.org/jira/browse/FLINK-5778
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> Store the statePath as a basePath and a fileName and allow overwriting the 
> basePath. We cannot overwrite the base path as long as the state handle is 
> still in flight and not persisted. Otherwise we risk a resource leak.
> We need this in order to be able to relocate savepoints.
> {code}
> interface RelativeBaseLocationStreamStateHandle {
>void clearBaseLocation();
>void setBaseLocation(String baseLocation);
> }
> {code}
> FileStateHandle should implement this and the SavepointSerializer should 
> forward the calls when a savepoint is stored or loaded, clear before store 
> and set after load.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3311: Let RpcService.scheduleRunnable return a Scheduled...

2017-02-14 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/3311

Let RpcService.scheduleRunnable return a ScheduledFuture

This PR is based #3310.

The returned ScheduledFuture instance allows to cancel a scheduled runnable 
and obtain
the delay until the runnable will be executed. Furthermore, it allows to 
wait on the
completion of the runnable.
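A hedged usage sketch of the new return type (the scheduleRunnable signature is assumed to be (Runnable, long, TimeUnit); `rpcService` and the scheduled work are placeholders):

{code}
ScheduledFuture<?> future =
    rpcService.scheduleRunnable(() -> System.out.println("tick"), 30L, TimeUnit.SECONDS);

long remainingMillis = future.getDelay(TimeUnit.MILLISECONDS); // delay until execution
future.cancel(false);                                          // the runnable can be cancelled
// future.get();                                               // or we can wait for its completion
{code}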

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
generalizeRpcServiceScheduleMethod

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3311.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3311


commit 639edf235ffee924e0fba56313cb51f825e3a46d
Author: Till Rohrmann 
Date:   2017-02-14T15:50:43Z

[FLINK-5798] [rpc] Let the RpcService provide a ScheduledExecutorService

This PR adds the getScheduledExecutorService method to the RpcService 
interface. So
henceforth all RpcService implementations have to provide a 
ScheduledExecutorService
implementation.

Currently, we only support the AkkaRpcService. The AkkaRpcService returns a
ScheduledExecutorService proxy which forwards the schedule calls to the 
ActorSystem's
internal scheduler.

commit 4c58378372867c43ce4d65fa51d609cbb4a95587
Author: Till Rohrmann 
Date:   2017-02-14T19:40:30Z

[FLINK-5799] [rpc] Let RpcService.scheduleRunnable return a ScheduledFuture

The returned ScheduledFuture instance allows to cancel a scheduled runnable 
and obtain
the delay until the runnable will be executed. Furthermore, it allows to 
wait on the
completion of the runnable.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-5798) Let the RPCService provide a ScheduledExecutorService

2017-02-14 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-5798:


 Summary: Let the RPCService provide a ScheduledExecutorService
 Key: FLINK-5798
 URL: https://issues.apache.org/jira/browse/FLINK-5798
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination
Affects Versions: 1.3.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Priority: Minor
 Fix For: 1.3.0


Currently the {{RPCService}} interface provides a {{scheduleRunnable}} method 
to schedule {{Runnables}}. I would like to generalize this functionality by 
letting the {{RPCService}} provide a {{ScheduledExecutorService}} to the user. 
That way other components which require such an executor service could simply 
use the one provided by the {{RPCService}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3310: [FLINK-5798] [rpc] Let the RpcService provide a Sc...

2017-02-14 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/3310

[FLINK-5798] [rpc] Let the RpcService provide a ScheduledExecutorService

This PR adds the getScheduledExecutorService method to the RpcService 
interface. So
henceforth all RpcService implementations have to provide a 
ScheduledExecutorService
implementation.

Currently, we only support the AkkaRpcService. The AkkaRpcService returns a
ScheduledExecutorService proxy which forwards the schedule calls to the 
ActorSystem's
internal scheduler.
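A hedged usage sketch of the new method (the method name is taken from the PR description above; `rpcService` and `heartbeatManager` are placeholders):

{code}
ScheduledExecutorService scheduler = rpcService.getScheduledExecutorService();

// Any component holding the RpcService can schedule periodic work without
// owning its own thread pool, e.g. a heartbeat:
ScheduledFuture<?> heartbeats = scheduler.scheduleAtFixedRate(
    () -> heartbeatManager.requestHeartbeats(), 0L, 10L, TimeUnit.SECONDS);
{code}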


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
rpcAddScheduledExecutorService

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3310.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3310


commit 639edf235ffee924e0fba56313cb51f825e3a46d
Author: Till Rohrmann 
Date:   2017-02-14T15:50:43Z

[FLINK-5798] [rpc] Let the RpcService provide a ScheduledExecutorService

This PR adds the getScheduledExecutorService method to the RpcService 
interface. So
henceforth all RpcService implementations have to provide a 
ScheduledExecutorService
implementation.

Currently, we only support the AkkaRpcService. The AkkaRpcService returns a
ScheduledExecutorService proxy which forwards the schedule calls to the 
ActorSystem's
internal scheduler.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5798) Let the RPCService provide a ScheduledExecutorService

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15866450#comment-15866450
 ] 

ASF GitHub Bot commented on FLINK-5798:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/3310

[FLINK-5798] [rpc] Let the RpcService provide a ScheduledExecutorService

This PR adds the getScheduledExecutorService method to the RpcService 
interface. So
henceforth all RpcService implementations have to provide a 
ScheduledExecutorService
implementation.

Currently, we only support the AkkaRpcService. The AkkaRpcService returns a
ScheduledExecutorService proxy which forwards the schedule calls to the 
ActorSystem's
internal scheduler.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
rpcAddScheduledExecutorService

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3310.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3310


commit 639edf235ffee924e0fba56313cb51f825e3a46d
Author: Till Rohrmann 
Date:   2017-02-14T15:50:43Z

[FLINK-5798] [rpc] Let the RpcService provide a ScheduledExecutorService

This PR adds the getScheduledExecutorService method to the RpcService 
interface. So
henceforth all RpcService implementations have to provide a 
ScheduledExecutorService
implementation.

Currently, we only support the AkkaRpcService. The AkkaRpcService returns a
ScheduledExecutorService proxy which forwards the schedule calls to the 
ActorSystem's
internal scheduler.




> Let the RPCService provide a ScheduledExecutorService
> -
>
> Key: FLINK-5798
> URL: https://issues.apache.org/jira/browse/FLINK-5798
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
> Fix For: 1.3.0
>
>
> Currently the {{RPCService}} interface provides a {{scheduleRunnable}} method 
> to schedule {{Runnables}}. I would like to generalize this functionality by 
> letting the {{RPCService}} provide a {{ScheduledExecutorService}} to the 
> user. That way other components which require such an executor service could 
> simply use the one provided by the {{RPCService}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5253) Remove special treatment of "dynamic properties"

2017-02-14 Thread Mariusz Wojakowski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15866340#comment-15866340
 ] 

Mariusz Wojakowski commented on FLINK-5253:
---

I have 2 questions:
* should we store these dynamic properties in _flink-conf.yaml_ and completely 
remove that information from the cluster descriptor? (Correct me if I'm wrong)
* do we want to completely remove that special handling? I’m thinking about the 
existing implementation that sets up Flink on YARN (’yarn-sessions’).

> Remove special treatment of "dynamic properties"
> 
>
> Key: FLINK-5253
> URL: https://issues.apache.org/jira/browse/FLINK-5253
> Project: Flink
>  Issue Type: Sub-task
>  Components: YARN
> Environment: {{flip-6}} feature branch
>Reporter: Stephan Ewen
>  Labels: flip-6
>
> The YARN client accepts configuration keys as command line parameters.
> Currently these are sent to the AppMaster and TaskManager as "dynamic 
> properties", encoded in a special way via environment variables.
> The mechanism is quite fragile. We should simplify it:
>   - The YARN client takes the local {{flink-conf.yaml}} as the base.
>   - It overwrites config entries with command line properties when preparing 
> the configuration to be shipped to YARN container processes (JM / TM)
>   - No additional handling necessary
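A minimal sketch of that simplified flow, assuming Flink's configuration API (`flinkConfDir` and `commandLineProperties` are placeholders):

{code}
Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir); // local flink-conf.yaml as the base

// Overwrite entries with the -D style command line properties before the
// configuration is shipped to the YARN container processes (JM / TM):
for (Map.Entry<String, String> dynamicProperty : commandLineProperties.entrySet()) {
    config.setString(dynamicProperty.getKey(), dynamicProperty.getValue());
}
{code}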



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3300: [FLINK-5690][docs] Add note on shading to best pra...

2017-02-14 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3300#discussion_r100977226
  
--- Diff: docs/dev/best_practices.md ---
@@ -310,3 +310,17 @@ Next, you need to put the following jar files into the 
`lib/` folder:
 Note that you need to explicitly set the `lib/` directory when using a per 
job YARN cluster.
 
 The command to submit Flink on YARN with a custom logger is: `./bin/flink 
run -yt $FLINK_HOME/lib <... remaining arguments ...>`
+
+
+## Resolving Dependency Conflicts with Flink using the maven-shade-plugin.
--- End diff --

I think this is better for the debugging section


https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/debugging_classloading.html


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5690) protobuf is not shaded properly

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865327#comment-15865327
 ] 

ASF GitHub Bot commented on FLINK-5690:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3300#discussion_r100977226
  
--- Diff: docs/dev/best_practices.md ---
@@ -310,3 +310,17 @@ Next, you need to put the following jar files into the 
`lib/` folder:
 Note that you need to explicitly set the `lib/` directory when using a per 
job YARN cluster.
 
 The command to submit Flink on YARN with a custom logger is: `./bin/flink 
run -yt $FLINK_HOME/lib <... remaining arguments ...>`
+
+
+## Resolving Dependency Conflicts with Flink using the maven-shade-plugin.
--- End diff --

I think this is better for the debugging section


https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/debugging_classloading.html


> protobuf is not shaded properly
> ---
>
> Key: FLINK-5690
> URL: https://issues.apache.org/jira/browse/FLINK-5690
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.1.4, 1.3.0
>Reporter: Andrey
>Assignee: Robert Metzger
>
> Currently distributive contains com/google/protobuf package. Without proper 
> shading client code could fail with:
> {code}
> Caused by: java.lang.IllegalAccessError: tried to access method 
> com.google.protobuf.
> {code}
> Steps to reproduce:
> * create job class "com.google.protobuf.TestClass"
> * call com.google.protobuf.TextFormat.escapeText(String) method from this 
> class
> * deploy the job to the Flink cluster (using the web console, for example)
> * run the job; the IllegalAccessError shows up in the logs
> The issue is caused by a package-private method being called across different 
> classloaders: TestClass is loaded by FlinkUserCodeClassLoader, while the 
> TextFormat class is loaded by sun.misc.Launcher$AppClassLoader.
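
For illustration, a minimal reproduction sketch along the lines of the steps above.
The class name and the escapeText(String) call are taken from the description;
whether this compiles against a given protobuf version is an assumption.

{code}
// Deployed as user code: deliberately placed in the protobuf package.
package com.google.protobuf;

public class TestClass {

	public static String run(String input) {
		// escapeText(String) is package-private in TextFormat. This compiles because
		// the package names match, but at runtime TestClass (FlinkUserCodeClassLoader)
		// and TextFormat (sun.misc.Launcher$AppClassLoader) end up in different
		// runtime packages, so the JVM throws IllegalAccessError.
		return TextFormat.escapeText(input);
	}
}
{code}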



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3298: [FLINK-5672] add special cases for a local setup i...

2017-02-14 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3298#discussion_r100978413
  
--- Diff: flink-dist/src/main/flink-bin/bin/stop-cluster.sh ---
@@ -25,14 +25,30 @@ bin=`cd "$bin"; pwd`
 # Stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) 
when available
 readSlaves
 
-command -v pdsh >/dev/null 2>&1
-if [[ $? -ne 0 ]]; then
+# all-local setup?
+all_localhost=1
+for slave in ${SLAVES[@]}; do
+if [[ "$slave" != "localhost" ]]; then
+all_localhost=0
+break
+fi
+done
+
+if [[ ${all_localhost} -eq 1 ]]; then
+# all-local setup
 for slave in ${SLAVES[@]}; do
-ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l 
\"${FLINK_BIN_DIR}/taskmanager.sh\" stop &"
+"${FLINK_BIN_DIR}"/taskmanager.sh stop
 done
 else
-PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, 
; echo "${SLAVES[*]}") \
-"nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" stop"
+command -v pdsh >/dev/null 2>&1
+if [[ $? -ne 0 ]]; then
+for slave in ${SLAVES[@]}; do
+ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l 
\"${FLINK_BIN_DIR}/taskmanager.sh\" stop &"
+done
+else
+PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w 
$(IFS=, ; echo "${SLAVES[*]}") \
+"nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" stop"
--- End diff --

Correct me if I'm wrong, but maybe this part of the code only differs in 
the "start" or "stop" string.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5672) Job fails with java.lang.IllegalArgumentException: port out of range:-1

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865340#comment-15865340
 ] 

ASF GitHub Bot commented on FLINK-5672:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3298#discussion_r100978305
  
--- Diff: flink-dist/src/main/flink-bin/bin/stop-cluster.sh ---
@@ -25,14 +25,30 @@ bin=`cd "$bin"; pwd`
 # Stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) 
when available
 readSlaves
 
-command -v pdsh >/dev/null 2>&1
-if [[ $? -ne 0 ]]; then
+# all-local setup?
+all_localhost=1
+for slave in ${SLAVES[@]}; do
+if [[ "$slave" != "localhost" ]]; then
+all_localhost=0
+break
+fi
+done
--- End diff --

I think we should put the all-localhost check into a function (maybe in 
config.sh) to avoid duplicating the code.


> Job fails with java.lang.IllegalArgumentException: port out of range:-1
> ---
>
> Key: FLINK-5672
> URL: https://issues.apache.org/jira/browse/FLINK-5672
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: Timo Walther
>
> I started the JobManager with {{start-local.sh}} and started another 
> TaskManager with {{taskmanager.sh start}}. My job is a Table API job with a 
> {{orderBy}} (range partitioning with parallelism 2).
> The job fails with the following exception:
> {code}
> java.lang.IllegalArgumentException: port out of range:-1
>   at java.net.InetSocketAddress.checkPort(InetSocketAddress.java:143)
>   at java.net.InetSocketAddress.<init>(InetSocketAddress.java:188)
>   at 
> org.apache.flink.runtime.io.network.ConnectionID.<init>(ConnectionID.java:47)
>   at 
> org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor.fromEdges(InputChannelDeploymentDescriptor.java:124)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.createDeploymentDescriptor(ExecutionVertex.java:627)
>   at 
> org.apache.flink.runtime.executiongraph.Execution.deployToSlot(Execution.java:358)
>   at 
> org.apache.flink.runtime.executiongraph.Execution$1.apply(Execution.java:284)
>   at 
> org.apache.flink.runtime.executiongraph.Execution$1.apply(Execution.java:279)
>   at 
> org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:259)
>   at akka.dispatch.OnComplete.internal(Future.scala:248)
>   at akka.dispatch.OnComplete.internal(Future.scala:245)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>   at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutor.execute(Executors.java:56)
>   at 
> scala.concurrent.impl.ExecutionContextImpl.execute(ExecutionContextImpl.scala:122)
>   at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>   at 
> scala.concurrent.impl.Promise$KeptPromise.onComplete(Promise.scala:333)
>   at 
> org.apache.flink.runtime.concurrent.impl.FlinkFuture.handleAsync(FlinkFuture.java:256)
>   at 
> org.apache.flink.runtime.concurrent.impl.FlinkFuture.handle(FlinkFuture.java:270)
>   at 
> org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:279)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:479)
>   at 
> org.apache.flink.runtime.executiongraph.Execution$5.call(Execution.java:525)
>   at 
> org.apache.flink.runtime.executiongraph.Execution$5.call(Execution.java:521)
>   at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:95)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3298: [FLINK-5672] add special cases for a local setup i...

2017-02-14 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3298#discussion_r100978305
  
--- Diff: flink-dist/src/main/flink-bin/bin/stop-cluster.sh ---
@@ -25,14 +25,30 @@ bin=`cd "$bin"; pwd`
 # Stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) 
when available
 readSlaves
 
-command -v pdsh >/dev/null 2>&1
-if [[ $? -ne 0 ]]; then
+# all-local setup?
+all_localhost=1
+for slave in ${SLAVES[@]}; do
+if [[ "$slave" != "localhost" ]]; then
+all_localhost=0
+break
+fi
+done
--- End diff --

I think we should put the all-localhost check into a function (maybe in 
config.sh) to avoid duplicating the code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5672) Job fails with java.lang.IllegalArgumentException: port out of range:-1

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865342#comment-15865342
 ] 

ASF GitHub Bot commented on FLINK-5672:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/3298#discussion_r100978413
  
--- Diff: flink-dist/src/main/flink-bin/bin/stop-cluster.sh ---
@@ -25,14 +25,30 @@ bin=`cd "$bin"; pwd`
 # Stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) 
when available
 readSlaves
 
-command -v pdsh >/dev/null 2>&1
-if [[ $? -ne 0 ]]; then
+# all-local setup?
+all_localhost=1
+for slave in ${SLAVES[@]}; do
+if [[ "$slave" != "localhost" ]]; then
+all_localhost=0
+break
+fi
+done
+
+if [[ ${all_localhost} -eq 1 ]]; then
+# all-local setup
 for slave in ${SLAVES[@]}; do
-ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l 
\"${FLINK_BIN_DIR}/taskmanager.sh\" stop &"
+"${FLINK_BIN_DIR}"/taskmanager.sh stop
 done
 else
-PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, 
; echo "${SLAVES[*]}") \
-"nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" stop"
+command -v pdsh >/dev/null 2>&1
+if [[ $? -ne 0 ]]; then
+for slave in ${SLAVES[@]}; do
+ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l 
\"${FLINK_BIN_DIR}/taskmanager.sh\" stop &"
+done
+else
+PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w 
$(IFS=, ; echo "${SLAVES[*]}") \
+"nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" stop"
--- End diff --

Correct me if I'm wrong, but maybe this part of the code only differs in 
the "start" or "stop" string.


> Job fails with java.lang.IllegalArgumentException: port out of range:-1
> ---
>
> Key: FLINK-5672
> URL: https://issues.apache.org/jira/browse/FLINK-5672
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: Timo Walther
>
> I started the JobManager with {{start-local.sh}} and started another 
> TaskManager with {{taskmanager.sh start}}. My job is a Table API job with a 
> {{orderBy}} (range partitioning with parallelism 2).
> The job fails with the following exception:
> {code}
> java.lang.IllegalArgumentException: port out of range:-1
>   at java.net.InetSocketAddress.checkPort(InetSocketAddress.java:143)
>   at java.net.InetSocketAddress.<init>(InetSocketAddress.java:188)
>   at 
> org.apache.flink.runtime.io.network.ConnectionID.<init>(ConnectionID.java:47)
>   at 
> org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor.fromEdges(InputChannelDeploymentDescriptor.java:124)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.createDeploymentDescriptor(ExecutionVertex.java:627)
>   at 
> org.apache.flink.runtime.executiongraph.Execution.deployToSlot(Execution.java:358)
>   at 
> org.apache.flink.runtime.executiongraph.Execution$1.apply(Execution.java:284)
>   at 
> org.apache.flink.runtime.executiongraph.Execution$1.apply(Execution.java:279)
>   at 
> org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:259)
>   at akka.dispatch.OnComplete.internal(Future.scala:248)
>   at akka.dispatch.OnComplete.internal(Future.scala:245)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>   at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutor.execute(Executors.java:56)
>   at 
> scala.concurrent.impl.ExecutionContextImpl.execute(ExecutionContextImpl.scala:122)
>   at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>   at 
> scala.concurrent.impl.Promise$KeptPromise.onComplete(Promise.scala:333)
>   at 
> org.apache.flink.runtime.concurrent.impl.FlinkFuture.handleAsync(FlinkFuture.java:256)
>   at 
> org.apache.flink.runtime.concurrent.impl.FlinkFuture.handle(FlinkFuture.java:270)
>   at 
> org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:279)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:479)
>   at 
> org.apache.flink.runtime.executiongraph.Execution$5.call(Execution.java:525)
>   at 
> org.apache.flink.runtime.executiongraph.Execution$5.call(Execution.java:521)
>   at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:95)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 

[jira] [Commented] (FLINK-5788) Document assumptions about File Systems and persistence

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865606#comment-15865606
 ] 

ASF GitHub Bot commented on FLINK-5788:
---

Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/3301#discussion_r101012059
  
--- Diff: docs/internals/filesystems.md ---
@@ -0,0 +1,138 @@
+---
+title: "File Systems"
+nav-parent_id: internals
+nav-pos: 10
+---
+
+
+* Replaced by the TOC
+{:toc}
+
+Flink has its own file system abstraction via the 
`org.apache.flink.core.fs.FileSystem` class.
+This abstraction provides a common set of operations and minimal 
guarantees across various types
+of file system implementations.
+
+The `FileSystem`'s set of available operations is quite limited, in order 
to suport a wide
+range of file systems. For example, appending to or mutating existing 
files is not supported.
+
+File systems are identified by a *file system scheme*, such as `file://`, 
`hdfs://`, etc.
+
+# Implementations
+
+Flink implements the file systems directly, with the following file system 
schemes:
+
+  - `file`, which represents the machines local file system.
+
+Other file system types are accessed by an implementation that bridges to 
the suite of file systems supported by
+[Apache Hadoop](https://hadoop.apache.org/). The following is an 
incomplete list of examples:
+
+  - `hdfs`: Hadoop Distributed File System
+  - `s3`, `s3n`, and `s3a`: Amazon S3 file system
+  - `gcs`: Google Cloud Storage
+  - `maprfs`: The MapR distributed file system
+  - ...
+
+Flink loads Hadoop's file systems transparently if it finds the Hadoop 
File System classes in the class path and finds a valid
+Hadoop configuration. By default, it looks for the Hadoop configuration in 
the class path. Alternatively, one can specify a
+custom location via the configuration entry `fs.hdfs.hadoopconf`.
+
+
+# Persistence Guarantees
+
+These `FileSystem` and its `FsDataOutputStream` instances are used to 
persistently store data, both for results of applications
+and for fault tolerance and recovery. It is therefore crucial that the 
persistence semantics of these streams are well defined.
+
+## Definition of Persistence Guarantees
+
+Data written to an output stream is considered persistent, if two 
requirements are met:
+
+  1. **Visibility Requirement:** It must be guaranteed that all other 
processes, machines,
+ virtual machines, containers, etc. that are able to access the file 
see the data consistently
+ when given the absolute file path. This requirement is similar to the 
*close-to-open*
+ semantics defined by POSIX, but restricted to the file itself (by its 
absolute path).
+
+  2. **Durability Requirement:** The file system's specific 
durability/persistence requirements
+ must be met. These are specific to the particular file system. For 
example the
+ {@link LocalFileSystem} does not provide any durability guarantees 
for crashes of both
+ hardware and operating system, while replicated distributed file 
systems (like HDFS)
+ guarantee typically durability in the presence of up to concurrent 
failure or *n*
+ nodes, where *n* is the replication factor.
+
+Updates to the file's parent directory (such as that the file shows up when
+listing the directory contents) are not required to be complete for the 
data in the file stream
+to be considered persistent. This relaxation is important for file systems 
where updates to
+directory contents are only eventually consistent.
+
+The `FSDataOutputStream` has to guarantee data persistence for the written 
bytes once the call to
+`FSDataOutputStream.close()` returns.
+
+## Examples
+ 
+  - For **fault-tolerant distributed file systems**, data is considered 
persistent once 
+it has been received and acknowledged by the file system, typically by 
having been replicated
+to a quorum of machines (*durability requirement*). In addition the 
absolute file path
+must be visible to all other machines that will potentially access the 
file (*visibility requirement*).
+
+Whether data has hit non-volatile storage on the storage nodes depends 
on the specific
+guarantees of the particular file system.
+
+The metadata updates to the file's parent directory are not required 
to have reached
+a consistent state. It is permissible that some machines see the file 
when listing the parent
+directory's contents while other do not, as long as access to the file 
by its absolute path
+is possible on all nodes.
+
+  - A **local file system** must 

[jira] [Commented] (FLINK-5788) Document assumptions about File Systems and persistence

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865609#comment-15865609
 ] 

ASF GitHub Bot commented on FLINK-5788:
---

Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/3301#discussion_r101011728
  
--- Diff: docs/internals/filesystems.md ---
@@ -0,0 +1,138 @@
+---
+title: "File Systems"
+nav-parent_id: internals
+nav-pos: 10
+---
+
+
+* Replaced by the TOC
+{:toc}
+
+Flink has its own file system abstraction via the 
`org.apache.flink.core.fs.FileSystem` class.
+This abstraction provides a common set of operations and minimal 
guarantees across various types
+of file system implementations.
+
+The `FileSystem`'s set of available operations is quite limited, in order 
to suport a wide
+range of file systems. For example, appending to or mutating existing 
files is not supported.
+
+File systems are identified by a *file system scheme*, such as `file://`, 
`hdfs://`, etc.
+
+# Implementations
+
+Flink implements the file systems directly, with the following file system 
schemes:
+
+  - `file`, which represents the machines local file system.
+
+Other file system types are accessed by an implementation that bridges to 
the suite of file systems supported by
+[Apache Hadoop](https://hadoop.apache.org/). The following is an 
incomplete list of examples:
+
+  - `hdfs`: Hadoop Distributed File System
+  - `s3`, `s3n`, and `s3a`: Amazon S3 file system
+  - `gcs`: Google Cloud Storage
+  - `maprfs`: The MapR distributed file system
+  - ...
+
+Flink loads Hadoop's file systems transparently if it finds the Hadoop 
File System classes in the class path and finds a valid
+Hadoop configuration. By default, it looks for the Hadoop configuration in 
the class path. Alternatively, one can specify a
+custom location via the configuration entry `fs.hdfs.hadoopconf`.
+
+
+# Persistence Guarantees
+
+These `FileSystem` and its `FsDataOutputStream` instances are used to 
persistently store data, both for results of applications
+and for fault tolerance and recovery. It is therefore crucial that the 
persistence semantics of these streams are well defined.
+
+## Definition of Persistence Guarantees
+
+Data written to an output stream is considered persistent, if two 
requirements are met:
+
+  1. **Visibility Requirement:** It must be guaranteed that all other 
processes, machines,
+ virtual machines, containers, etc. that are able to access the file 
see the data consistently
+ when given the absolute file path. This requirement is similar to the 
*close-to-open*
+ semantics defined by POSIX, but restricted to the file itself (by its 
absolute path).
+
+  2. **Durability Requirement:** The file system's specific 
durability/persistence requirements
+ must be met. These are specific to the particular file system. For 
example the
+ {@link LocalFileSystem} does not provide any durability guarantees 
for crashes of both
+ hardware and operating system, while replicated distributed file 
systems (like HDFS)
+ guarantee typically durability in the presence of up to concurrent 
failure or *n*
+ nodes, where *n* is the replication factor.
+
+Updates to the file's parent directory (such as that the file shows up when
+listing the directory contents) are not required to be complete for the 
data in the file stream
+to be considered persistent. This relaxation is important for file systems 
where updates to
+directory contents are only eventually consistent.
+
+The `FSDataOutputStream` has to guarantee data persistence for the written 
bytes once the call to
+`FSDataOutputStream.close()` returns.
+
+## Examples
+ 
+  - For **fault-tolerant distributed file systems**, data is considered 
persistent once 
+it has been received and acknowledged by the file system, typically by 
having been replicated
+to a quorum of machines (*durability requirement*). In addition the 
absolute file path
+must be visible to all other machines that will potentially access the 
file (*visibility requirement*).
+
+Whether data has hit non-volatile storage on the storage nodes depends 
on the specific
+guarantees of the particular file system.
+
+The metadata updates to the file's parent directory are not required 
to have reached
+a consistent state. It is permissible that some machines see the file 
when listing the parent
+directory's contents while other do not, as long as access to the file 
by its absolute path
--- End diff --

while others do not


> Document assumptions about 

[jira] [Commented] (FLINK-5788) Document assumptions about File Systems and persistence

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865607#comment-15865607
 ] 

ASF GitHub Bot commented on FLINK-5788:
---

Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/3301#discussion_r101011563
  
--- Diff: docs/internals/filesystems.md ---
@@ -0,0 +1,138 @@
+---
+title: "File Systems"
+nav-parent_id: internals
+nav-pos: 10
+---
+
+
+* Replaced by the TOC
+{:toc}
+
+Flink has its own file system abstraction via the 
`org.apache.flink.core.fs.FileSystem` class.
+This abstraction provides a common set of operations and minimal 
guarantees across various types
+of file system implementations.
+
+The `FileSystem`'s set of available operations is quite limited, in order 
to suport a wide
+range of file systems. For example, appending to or mutating existing 
files is not supported.
+
+File systems are identified by a *file system scheme*, such as `file://`, 
`hdfs://`, etc.
+
+# Implementations
+
+Flink implements the file systems directly, with the following file system 
schemes:
+
+  - `file`, which represents the machines local file system.
+
+Other file system types are accessed by an implementation that bridges to 
the suite of file systems supported by
+[Apache Hadoop](https://hadoop.apache.org/). The following is an 
incomplete list of examples:
+
+  - `hdfs`: Hadoop Distributed File System
+  - `s3`, `s3n`, and `s3a`: Amazon S3 file system
+  - `gcs`: Google Cloud Storage
+  - `maprfs`: The MapR distributed file system
+  - ...
+
+Flink loads Hadoop's file systems transparently if it finds the Hadoop 
File System classes in the class path and finds a valid
+Hadoop configuration. By default, it looks for the Hadoop configuration in 
the class path. Alternatively, one can specify a
+custom location via the configuration entry `fs.hdfs.hadoopconf`.
+
+
+# Persistence Guarantees
+
+These `FileSystem` and its `FsDataOutputStream` instances are used to 
persistently store data, both for results of applications
+and for fault tolerance and recovery. It is therefore crucial that the 
persistence semantics of these streams are well defined.
+
+## Definition of Persistence Guarantees
+
+Data written to an output stream is considered persistent, if two 
requirements are met:
+
+  1. **Visibility Requirement:** It must be guaranteed that all other 
processes, machines,
+ virtual machines, containers, etc. that are able to access the file 
see the data consistently
+ when given the absolute file path. This requirement is similar to the 
*close-to-open*
+ semantics defined by POSIX, but restricted to the file itself (by its 
absolute path).
+
+  2. **Durability Requirement:** The file system's specific 
durability/persistence requirements
+ must be met. These are specific to the particular file system. For 
example the
+ {@link LocalFileSystem} does not provide any durability guarantees 
for crashes of both
+ hardware and operating system, while replicated distributed file 
systems (like HDFS)
+ guarantee typically durability in the presence of up to concurrent 
failure or *n*
+ nodes, where *n* is the replication factor.
+
+Updates to the file's parent directory (such as that the file shows up when
--- End diff --

(such that the file ...)


> Document assumptions about File Systems and persistence
> ---
>
> Key: FLINK-5788
> URL: https://issues.apache.org/jira/browse/FLINK-5788
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> We should add some description about the assumptions we make for the behavior 
> of {{FileSystem}} implementations to support proper checkpointing and 
> recovery operations.
> This is especially critical for file systems like {{S3}} with a somewhat 
> tricky contract.
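
As an aside, a minimal sketch of what the documented contract means for code that
persists data through the FileSystem abstraction. This is illustrative only and not
part of the PR; the create(Path, boolean overwrite) signature, the example path, and
the surrounding class are assumptions.

{code}
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

final class PersistentWriteSketch {

	// Writes the given bytes to an absolute path, e.g. "hdfs:///checkpoints/chk-42/state".
	// Per the documented contract, the bytes only count as persistent (visible on all
	// nodes and durable per the file system's guarantees) once close() has returned;
	// the parent directory listing may still be only eventually consistent.
	static void write(byte[] stateBytes, String absolutePath) throws Exception {
		Path path = new Path(absolutePath);
		FileSystem fs = path.getFileSystem();
		try (FSDataOutputStream out = fs.create(path, false)) {
			out.write(stateBytes);
		}
	}
}
{code}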



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5788) Document assumptions about File Systems and persistence

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865608#comment-15865608
 ] 

ASF GitHub Bot commented on FLINK-5788:
---

Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/3301#discussion_r101010905
  
--- Diff: docs/internals/filesystems.md ---
@@ -0,0 +1,138 @@
+---
+title: "File Systems"
+nav-parent_id: internals
+nav-pos: 10
+---
+
+
+* Replaced by the TOC
+{:toc}
+
+Flink has its own file system abstraction via the 
`org.apache.flink.core.fs.FileSystem` class.
+This abstraction provides a common set of operations and minimal 
guarantees across various types
+of file system implementations.
+
+The `FileSystem`'s set of available operations is quite limited, in order 
to suport a wide
+range of file systems. For example, appending to or mutating existing 
files is not supported.
+
+File systems are identified by a *file system scheme*, such as `file://`, 
`hdfs://`, etc.
+
+# Implementations
+
+Flink implements the file systems directly, with the following file system 
schemes:
+
+  - `file`, which represents the machines local file system.
--- End diff --

machine's


> Document assumptions about File Systems and persistence
> ---
>
> Key: FLINK-5788
> URL: https://issues.apache.org/jira/browse/FLINK-5788
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> We should add some description about the assumptions we make for the behavior 
> of {{FileSystem}} implementations to support proper checkpointing and 
> recovery operations.
> This is especially critical for file systems like {{S3}} with a somewhat 
> tricky contract.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5788) Document assumptions about File Systems and persistence

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865604#comment-15865604
 ] 

ASF GitHub Bot commented on FLINK-5788:
---

Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/3301#discussion_r101011921
  
--- Diff: docs/internals/filesystems.md ---
@@ -0,0 +1,138 @@
+---
+title: "File Systems"
+nav-parent_id: internals
+nav-pos: 10
+---
+
+
+* Replaced by the TOC
+{:toc}
+
+Flink has its own file system abstraction via the 
`org.apache.flink.core.fs.FileSystem` class.
+This abstraction provides a common set of operations and minimal 
guarantees across various types
+of file system implementations.
+
+The `FileSystem`'s set of available operations is quite limited, in order 
to suport a wide
+range of file systems. For example, appending to or mutating existing 
files is not supported.
+
+File systems are identified by a *file system scheme*, such as `file://`, 
`hdfs://`, etc.
+
+# Implementations
+
+Flink implements the file systems directly, with the following file system 
schemes:
+
+  - `file`, which represents the machines local file system.
+
+Other file system types are accessed by an implementation that bridges to 
the suite of file systems supported by
+[Apache Hadoop](https://hadoop.apache.org/). The following is an 
incomplete list of examples:
+
+  - `hdfs`: Hadoop Distributed File System
+  - `s3`, `s3n`, and `s3a`: Amazon S3 file system
+  - `gcs`: Google Cloud Storage
+  - `maprfs`: The MapR distributed file system
+  - ...
+
+Flink loads Hadoop's file systems transparently if it finds the Hadoop 
File System classes in the class path and finds a valid
+Hadoop configuration. By default, it looks for the Hadoop configuration in 
the class path. Alternatively, one can specify a
+custom location via the configuration entry `fs.hdfs.hadoopconf`.
+
+
+# Persistence Guarantees
+
+These `FileSystem` and its `FsDataOutputStream` instances are used to 
persistently store data, both for results of applications
+and for fault tolerance and recovery. It is therefore crucial that the 
persistence semantics of these streams are well defined.
+
+## Definition of Persistence Guarantees
+
+Data written to an output stream is considered persistent, if two 
requirements are met:
+
+  1. **Visibility Requirement:** It must be guaranteed that all other 
processes, machines,
+ virtual machines, containers, etc. that are able to access the file 
see the data consistently
+ when given the absolute file path. This requirement is similar to the 
*close-to-open*
+ semantics defined by POSIX, but restricted to the file itself (by its 
absolute path).
+
+  2. **Durability Requirement:** The file system's specific 
durability/persistence requirements
+ must be met. These are specific to the particular file system. For 
example the
+ {@link LocalFileSystem} does not provide any durability guarantees 
for crashes of both
+ hardware and operating system, while replicated distributed file 
systems (like HDFS)
+ guarantee typically durability in the presence of up to concurrent 
failure or *n*
+ nodes, where *n* is the replication factor.
+
+Updates to the file's parent directory (such as that the file shows up when
+listing the directory contents) are not required to be complete for the 
data in the file stream
+to be considered persistent. This relaxation is important for file systems 
where updates to
+directory contents are only eventually consistent.
+
+The `FSDataOutputStream` has to guarantee data persistence for the written 
bytes once the call to
+`FSDataOutputStream.close()` returns.
+
+## Examples
+ 
+  - For **fault-tolerant distributed file systems**, data is considered 
persistent once 
+it has been received and acknowledged by the file system, typically by 
having been replicated
+to a quorum of machines (*durability requirement*). In addition the 
absolute file path
+must be visible to all other machines that will potentially access the 
file (*visibility requirement*).
+
+Whether data has hit non-volatile storage on the storage nodes depends 
on the specific
+guarantees of the particular file system.
+
+The metadata updates to the file's parent directory are not required 
to have reached
+a consistent state. It is permissible that some machines see the file 
when listing the parent
+directory's contents while other do not, as long as access to the file 
by its absolute path
+is possible on all nodes.
+
+  - A **local file system** must 

[jira] [Commented] (FLINK-5788) Document assumptions about File Systems and persistence

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865603#comment-15865603
 ] 

ASF GitHub Bot commented on FLINK-5788:
---

Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/3301#discussion_r101010804
  
--- Diff: docs/internals/filesystems.md ---
@@ -0,0 +1,138 @@
+---
+title: "File Systems"
+nav-parent_id: internals
+nav-pos: 10
+---
+
+
+* Replaced by the TOC
+{:toc}
+
+Flink has its own file system abstraction via the 
`org.apache.flink.core.fs.FileSystem` class.
+This abstraction provides a common set of operations and minimal 
guarantees across various types
+of file system implementations.
+
+The `FileSystem`'s set of available operations is quite limited, in order 
to suport a wide
--- End diff --

support


> Document assumptions about File Systems and persistence
> ---
>
> Key: FLINK-5788
> URL: https://issues.apache.org/jira/browse/FLINK-5788
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> We should add some description about the assumptions we make for the behavior 
> of {{FileSystem}} implementations to support proper checkpointing and 
> recovery operations.
> This is especially critical for file systems like {{S3}} with a somewhat 
> tricky contract.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5788) Document assumptions about File Systems and persistence

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865605#comment-15865605
 ] 

ASF GitHub Bot commented on FLINK-5788:
---

Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/3301#discussion_r101011358
  
--- Diff: docs/internals/filesystems.md ---
@@ -0,0 +1,138 @@
+---
+title: "File Systems"
+nav-parent_id: internals
+nav-pos: 10
+---
+
+
+* Replaced by the TOC
+{:toc}
+
+Flink has its own file system abstraction via the 
`org.apache.flink.core.fs.FileSystem` class.
+This abstraction provides a common set of operations and minimal 
guarantees across various types
+of file system implementations.
+
+The `FileSystem`'s set of available operations is quite limited, in order 
to suport a wide
+range of file systems. For example, appending to or mutating existing 
files is not supported.
+
+File systems are identified by a *file system scheme*, such as `file://`, 
`hdfs://`, etc.
+
+# Implementations
+
+Flink implements the file systems directly, with the following file system 
schemes:
+
+  - `file`, which represents the machines local file system.
+
+Other file system types are accessed by an implementation that bridges to 
the suite of file systems supported by
+[Apache Hadoop](https://hadoop.apache.org/). The following is an 
incomplete list of examples:
+
+  - `hdfs`: Hadoop Distributed File System
+  - `s3`, `s3n`, and `s3a`: Amazon S3 file system
+  - `gcs`: Google Cloud Storage
+  - `maprfs`: The MapR distributed file system
+  - ...
+
+Flink loads Hadoop's file systems transparently if it finds the Hadoop 
File System classes in the class path and finds a valid
+Hadoop configuration. By default, it looks for the Hadoop configuration in 
the class path. Alternatively, one can specify a
+custom location via the configuration entry `fs.hdfs.hadoopconf`.
+
+
+# Persistence Guarantees
+
+These `FileSystem` and its `FsDataOutputStream` instances are used to 
persistently store data, both for results of applications
+and for fault tolerance and recovery. It is therefore crucial that the 
persistence semantics of these streams are well defined.
+
+## Definition of Persistence Guarantees
+
+Data written to an output stream is considered persistent, if two 
requirements are met:
+
+  1. **Visibility Requirement:** It must be guaranteed that all other 
processes, machines,
+ virtual machines, containers, etc. that are able to access the file 
see the data consistently
+ when given the absolute file path. This requirement is similar to the 
*close-to-open*
+ semantics defined by POSIX, but restricted to the file itself (by its 
absolute path).
+
+  2. **Durability Requirement:** The file system's specific 
durability/persistence requirements
+ must be met. These are specific to the particular file system. For 
example the
+ {@link LocalFileSystem} does not provide any durability guarantees 
for crashes of both
+ hardware and operating system, while replicated distributed file 
systems (like HDFS)
+ guarantee typically durability in the presence of up to concurrent 
failure or *n*
--- End diff --

 typically guarantee durability in the presence of at most *n* concurrent 
node failures,


> Document assumptions about File Systems and persistence
> ---
>
> Key: FLINK-5788
> URL: https://issues.apache.org/jira/browse/FLINK-5788
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.3.0
>
>
> We should add some description about the assumptions we make for the behavior 
> of {{FileSystem}} implementations to support proper checkpointing and 
> recovery operations.
> This is especially critical for file systems like {{S3}} with a somewhat 
> tricky contract.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3301: [FLINK-5788] [docs] Improve documentation of FileS...

2017-02-14 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/3301#discussion_r101010905
  
--- Diff: docs/internals/filesystems.md ---
@@ -0,0 +1,138 @@
+---
+title: "File Systems"
+nav-parent_id: internals
+nav-pos: 10
+---
+
+
+* Replaced by the TOC
+{:toc}
+
+Flink has its own file system abstraction via the 
`org.apache.flink.core.fs.FileSystem` class.
+This abstraction provides a common set of operations and minimal 
guarantees across various types
+of file system implementations.
+
+The `FileSystem`'s set of available operations is quite limited, in order 
to suport a wide
+range of file systems. For example, appending to or mutating existing 
files is not supported.
+
+File systems are identified by a *file system scheme*, such as `file://`, 
`hdfs://`, etc.
+
+# Implementations
+
+Flink implements the file systems directly, with the following file system 
schemes:
+
+  - `file`, which represents the machines local file system.
--- End diff --

machine's


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5790) Use list types when ListStateDescriptor extends StateDescriptor

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865588#comment-15865588
 ] 

ASF GitHub Bot commented on FLINK-5790:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3305#discussion_r101009073
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/ArrayListTypeInfo.java
 ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ArrayListSerializer;
+
+import java.util.ArrayList;
+
+/**
+ * A {@link TypeInformation} for the list types of the Java API.
+ *
+ * @param <T> The type of the elements in the list.
+ */
+
+
+@Public
+public final class ArrayListTypeInfo<T> extends TypeInformation<ArrayList<T>> {
+
+   private final TypeInformation<T> elementTypeInfo;
+
+   public ArrayListTypeInfo(Class<T> elementTypeClass) {
+   this.elementTypeInfo = TypeExtractor.createTypeInfo(elementTypeClass);
+   }
+
+   public ArrayListTypeInfo(TypeInformation<T> elementTypeInfo) {
+   this.elementTypeInfo = elementTypeInfo;
+   }
+
+   public TypeInformation<T> getElementTypeInfo() {
+   return elementTypeInfo;
+   }
+
+   @Override
+   public boolean isBasicType() {
+   return false;
+   }
+
+   @Override
+   public boolean isTupleType() {
+   return false;
+   }
+
+   @Override
+   public int getArity() {
+   return 0;
+   }
+
+   @Override
+   public int getTotalFields() {
+   return elementTypeInfo.getTotalFields();
+   }
+
+   @SuppressWarnings("unchecked")
+   @Override
+   public Class<ArrayList<T>> getTypeClass() {
+   return (Class<ArrayList<T>>)(new ArrayList<T>().getClass());
--- End diff --

You can simplify this (to avoid creating an `ArrayList` instance) via:
`return (Class<ArrayList<T>>)(Class<?>) ArrayList.class;`


> Use list types when ListStateDescriptor extends StateDescriptor
> ---
>
> Key: FLINK-5790
> URL: https://issues.apache.org/jira/browse/FLINK-5790
> Project: Flink
>  Issue Type: Improvement
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Flink keeps the state serializer in {{StateDescriptor}}, but it's the 
> serializer of list elements  that is put in {{ListStateDescriptor}}. The 
> implementation is a little confusing. Some backends need to construct the 
> state serializer with the element serializer by themselves.
> We should use an {{ArrayListSerializer}}, which is composed of the serializer 
> of the element, in the {{ListStateDescriptor}}. It helps the backend to avoid 
> constructing the state serializer.
> If a backend needs customized serialization of the state (e.g. 
> {{RocksDBStateBackend}}), it still can obtain the element serializer from the 
> {{ArrayListSerializer}}.
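
For illustration, a tiny sketch of the composition described above. The
ArrayListSerializer package and constructor signature are inferred from this PR's
diff and should be treated as assumptions:

{code}
import java.util.ArrayList;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ArrayListSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;

final class ListStateSerializerSketch {

	// The descriptor-level state serializer wraps the element serializer, so a
	// backend no longer has to assemble it itself; backends that need the element
	// serializer (e.g. RocksDB) can still obtain it from the ArrayListSerializer.
	static TypeSerializer<ArrayList<Integer>> intListStateSerializer() {
		return new ArrayListSerializer<>(IntSerializer.INSTANCE);
	}
}
{code}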



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5790) Use list types when ListStateDescriptor extends StateDescriptor

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865587#comment-15865587
 ] 

ASF GitHub Bot commented on FLINK-5790:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3305#discussion_r101008561
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ArrayListSerializer.java
 ---
@@ -108,9 +112,13 @@ public void copy(DataInputView source, DataOutputView 
target) throws IOException
 
@Override
public boolean equals(Object obj) {
+   if (obj.getClass() != getClass())  {
+   System.out.println("other " + obj.getClass().getName() 
+ ", this " + getClass().getName());
--- End diff --

This is probably left over code that should be removed.


> Use list types when ListStateDescriptor extends StateDescriptor
> ---
>
> Key: FLINK-5790
> URL: https://issues.apache.org/jira/browse/FLINK-5790
> Project: Flink
>  Issue Type: Improvement
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Flink keeps the state serializer in {{StateDescriptor}}, but it's the 
> serializer of list elements  that is put in {{ListStateDescriptor}}. The 
> implementation is a little confusing. Some backends need to construct the 
> state serializer with the element serializer by themselves.
> We should use an {{ArrayListSerializer}}, which is composed of the serializer 
> of the element, in the {{ListStateDescriptor}}. It helps the backend to avoid 
> constructing the state serializer.
> If a backend needs customized serialization of the state (e.g. 
> {{RocksDBStateBackend}}), it still can obtain the element serializer from the 
> {{ArrayListSerializer}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3293: [FLINK-5745] set an uncaught exception handler for netty ...

2017-02-14 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/3293
  
@StephanEwen already did when #3290 got in


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5219) Add non-grouped session windows for batch tables

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865641#comment-15865641
 ] 

ASF GitHub Bot commented on FLINK-5219:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3266#discussion_r101016955
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -306,6 +307,85 @@ object AggregateUtil {
   }
 
   /**
+* Create a 
[[org.apache.flink.api.common.functions.MapPartitionFunction]] that aggregation
+* for aggregates.
+* The function returns aggregate values of all aggregate function 
which are
+* organized by the following format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2  count(z) aggOffsetInRow = 5
+*   |  |  
windowEnd(max(rowtime)
+*   |  |   |
+*   v  v   v
+*+++++---+-+
+*|  sum1  | count1 |  sum2  | count2 |windowStart|windowEnd|
+*+++++---+-+
+*   ^ ^
+*   | |
+* sum(y) aggOffsetInRow = 4windowStart(min(rowtime))
+*
+* }}}
+*
+*/
+  def createDataSetWindowAggregationMapPartitionFunction(
+window: LogicalWindow,
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType = null,
+properties: Seq[NamedWindowProperty] = null,
+isPreMapPartition: Boolean = true,
+isInputCombined: Boolean = false): MapPartitionFunction[Row, Row] = {
+
+val aggregates = transformToAggregateFunctions(
+  namedAggregates.map(_.getKey),
+  inputType,
+  0)._2
+
+val intermediateRowArity = 
aggregates.map(_.intermediateDataType.length).sum
+
+window match {
+  case EventTimeSessionGroupWindow(_, _, gap) =>
+if (isPreMapPartition) {
+  val preMapReturnType: RowTypeInfo =
+createAggregateBufferDataType(
+  Array(),
+  aggregates,
+  inputType,
+  Option(Array(BasicTypeInfo.LONG_TYPE_INFO, 
BasicTypeInfo.LONG_TYPE_INFO)))
+
+  new DataSetSessionWindowAggregatePreProcessor(
+aggregates,
+Array(),
+// the addition two fields are used to store window-start and 
window-end attributes
+intermediateRowArity + 2,
+asLong(gap),
+preMapReturnType)
+
+} else {
+  val (startPos, endPos) = 
computeWindowStartEndPropertyPos(properties)
--- End diff --

We do not need this case if we compute the final aggregates with a 
`GroupReduceFunction`.


> Add non-grouped session windows for batch tables
> 
>
> Key: FLINK-5219
> URL: https://issues.apache.org/jira/browse/FLINK-5219
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Add non-grouped session windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5219) Add non-grouped session windows for batch tables

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865636#comment-15865636
 ] 

ASF GitHub Bot commented on FLINK-5219:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3266#discussion_r101016814
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -306,6 +307,85 @@ object AggregateUtil {
   }
 
   /**
+* Create a 
[[org.apache.flink.api.common.functions.MapPartitionFunction]] that aggregation
+* for aggregates.
+* The function returns aggregate values of all aggregate function 
which are
+* organized by the following format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2  count(z) aggOffsetInRow = 5
+*   |  |  
windowEnd(max(rowtime)
+*   |  |   |
+*   v  v   v
+*+++++---+-+
+*|  sum1  | count1 |  sum2  | count2 |windowStart|windowEnd|
+*+++++---+-+
+*   ^ ^
+*   | |
+* sum(y) aggOffsetInRow = 4windowStart(min(rowtime))
+*
+* }}}
+*
+*/
+  def createDataSetWindowAggregationMapPartitionFunction(
+window: LogicalWindow,
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType = null,
+properties: Seq[NamedWindowProperty] = null,
+isPreMapPartition: Boolean = true,
+isInputCombined: Boolean = false): MapPartitionFunction[Row, Row] = {
+
+val aggregates = transformToAggregateFunctions(
+  namedAggregates.map(_.getKey),
+  inputType,
+  0)._2
+
+val intermediateRowArity = 
aggregates.map(_.intermediateDataType.length).sum
+
+window match {
+  case EventTimeSessionGroupWindow(_, _, gap) =>
+if (isPreMapPartition) {
+  val preMapReturnType: RowTypeInfo =
+createAggregateBufferDataType(
+  Array(),
+  aggregates,
+  inputType,
+  Option(Array(BasicTypeInfo.LONG_TYPE_INFO, 
BasicTypeInfo.LONG_TYPE_INFO)))
+
+  new DataSetSessionWindowAggregatePreProcessor(
+aggregates,
+Array(),
+// the addition two fields are used to store window-start and 
window-end attributes
--- End diff --

the addition**al** two fields


> Add non-grouped session windows for batch tables
> 
>
> Key: FLINK-5219
> URL: https://issues.apache.org/jira/browse/FLINK-5219
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Add non-grouped session windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5219) Add non-grouped session windows for batch tables

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865639#comment-15865639
 ] 

ASF GitHub Bot commented on FLINK-5219:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3266#discussion_r101015171
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateProcessor.scala
 ---
@@ -59,7 +61,9 @@ class DataSetSessionWindowAggregateReduceGroupFunction(
 finalRowWindowEndPos: Option[Int],
 gap:Long,
 isInputCombined: Boolean)
-  extends RichGroupReduceFunction[Row, Row] {
+  extends AbstractRichFunction
+  with MapPartitionFunction[Row, Row]
--- End diff --

I like the idea of implementing a joint `MapPartition` and `GroupReduce` 
function. However, I think it is not necessary for the final aggregation. We 
can also call `DataSet.reduceGroup()` which will do the same as 
`DataSet.mapPartition().parallelism(1)`.
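
A small sketch of that equivalence on the Java DataSet API (placeholder types, not
the PR's code): without a preceding groupBy, the GroupReduceFunction sees the whole
DataSet as a single group and therefore runs with parallelism 1.

{code}
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.util.Collector;

final class NonGroupedReduceSketch {

	// Equivalent in effect to input.mapPartition(...).setParallelism(1).
	static DataSet<Long> sumAll(DataSet<Long> input) {
		return input.reduceGroup(new GroupReduceFunction<Long, Long>() {
			@Override
			public void reduce(Iterable<Long> values, Collector<Long> out) {
				long sum = 0L;
				for (long v : values) {
					sum += v;
				}
				out.collect(sum);
			}
		});
	}
}
{code}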


> Add non-grouped session windows for batch tables
> 
>
> Key: FLINK-5219
> URL: https://issues.apache.org/jira/browse/FLINK-5219
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Add non-grouped session windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5219) Add non-grouped session windows for batch tables

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865634#comment-15865634
 ] 

ASF GitHub Bot commented on FLINK-5219:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3266#discussion_r101013750
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateProcessor.scala
 ---
@@ -19,15 +19,17 @@ package org.apache.flink.table.runtime.aggregate
 
 import java.lang.Iterable
 
-import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.common.functions.{AbstractRichFunction, 
GroupReduceFunction,
+MapPartitionFunction}
--- End diff --

do not break the line


> Add non-grouped session windows for batch tables
> 
>
> Key: FLINK-5219
> URL: https://issues.apache.org/jira/browse/FLINK-5219
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Add non-grouped session windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5219) Add non-grouped session windows for batch tables

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865637#comment-15865637
 ] 

ASF GitHub Bot commented on FLINK-5219:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3266#discussion_r101014900
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
 ---
@@ -281,9 +279,36 @@ class DataSetWindowAggregate(
   .returns(rowTypeInfo)
   .name(aggregateOperatorName)
   .asInstanceOf[DataSet[Any]]
+
+  } else {
+// non-grouping window
+val preMapPartitionFunction = 
createDataSetWindowAggregationMapPartitionFunction(
+  window,
+  namedAggregates,
+  inputType)
+
+val mapPartitionFunction = 
createDataSetWindowAggregationMapPartitionFunction(
+  window,
+  namedAggregates,
+  inputType,
+  rowRelDataType,
+  namedProperties,
+  isPreMapPartition = false,
+  isInputCombined = true)
+
+mappedInput.sortPartition(rowTimeFieldPos, Order.ASCENDING)
+  .mapPartition(preMapPartitionFunction)
+  .sortPartition(windowStartPos, Order.ASCENDING).setParallelism(1)
+  .sortPartition(windowEndPos, Order.ASCENDING).setParallelism(1)
+  .mapPartition(mapPartitionFunction).setParallelism(1)
--- End diff --

I think we can also use `.reduceGroup()` and a `GroupReduceFunction` here. 
Without `groupBy`, the `GroupReduceFunction` will be executed with parallelism 
1.


> Add non-grouped session windows for batch tables
> 
>
> Key: FLINK-5219
> URL: https://issues.apache.org/jira/browse/FLINK-5219
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Add non-grouped session windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5219) Add non-grouped session windows for batch tables

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865640#comment-15865640
 ] 

ASF GitHub Bot commented on FLINK-5219:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3266#discussion_r101014879
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
 ---
@@ -294,18 +319,29 @@ class DataSetWindowAggregate(
   namedProperties)
 
 mappedInput.groupBy(groupingKeys: _*)
-.sortGroup(rowTimeFieldPos, Order.ASCENDING)
-.reduceGroup(groupReduceFunction)
-.returns(rowTypeInfo)
-.name(aggregateOperatorName)
-.asInstanceOf[DataSet[Any]]
+  .sortGroup(rowTimeFieldPos, Order.ASCENDING)
+  .reduceGroup(groupReduceFunction)
+  .returns(rowTypeInfo)
+  .name(aggregateOperatorName)
+  .asInstanceOf[DataSet[Any]]
+
+  } else {
+// non-grouping window
+val mapPartitionFunction = 
createDataSetWindowAggregationMapPartitionFunction(
+  window,
+  namedAggregates,
+  inputType,
+  rowRelDataType,
+  namedProperties,
+  isPreMapPartition = false)
+
+mappedInput.sortPartition(rowTimeFieldPos, 
Order.ASCENDING).setParallelism(1)
+  .mapPartition(mapPartitionFunction).setParallelism(1)
--- End diff --

I think we can also use `.reduceGroup()` and a `GroupReduceFunction` here. 
Without `groupBy`, the `GroupReduceFunction` will be executed with parallelism 
1.


> Add non-grouped session windows for batch tables
> 
>
> Key: FLINK-5219
> URL: https://issues.apache.org/jira/browse/FLINK-5219
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Add non-grouped session windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5219) Add non-grouped session windows for batch tables

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865638#comment-15865638
 ] 

ASF GitHub Bot commented on FLINK-5219:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3266#discussion_r101014280
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
 ---
@@ -141,16 +141,18 @@ class DataSetWindowAggregateITCase(configMode: 
TableConfigMode)
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[UnsupportedOperationException])
+  @Test
   def testAlldEventTimeSessionGroupWindow(): Unit = {
-// Non-grouping Session window on event-time are currently not 
supported
 val env = ExecutionEnvironment.getExecutionEnvironment
 val tEnv = TableEnvironment.getTableEnvironment(env, config)
 val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 
'string)
 val windowedTable =table
   .window(Session withGap 7.milli on 'long as 'w)
   .groupBy('w)
-  .select('string.count).toDataSet[Row].collect()
+  .select('string.count)
+val results = windowedTable.toDataSet[Row].collect()
+val expected = "6\n1"
--- End diff --

Would be good to change the query in a way that it checks that multiple 
sessions are correctly computed.
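
For example (just a sketch reusing the API already exercised by this ITCase; timestamps and expected counts are made up), data whose rows fall into two sessions separated by more than the 7 ms gap would cover that:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object MultiSessionWindowSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Rows at t = 1..3 form one session (gaps < 7 ms); the row at t = 20
    // starts a second session, so the query should emit two counts: 3 and 2.
    val data = Seq((1L, 1, "a"), (2L, 2, "a"), (3L, 3, "a"), (20L, 4, "b"), (21L, 5, "b"))
    val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 'string)

    val results = table
      .window(Session withGap 7.milli on 'long as 'w)
      .groupBy('w)
      .select('string.count)
      .toDataSet[Row]
      .collect()

    // expected: one row per session, counting 3 and 2 elements respectively
    println(results.mkString("\n"))
  }
}
```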


> Add non-grouped session windows for batch tables
> 
>
> Key: FLINK-5219
> URL: https://issues.apache.org/jira/browse/FLINK-5219
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Add non-grouped session windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5219) Add non-grouped session windows for batch tables

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865635#comment-15865635
 ] 

ASF GitHub Bot commented on FLINK-5219:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3266#discussion_r101014526
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
 ---
@@ -19,30 +19,36 @@ package org.apache.flink.table.runtime.aggregate
 
 import java.lang.Iterable
 
-import org.apache.flink.api.common.functions.RichGroupCombineFunction
+import org.apache.flink.api.common.functions.{AbstractRichFunction, 
GroupCombineFunction,
+MapPartitionFunction}
--- End diff --

No line break. Imports may exceed the 100 char limit.


> Add non-grouped session windows for batch tables
> 
>
> Key: FLINK-5219
> URL: https://issues.apache.org/jira/browse/FLINK-5219
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Add non-grouped session windows for batch tables as described in 
> [FLIP-11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations].



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3266: [FLINK-5219][TableAPI] Add non-grouped session...

2017-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3266#discussion_r101016955
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -306,6 +307,85 @@ object AggregateUtil {
   }
 
   /**
+* Create a 
[[org.apache.flink.api.common.functions.MapPartitionFunction]] that aggregation
+* for aggregates.
+* The function returns aggregate values of all aggregate function 
which are
+* organized by the following format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2  count(z) aggOffsetInRow = 5
+*   |  |  
windowEnd(max(rowtime)
+*   |  |   |
+*   v  v   v
+*+++++---+-+
+*|  sum1  | count1 |  sum2  | count2 |windowStart|windowEnd|
+*+++++---+-+
+*   ^ ^
+*   | |
+* sum(y) aggOffsetInRow = 4windowStart(min(rowtime))
+*
+* }}}
+*
+*/
+  def createDataSetWindowAggregationMapPartitionFunction(
+window: LogicalWindow,
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType = null,
+properties: Seq[NamedWindowProperty] = null,
+isPreMapPartition: Boolean = true,
+isInputCombined: Boolean = false): MapPartitionFunction[Row, Row] = {
+
+val aggregates = transformToAggregateFunctions(
+  namedAggregates.map(_.getKey),
+  inputType,
+  0)._2
+
+val intermediateRowArity = 
aggregates.map(_.intermediateDataType.length).sum
+
+window match {
+  case EventTimeSessionGroupWindow(_, _, gap) =>
+if (isPreMapPartition) {
+  val preMapReturnType: RowTypeInfo =
+createAggregateBufferDataType(
+  Array(),
+  aggregates,
+  inputType,
+  Option(Array(BasicTypeInfo.LONG_TYPE_INFO, 
BasicTypeInfo.LONG_TYPE_INFO)))
+
+  new DataSetSessionWindowAggregatePreProcessor(
+aggregates,
+Array(),
+// the addition two fields are used to store window-start and 
window-end attributes
+intermediateRowArity + 2,
+asLong(gap),
+preMapReturnType)
+
+} else {
+  val (startPos, endPos) = 
computeWindowStartEndPropertyPos(properties)
--- End diff --

We do not need this case if we compute the final aggregates with a 
`GroupReduceFunction`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3266: [FLINK-5219][TableAPI] Add non-grouped session...

2017-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3266#discussion_r101014900
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
 ---
@@ -281,9 +279,36 @@ class DataSetWindowAggregate(
   .returns(rowTypeInfo)
   .name(aggregateOperatorName)
   .asInstanceOf[DataSet[Any]]
+
+  } else {
+// non-grouping window
+val preMapPartitionFunction = 
createDataSetWindowAggregationMapPartitionFunction(
+  window,
+  namedAggregates,
+  inputType)
+
+val mapPartitionFunction = 
createDataSetWindowAggregationMapPartitionFunction(
+  window,
+  namedAggregates,
+  inputType,
+  rowRelDataType,
+  namedProperties,
+  isPreMapPartition = false,
+  isInputCombined = true)
+
+mappedInput.sortPartition(rowTimeFieldPos, Order.ASCENDING)
+  .mapPartition(preMapPartitionFunction)
+  .sortPartition(windowStartPos, Order.ASCENDING).setParallelism(1)
+  .sortPartition(windowEndPos, Order.ASCENDING).setParallelism(1)
+  .mapPartition(mapPartitionFunction).setParallelism(1)
--- End diff --

I think we can also use `.reduceGroup()` and a `GroupReduceFunction` here. 
Without `groupBy`, the `GroupReduceFunction` will be executed with parallelism 
1.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3266: [FLINK-5219][TableAPI] Add non-grouped session...

2017-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3266#discussion_r101014280
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetWindowAggregateITCase.scala
 ---
@@ -141,16 +141,18 @@ class DataSetWindowAggregateITCase(configMode: 
TableConfigMode)
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[UnsupportedOperationException])
+  @Test
   def testAlldEventTimeSessionGroupWindow(): Unit = {
-// Non-grouping Session window on event-time are currently not 
supported
 val env = ExecutionEnvironment.getExecutionEnvironment
 val tEnv = TableEnvironment.getTableEnvironment(env, config)
 val table = env.fromCollection(data).toTable(tEnv, 'long, 'int, 
'string)
 val windowedTable =table
   .window(Session withGap 7.milli on 'long as 'w)
   .groupBy('w)
-  .select('string.count).toDataSet[Row].collect()
+  .select('string.count)
+val results = windowedTable.toDataSet[Row].collect()
+val expected = "6\n1"
--- End diff --

Would be good to change the query in a way that it checks that multiple 
sessions are correctly computed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3266: [FLINK-5219][TableAPI] Add non-grouped session...

2017-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3266#discussion_r101014879
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
 ---
@@ -294,18 +319,29 @@ class DataSetWindowAggregate(
   namedProperties)
 
 mappedInput.groupBy(groupingKeys: _*)
-.sortGroup(rowTimeFieldPos, Order.ASCENDING)
-.reduceGroup(groupReduceFunction)
-.returns(rowTypeInfo)
-.name(aggregateOperatorName)
-.asInstanceOf[DataSet[Any]]
+  .sortGroup(rowTimeFieldPos, Order.ASCENDING)
+  .reduceGroup(groupReduceFunction)
+  .returns(rowTypeInfo)
+  .name(aggregateOperatorName)
+  .asInstanceOf[DataSet[Any]]
+
+  } else {
+// non-grouping window
+val mapPartitionFunction = 
createDataSetWindowAggregationMapPartitionFunction(
+  window,
+  namedAggregates,
+  inputType,
+  rowRelDataType,
+  namedProperties,
+  isPreMapPartition = false)
+
+mappedInput.sortPartition(rowTimeFieldPos, 
Order.ASCENDING).setParallelism(1)
+  .mapPartition(mapPartitionFunction).setParallelism(1)
--- End diff --

I think we can also use `.reduceGroup()` and a `GroupReduceFunction` here. 
Without `groupBy`, the `GroupReduceFunction` will be executed with parallelism 
1.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3266: [FLINK-5219][TableAPI] Add non-grouped session...

2017-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3266#discussion_r101016814
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ---
@@ -306,6 +307,85 @@ object AggregateUtil {
   }
 
   /**
+* Create a 
[[org.apache.flink.api.common.functions.MapPartitionFunction]] that aggregation
+* for aggregates.
+* The function returns aggregate values of all aggregate function 
which are
+* organized by the following format:
+*
+* {{{
+*   avg(x) aggOffsetInRow = 2  count(z) aggOffsetInRow = 5
+*   |  |  
windowEnd(max(rowtime)
+*   |  |   |
+*   v  v   v
+*+++++---+-+
+*|  sum1  | count1 |  sum2  | count2 |windowStart|windowEnd|
+*+++++---+-+
+*   ^ ^
+*   | |
+* sum(y) aggOffsetInRow = 4windowStart(min(rowtime))
+*
+* }}}
+*
+*/
+  def createDataSetWindowAggregationMapPartitionFunction(
+window: LogicalWindow,
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+inputType: RelDataType,
+outputType: RelDataType = null,
+properties: Seq[NamedWindowProperty] = null,
+isPreMapPartition: Boolean = true,
+isInputCombined: Boolean = false): MapPartitionFunction[Row, Row] = {
+
+val aggregates = transformToAggregateFunctions(
+  namedAggregates.map(_.getKey),
+  inputType,
+  0)._2
+
+val intermediateRowArity = 
aggregates.map(_.intermediateDataType.length).sum
+
+window match {
+  case EventTimeSessionGroupWindow(_, _, gap) =>
+if (isPreMapPartition) {
+  val preMapReturnType: RowTypeInfo =
+createAggregateBufferDataType(
+  Array(),
+  aggregates,
+  inputType,
+  Option(Array(BasicTypeInfo.LONG_TYPE_INFO, 
BasicTypeInfo.LONG_TYPE_INFO)))
+
+  new DataSetSessionWindowAggregatePreProcessor(
+aggregates,
+Array(),
+// the addition two fields are used to store window-start and 
window-end attributes
--- End diff --

the addition**al** two fields


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3266: [FLINK-5219][TableAPI] Add non-grouped session...

2017-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3266#discussion_r101015171
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateProcessor.scala
 ---
@@ -59,7 +61,9 @@ class DataSetSessionWindowAggregateReduceGroupFunction(
 finalRowWindowEndPos: Option[Int],
 gap:Long,
 isInputCombined: Boolean)
-  extends RichGroupReduceFunction[Row, Row] {
+  extends AbstractRichFunction
+  with MapPartitionFunction[Row, Row]
--- End diff --

I like the idea of implementing a joint `MapPartition` and `GroupReduce` 
function. However, I think it is not necessary for the final aggregation. We 
can also call `DataSet.reduceGroup()` which will do the same as 
`DataSet.mapPartition().parallelism(1)`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3266: [FLINK-5219][TableAPI] Add non-grouped session...

2017-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3266#discussion_r101014526
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala
 ---
@@ -19,30 +19,36 @@ package org.apache.flink.table.runtime.aggregate
 
 import java.lang.Iterable
 
-import org.apache.flink.api.common.functions.RichGroupCombineFunction
+import org.apache.flink.api.common.functions.{AbstractRichFunction, 
GroupCombineFunction,
+MapPartitionFunction}
--- End diff --

No line break. Imports may exceed the 100 char limit.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3307: [FLINK-5420] Make CEP operators rescalable

2017-02-14 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/3307

[FLINK-5420] Make CEP operators rescalable

Transforms the CEP operators into ProcessFunctions that use only managed 
keyed state. Rescalability now comes out-of-the-box. In addition, for the keyed 
operator, the list of seen keys is replaced by timers set by the incoming 
elements that will fire at the next watermark.

More information about how rescalability is achieved can be found here:
https://issues.apache.org/jira/browse/FLINK-5420
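
As a rough illustration of the timer pattern described above (a generic sketch only, not the actual CEP operator code; it assumes a keyed stream, since event-time timers require one):

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Keeps an event-time timer armed for the next watermark and re-arms it
// every time it fires, replacing the need to track all seen keys in a list.
class NextWatermarkTimerSketch extends ProcessFunction[String, String] {

  override def processElement(value: String,
                              ctx: ProcessFunction[String, String]#Context,
                              out: Collector[String]): Unit = {
    // Register a timer that fires as soon as the watermark advances.
    ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 1)
    out.collect(value)
  }

  override def onTimer(timestamp: Long,
                       ctx: ProcessFunction[String, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    // Per-watermark work would go here (advancing the NFA in the CEP case);
    // then re-register so the timer fires again at the next watermark.
    ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 1)
  }
}
```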

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink cep-ref

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3307.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3307


commit 230ee80355f7219d56d1010d8850dbe39866c7e8
Author: kl0u 
Date:   2017-01-27T14:30:32Z

[FLINK-5420] Make CEP operators rescalable

Transforms the CEP operators into ProcessFunctions
that use only managed keyed state. Rescalability
now comes out-of-the-box. In addition, for the
keyed operator, the list of seen keys is replaced
by timers set by the incoming elements that will
fire at the next watermark.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5420) Make CEP operators rescalable

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865674#comment-15865674
 ] 

ASF GitHub Bot commented on FLINK-5420:
---

GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/3307

[FLINK-5420] Make CEP operators rescalable

Transforms the CEP operators into ProcessFunctions that use only managed 
keyed state. Rescalability now comes out-of-the-box. In addition, for the keyed 
operator, the list of seen keys is replaced by timers set by the incoming 
elements that will fire at the next watermark.

More information about how rescalability is achieved can be found here:
https://issues.apache.org/jira/browse/FLINK-5420

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink cep-ref

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3307.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3307


commit 230ee80355f7219d56d1010d8850dbe39866c7e8
Author: kl0u 
Date:   2017-01-27T14:30:32Z

[FLINK-5420] Make CEP operators rescalable

Transforms the CEP operators into ProcessFunctions
that use only managed keyed state. Rescalability
now comes out-of-the-box. In addition, for the
keyed operator, the list of seen keys is replaced
by timers set by the incoming elements that will
fire at the next watermark.




> Make CEP operators rescalable
> -
>
> Key: FLINK-5420
> URL: https://issues.apache.org/jira/browse/FLINK-5420
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.2.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>
> This issue targets making the operators in the CEP library re-scalable. After 
> this is implemented, the user will be able to take a savepoint and restart 
> his job with a different parallelism.
> The way to do it is to transform the CEP operators into the newly introduced 
> {{ProcessFunction}} and use only managed keyed state to store their state. 
> With this transformation, rescalability will come out-of-the-box. In 
> addition, for the keyed operator and for event time, we will not have to keep 
> the already seen keys in a list, but we can replace them with timers set for 
> each incoming element (in the {{ProcessFunction#processElement()}}) and made 
> to fire at the next watermark (their timestamp will be that of the 
> element itself). These timers will be set to fire at the next watermark and 
> when they fire, they will register another timer for the next watermark (in 
> the {{ProcessFunction#onTimer()}} they will re-register themselves with a 
> timestamp equal to {{currentWatermark() + 1}}). This will preserve the 
> previous behavior of the operators.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3293: [FLINK-5745] set an uncaught exception handler for netty ...

2017-02-14 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/3293
  
This looks good to me now. Nico moved the `FatalExitExceptionHandler` out 
of `ExecutorThreadFactory`. I'm going to merge this later today if @StephanEwen 
has no objections.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-5133) Support to set resource for operator in DataStream and DataSet

2017-02-14 Thread Zhijiang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang Wang updated FLINK-5133:
-
Summary: Support to set resource for operator in DataStream and DataSet  
(was: Add new setResource API for DataStream and DataSet)

> Support to set resource for operator in DataStream and DataSet
> --
>
> Key: FLINK-5133
> URL: https://issues.apache.org/jira/browse/FLINK-5133
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataSet API, DataStream API
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> This is part of the fine-grained resource configuration.
> For *DataStream*, the *setResource* API will be set on 
> *SingleOutputStreamOperator*, similar to other existing properties like 
> parallelism, name, etc.
> For *DataSet*, the *setResource* API will be set on *Operator* in the 
> same way.
> The API takes two parameters, a minimum *ResourceSpec* and a maximum 
> *ResourceSpec*, to allow for resource resizing in future 
> improvements.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5624) Support tumbling window on streaming tables in the SQL API

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865497#comment-15865497
 ] 

ASF GitHub Bot commented on FLINK-5624:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3252#discussion_r100991050
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -225,12 +231,29 @@ abstract class StreamTableEnvironment(
 // decorrelate
 val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
 
+val prePlanner = createHepPlanner
--- End diff --

I'll merge PR #3101 later today which adds a normalization phase before 
optimization by adding a HepPlanner.
Could you integrate your changes with #3101?
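
Roughly, such a normalization phase boils down to running a Calcite HepPlanner with a fixed rule set before the cost-based optimization. A generic sketch (the concrete rule is only a placeholder, not the rule set used in #3101):

```scala
import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.rules.ProjectMergeRule

object HepNormalizationSketch {
  // Apply a deterministic, rule-based normalization pass to a RelNode
  // before handing the plan to the cost-based optimizer.
  def normalize(relNode: RelNode): RelNode = {
    val program = new HepProgramBuilder()
      .addMatchOrder(HepMatchOrder.BOTTOM_UP)
      .addRuleInstance(ProjectMergeRule.INSTANCE)  // placeholder rule
      .build()

    val planner = new HepPlanner(program)
    planner.setRoot(relNode)
    planner.findBestExp()
  }
}
```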


> Support tumbling window on streaming tables in the SQL API
> --
>
> Key: FLINK-5624
> URL: https://issues.apache.org/jira/browse/FLINK-5624
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> This is a follow up of FLINK-4691.
> FLINK-4691 adds supports for group-windows for streaming tables. This jira 
> proposes to expose the functionality in the SQL layer via the {{GROUP BY}} 
> clauses, as described in 
> http://calcite.apache.org/docs/stream.html#tumbling-windows.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (FLINK-5792) Improve “UDTF" to support with parameter constructor

2017-02-14 Thread sunjincheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-5792:
---
Description: Currently, the UDTF is instantiated in the codegen phase via a 
no-argument constructor, so users cannot include state values in the UDTF. The 
codegen phase could instead use a serialization mechanism so 
that the UDTF can carry state values.  (was: Improved “UDTF" to support with 
parameter constructor. )
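
For reference, the kind of UDTF this is about looks roughly like the following (a hypothetical example, not code from this issue); the constructor argument is the "state value" that a no-argument codegen instantiation cannot carry:

```scala
import org.apache.flink.table.functions.TableFunction

// Hypothetical UDTF whose behavior depends on a constructor parameter.
class SplitFunction(separator: String) extends TableFunction[String] {
  def eval(line: String): Unit = {
    // emit one row per token
    line.split(separator).foreach(collect)
  }
}

// hypothetical registration: tableEnv.registerFunction("split", new SplitFunction(","))
```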

> Improve “UDTF" to support with parameter constructor
> 
>
> Key: FLINK-5792
> URL: https://issues.apache.org/jira/browse/FLINK-5792
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> Currently, the UDTF is instantiated in the codegen phase via a no-argument 
> constructor, so users cannot include state values in the 
> UDTF. The codegen phase could instead use a serialization mechanism so that the 
> UDTF can carry state values.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3306: [FLINK-5793] [runtime] fix running slot may not be add to...

2017-02-14 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3306
  
Good fix, thanks.
Could you add a quick unit test for the bug? That way we can make sure the 
bug is not accidentally re-introduced by another patch.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5793) Running slot may not be add to AllocatedMap in SlotPool

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865545#comment-15865545
 ] 

ASF GitHub Bot commented on FLINK-5793:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3306
  
Good fix, thanks.
Could you add a quick unit test for the bug? That way we can make sure the 
bug is not accidentally re-introduced by another patch.


> Running slot may not be add to AllocatedMap in SlotPool
> ---
>
> Key: FLINK-5793
> URL: https://issues.apache.org/jira/browse/FLINK-5793
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: shuai.xu
>Assignee: shuai.xu
>
> In SlotPool, when a slot is returned by a finished task, it will try to find 
> a pending request matching it. If one is found, the slot is given to that 
> request but not added to the AllocatedMap.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5553) Job fails during deployment with IllegalStateException from subpartition request

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865563#comment-15865563
 ] 

ASF GitHub Bot commented on FLINK-5553:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3299
  
Good fix, +1

Merging this...


> Job fails during deployment with IllegalStateException from subpartition 
> request
> 
>
> Key: FLINK-5553
> URL: https://issues.apache.org/jira/browse/FLINK-5553
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.3.0
>Reporter: Robert Metzger
>Assignee: Nico Kruber
> Attachments: application-1484132267957-0076
>
>
> While running a test job with Flink 1.3-SNAPSHOT 
> (6fb6967b9f9a31f034bd09fcf76aaf147bc8e9a0) the job failed with this exception:
> {code}
> 2017-01-18 14:56:27,043 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed 
> (9/10) (befc06d0e792c2ce39dde74b365dd3cf) switched from DEPLOYING to RUNNING.
> 2017-01-18 14:56:27,059 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Flat Map 
> (9/10) (e94a01ec283e5dce7f79b02cf51654c4) switched from DEPLOYING to RUNNING.
> 2017-01-18 14:56:27,817 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Flat Map 
> (10/10) (cbb61c9a2f72c282877eb383e111f7cd) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: There has been an error in the channel.
> at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.addInputChannel(PartitionRequestClientHandler.java:77)
> at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClient.requestSubpartition(PartitionRequestClient.java:104)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:115)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:419)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:441)
> at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:153)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:192)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:270)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:666)
> at java.lang.Thread.run(Thread.java:745)
> 2017-01-18 14:56:27,819 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job 
> Misbehaved Job (b1d985d11984df57400fdff2bb656c59) switched from state RUNNING 
> to FAILING.
> java.lang.IllegalStateException: There has been an error in the channel.
> at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
> at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.addInputChannel(PartitionRequestClientHandler.java:77)
> at 
> org.apache.flink.runtime.io.network.netty.PartitionRequestClient.requestSubpartition(PartitionRequestClient.java:104)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:115)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:419)
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:441)
> at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:153)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:192)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:270)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:666)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> This is the first exception that is reported to the jobmanager.
> I think this is related to missing network buffers. You see that from the 
> next deployment after the restart, where the deployment fails with the 
> insufficient number of buffers exception.
> I'll add logs to the JIRA.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3299: [FLINK-5553] keep the original throwable in PartitionRequ...

2017-02-14 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3299
  
Good fix, +1

Merging this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3293: [FLINK-5745] set an uncaught exception handler for netty ...

2017-02-14 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3293
  
@NicoK I think you can update this now...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5672) Job fails with java.lang.IllegalArgumentException: port out of range:-1

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865583#comment-15865583
 ] 

ASF GitHub Bot commented on FLINK-5672:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3298#discussion_r101008339
  
--- Diff: flink-dist/src/main/flink-bin/bin/stop-cluster.sh ---
@@ -25,14 +25,30 @@ bin=`cd "$bin"; pwd`
 # Stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) 
when available
 readSlaves
 
-command -v pdsh >/dev/null 2>&1
-if [[ $? -ne 0 ]]; then
+# all-local setup?
+all_localhost=1
+for slave in ${SLAVES[@]}; do
+if [[ "$slave" != "localhost" ]]; then
+all_localhost=0
+break
+fi
+done
+
+if [[ ${all_localhost} -eq 1 ]]; then
+# all-local setup
 for slave in ${SLAVES[@]}; do
-ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l 
\"${FLINK_BIN_DIR}/taskmanager.sh\" stop &"
+"${FLINK_BIN_DIR}"/taskmanager.sh stop
 done
 else
-PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, 
; echo "${SLAVES[*]}") \
-"nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" stop"
+command -v pdsh >/dev/null 2>&1
+if [[ $? -ne 0 ]]; then
+for slave in ${SLAVES[@]}; do
+ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l 
\"${FLINK_BIN_DIR}/taskmanager.sh\" stop &"
+done
+else
+PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w 
$(IFS=, ; echo "${SLAVES[*]}") \
+"nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" stop"
--- End diff --

you're right, but it's probably more readable to keep the commands in there 
instead of sharing them; also, it has been that way before ;)


> Job fails with java.lang.IllegalArgumentException: port out of range:-1
> ---
>
> Key: FLINK-5672
> URL: https://issues.apache.org/jira/browse/FLINK-5672
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: Timo Walther
>
> I started the JobManager with {{start-local.sh}} and started another 
> TaskManager with {{taskmanager.sh start}}. My job is a Table API job with a 
> {{orderBy}} (range partitioning with parallelism 2).
> The job fails with the following exception:
> {code}
> java.lang.IllegalArgumentException: port out of range:-1
>   at java.net.InetSocketAddress.checkPort(InetSocketAddress.java:143)
>   at java.net.InetSocketAddress.<init>(InetSocketAddress.java:188)
>   at 
> org.apache.flink.runtime.io.network.ConnectionID.<init>(ConnectionID.java:47)
>   at 
> org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor.fromEdges(InputChannelDeploymentDescriptor.java:124)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.createDeploymentDescriptor(ExecutionVertex.java:627)
>   at 
> org.apache.flink.runtime.executiongraph.Execution.deployToSlot(Execution.java:358)
>   at 
> org.apache.flink.runtime.executiongraph.Execution$1.apply(Execution.java:284)
>   at 
> org.apache.flink.runtime.executiongraph.Execution$1.apply(Execution.java:279)
>   at 
> org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:259)
>   at akka.dispatch.OnComplete.internal(Future.scala:248)
>   at akka.dispatch.OnComplete.internal(Future.scala:245)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>   at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutor.execute(Executors.java:56)
>   at 
> scala.concurrent.impl.ExecutionContextImpl.execute(ExecutionContextImpl.scala:122)
>   at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>   at 
> scala.concurrent.impl.Promise$KeptPromise.onComplete(Promise.scala:333)
>   at 
> org.apache.flink.runtime.concurrent.impl.FlinkFuture.handleAsync(FlinkFuture.java:256)
>   at 
> org.apache.flink.runtime.concurrent.impl.FlinkFuture.handle(FlinkFuture.java:270)
>   at 
> org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:279)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:479)
>   at 
> org.apache.flink.runtime.executiongraph.Execution$5.call(Execution.java:525)
>   at 
> org.apache.flink.runtime.executiongraph.Execution$5.call(Execution.java:521)
>   at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:95)
>   at 
> 

[GitHub] flink pull request #3298: [FLINK-5672] add special cases for a local setup i...

2017-02-14 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3298#discussion_r101008195
  
--- Diff: flink-dist/src/main/flink-bin/bin/stop-cluster.sh ---
@@ -25,14 +25,30 @@ bin=`cd "$bin"; pwd`
 # Stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) 
when available
 readSlaves
 
-command -v pdsh >/dev/null 2>&1
-if [[ $? -ne 0 ]]; then
+# all-local setup?
+all_localhost=1
+for slave in ${SLAVES[@]}; do
+if [[ "$slave" != "localhost" ]]; then
+all_localhost=0
+break
+fi
+done
--- End diff --

makes sense - also, since we already go through the slaves once there, we 
can do this check on the fly


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3305: [FLINK-5790][StateBackend] Use list types when Lis...

2017-02-14 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3305#discussion_r101008561
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ArrayListSerializer.java
 ---
@@ -108,9 +112,13 @@ public void copy(DataInputView source, DataOutputView 
target) throws IOException
 
@Override
public boolean equals(Object obj) {
+   if (obj.getClass() != getClass())  {
+   System.out.println("other " + obj.getClass().getName() 
+ ", this " + getClass().getName());
--- End diff --

This is probably left over code that should be removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3305: [FLINK-5790][StateBackend] Use list types when Lis...

2017-02-14 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/3305#discussion_r101009073
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/ArrayListTypeInfo.java
 ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.ArrayListSerializer;
+
+import java.util.ArrayList;
+
+/**
+ * A {@link TypeInformation} for the list types of the Java API.
+ *
+ * @param <T> The type of the elements in the list.
+ */
+
+
+@Public
+public final class ArrayListTypeInfo<T> extends TypeInformation<ArrayList<T>> {
+
+   private final TypeInformation<T> elementTypeInfo;
+
+   public ArrayListTypeInfo(Class<T> elementTypeClass) {
+   this.elementTypeInfo = TypeExtractor.createTypeInfo(elementTypeClass);
+   }
+
+   public ArrayListTypeInfo(TypeInformation<T> elementTypeInfo) {
+   this.elementTypeInfo = elementTypeInfo;
+   }
+
+   public TypeInformation<T> getElementTypeInfo() {
+   return elementTypeInfo;
+   }
+
+   @Override
+   public boolean isBasicType() {
+   return false;
+   }
+
+   @Override
+   public boolean isTupleType() {
+   return false;
+   }
+
+   @Override
+   public int getArity() {
+   return 0;
+   }
+
+   @Override
+   public int getTotalFields() {
+   return elementTypeInfo.getTotalFields();
+   }
+
+   @SuppressWarnings("unchecked")
+   @Override
+   public Class<ArrayList<T>> getTypeClass() {
+   return (Class<ArrayList<T>>) (new ArrayList<T>().getClass());
--- End diff --

You can simplify this (to avoid creating an `ArrayList` instance) via:
`return (Class<ArrayList<T>>) (Class<?>) ArrayList.class;`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3301: [FLINK-5788] [docs] Improve documentation of FileS...

2017-02-14 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/3301#discussion_r101011358
  
--- Diff: docs/internals/filesystems.md ---
@@ -0,0 +1,138 @@
+---
+title: "File Systems"
+nav-parent_id: internals
+nav-pos: 10
+---
+
+
+* Replaced by the TOC
+{:toc}
+
+Flink has its own file system abstraction via the 
`org.apache.flink.core.fs.FileSystem` class.
+This abstraction provides a common set of operations and minimal 
guarantees across various types
+of file system implementations.
+
+The `FileSystem`'s set of available operations is quite limited, in order 
to suport a wide
+range of file systems. For example, appending to or mutating existing 
files is not supported.
+
+File systems are identified by a *file system scheme*, such as `file://`, 
`hdfs://`, etc.
+
+# Implementations
+
+Flink implements the file systems directly, with the following file system 
schemes:
+
+  - `file`, which represents the machines local file system.
+
+Other file system types are accessed by an implementation that bridges to 
the suite of file systems supported by
+[Apache Hadoop](https://hadoop.apache.org/). The following is an 
incomplete list of examples:
+
+  - `hdfs`: Hadoop Distributed File System
+  - `s3`, `s3n`, and `s3a`: Amazon S3 file system
+  - `gcs`: Google Cloud Storage
+  - `maprfs`: The MapR distributed file system
+  - ...
+
+Flink loads Hadoop's file systems transparently if it finds the Hadoop 
File System classes in the class path and finds a valid
+Hadoop configuration. By default, it looks for the Hadoop configuration in 
the class path. Alternatively, one can specify a
+custom location via the configuration entry `fs.hdfs.hadoopconf`.
+
+
+# Persistence Guarantees
+
+These `FileSystem` and its `FsDataOutputStream` instances are used to 
persistently store data, both for results of applications
+and for fault tolerance and recovery. It is therefore crucial that the 
persistence semantics of these streams are well defined.
+
+## Definition of Persistence Guarantees
+
+Data written to an output stream is considered persistent, if two 
requirements are met:
+
+  1. **Visibility Requirement:** It must be guaranteed that all other 
processes, machines,
+ virtual machines, containers, etc. that are able to access the file 
see the data consistently
+ when given the absolute file path. This requirement is similar to the 
*close-to-open*
+ semantics defined by POSIX, but restricted to the file itself (by its 
absolute path).
+
+  2. **Durability Requirement:** The file system's specific 
durability/persistence requirements
+ must be met. These are specific to the particular file system. For 
example the
+ {@link LocalFileSystem} does not provide any durability guarantees 
for crashes of both
+ hardware and operating system, while replicated distributed file 
systems (like HDFS)
+ guarantee typically durability in the presence of up to concurrent 
failure or *n*
--- End diff --

 typically guarantee durability in the presence of at most *n* concurrent 
node failures,
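
As a usage-level illustration of the contract described in the quoted document (a sketch only; the path is made up, and the visibility/durability guarantees have to hold for the written bytes once `close()` returns):

```scala
import org.apache.flink.core.fs.Path

object PersistenceContractSketch {
  def main(args: Array[String]): Unit = {
    val path = new Path("file:///tmp/flink-fs-sketch/out.bin")  // made-up path
    val fs = path.getFileSystem

    // Write some bytes; once close() returns, the visibility and durability
    // requirements described above must hold for exactly these bytes.
    val out = fs.create(path, true)
    try {
      out.write("example bytes".getBytes("UTF-8"))
    } finally {
      out.close()
    }
  }
}
```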


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3301: [FLINK-5788] [docs] Improve documentation of FileS...

2017-02-14 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/3301#discussion_r101011728
  
--- Diff: docs/internals/filesystems.md ---
@@ -0,0 +1,138 @@
+---
+title: "File Systems"
+nav-parent_id: internals
+nav-pos: 10
+---
+
+
+* Replaced by the TOC
+{:toc}
+
+Flink has its own file system abstraction via the 
`org.apache.flink.core.fs.FileSystem` class.
+This abstraction provides a common set of operations and minimal 
guarantees across various types
+of file system implementations.
+
+The `FileSystem`'s set of available operations is quite limited, in order 
to suport a wide
+range of file systems. For example, appending to or mutating existing 
files is not supported.
+
+File systems are identified by a *file system scheme*, such as `file://`, 
`hdfs://`, etc.
+
+# Implementations
+
+Flink implements the file systems directly, with the following file system 
schemes:
+
+  - `file`, which represents the machines local file system.
+
+Other file system types are accessed by an implementation that bridges to 
the suite of file systems supported by
+[Apache Hadoop](https://hadoop.apache.org/). The following is an 
incomplete list of examples:
+
+  - `hdfs`: Hadoop Distributed File System
+  - `s3`, `s3n`, and `s3a`: Amazon S3 file system
+  - `gcs`: Google Cloud Storage
+  - `maprfs`: The MapR distributed file system
+  - ...
+
+Flink loads Hadoop's file systems transparently if it finds the Hadoop 
File System classes in the class path and finds a valid
+Hadoop configuration. By default, it looks for the Hadoop configuration in 
the class path. Alternatively, one can specify a
+custom location via the configuration entry `fs.hdfs.hadoopconf`.
+
+
+# Persistence Guarantees
+
+These `FileSystem` and its `FsDataOutputStream` instances are used to 
persistently store data, both for results of applications
+and for fault tolerance and recovery. It is therefore crucial that the 
persistence semantics of these streams are well defined.
+
+## Definition of Persistence Guarantees
+
+Data written to an output stream is considered persistent, if two 
requirements are met:
+
+  1. **Visibility Requirement:** It must be guaranteed that all other 
processes, machines,
+ virtual machines, containers, etc. that are able to access the file 
see the data consistently
+ when given the absolute file path. This requirement is similar to the 
*close-to-open*
+ semantics defined by POSIX, but restricted to the file itself (by its 
absolute path).
+
+  2. **Durability Requirement:** The file system's specific 
durability/persistence requirements
+ must be met. These are specific to the particular file system. For 
example the
+ {@link LocalFileSystem} does not provide any durability guarantees 
for crashes of both
+ hardware and operating system, while replicated distributed file 
systems (like HDFS)
+ guarantee typically durability in the presence of up to concurrent 
failure or *n*
+ nodes, where *n* is the replication factor.
+
+Updates to the file's parent directory (such as that the file shows up when
+listing the directory contents) are not required to be complete for the 
data in the file stream
+to be considered persistent. This relaxation is important for file systems 
where updates to
+directory contents are only eventually consistent.
+
+The `FSDataOutputStream` has to guarantee data persistence for the written 
bytes once the call to
+`FSDataOutputStream.close()` returns.
+
+## Examples
+ 
+  - For **fault-tolerant distributed file systems**, data is considered 
persistent once 
+it has been received and acknowledged by the file system, typically by 
having been replicated
+to a quorum of machines (*durability requirement*). In addition the 
absolute file path
+must be visible to all other machines that will potentially access the 
file (*visibility requirement*).
+
+Whether data has hit non-volatile storage on the storage nodes depends 
on the specific
+guarantees of the particular file system.
+
+The metadata updates to the file's parent directory are not required 
to have reached
+a consistent state. It is permissible that some machines see the file 
when listing the parent
+directory's contents while other do not, as long as access to the file 
by its absolute path
--- End diff --

while others do not


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at 

[GitHub] flink pull request #3301: [FLINK-5788] [docs] Improve documentation of FileS...

2017-02-14 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/3301#discussion_r101012059
  
--- Diff: docs/internals/filesystems.md ---
@@ -0,0 +1,138 @@
+---
+title: "File Systems"
+nav-parent_id: internals
+nav-pos: 10
+---
+
+
+* Replaced by the TOC
+{:toc}
+
+Flink has its own file system abstraction via the 
`org.apache.flink.core.fs.FileSystem` class.
+This abstraction provides a common set of operations and minimal 
guarantees across various types
+of file system implementations.
+
+The `FileSystem`'s set of available operations is quite limited, in order 
to suport a wide
+range of file systems. For example, appending to or mutating existing 
files is not supported.
+
+File systems are identified by a *file system scheme*, such as `file://`, 
`hdfs://`, etc.
+
+# Implementations
+
+Flink implements the file systems directly, with the following file system 
schemes:
+
+  - `file`, which represents the machines local file system.
+
+Other file system types are accessed by an implementation that bridges to 
the suite of file systems supported by
+[Apache Hadoop](https://hadoop.apache.org/). The following is an 
incomplete list of examples:
+
+  - `hdfs`: Hadoop Distributed File System
+  - `s3`, `s3n`, and `s3a`: Amazon S3 file system
+  - `gcs`: Google Cloud Storage
+  - `maprfs`: The MapR distributed file system
+  - ...
+
+Flink loads Hadoop's file systems transparently if it finds the Hadoop 
File System classes in the class path and finds a valid
+Hadoop configuration. By default, it looks for the Hadoop configuration in 
the class path. Alternatively, one can specify a
+custom location via the configuration entry `fs.hdfs.hadoopconf`.
+
+
+# Persistence Guarantees
+
+These `FileSystem` and its `FsDataOutputStream` instances are used to 
persistently store data, both for results of applications
+and for fault tolerance and recovery. It is therefore crucial that the 
persistence semantics of these streams are well defined.
+
+## Definition of Persistence Guarantees
+
+Data written to an output stream is considered persistent, if two 
requirements are met:
+
+  1. **Visibility Requirement:** It must be guaranteed that all other 
processes, machines,
+ virtual machines, containers, etc. that are able to access the file 
see the data consistently
+ when given the absolute file path. This requirement is similar to the 
*close-to-open*
+ semantics defined by POSIX, but restricted to the file itself (by its 
absolute path).
+
+  2. **Durability Requirement:** The file system's specific 
durability/persistence requirements
+ must be met. These are specific to the particular file system. For 
example the
+ {@link LocalFileSystem} does not provide any durability guarantees 
for crashes of both
+ hardware and operating system, while replicated distributed file 
systems (like HDFS)
+ guarantee typically durability in the presence of up to concurrent 
failure or *n*
+ nodes, where *n* is the replication factor.
+
+Updates to the file's parent directory (such as that the file shows up when
+listing the directory contents) are not required to be complete for the 
data in the file stream
+to be considered persistent. This relaxation is important for file systems 
where updates to
+directory contents are only eventually consistent.
+
+The `FSDataOutputStream` has to guarantee data persistence for the written 
bytes once the call to
+`FSDataOutputStream.close()` returns.
+
+## Examples
+ 
+  - For **fault-tolerant distributed file systems**, data is considered 
persistent once 
+it has been received and acknowledged by the file system, typically by 
having been replicated
+to a quorum of machines (*durability requirement*). In addition the 
absolute file path
+must be visible to all other machines that will potentially access the 
file (*visibility requirement*).
+
+Whether data has hit non-volatile storage on the storage nodes depends 
on the specific
+guarantees of the particular file system.
+
+The metadata updates to the file's parent directory are not required 
to have reached
+a consistent state. It is permissible that some machines see the file 
when listing the parent
+directory's contents while other do not, as long as access to the file 
by its absolute path
+is possible on all nodes.
+
+  - A **local file system** must support the POSIX *close-to-open* 
semantics.
+Because the local file system does not have any fault tolerance 
guarantees, no further
+requirements exist.
+ 
+The above implies specifically that data may still 

[GitHub] flink pull request #3301: [FLINK-5788] [docs] Improve documentation of FileS...

2017-02-14 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/3301#discussion_r101010804
  
--- Diff: docs/internals/filesystems.md ---
@@ -0,0 +1,138 @@
+---
+title: "File Systems"
+nav-parent_id: internals
+nav-pos: 10
+---
+
+
+* Replaced by the TOC
+{:toc}
+
+Flink has its own file system abstraction via the 
`org.apache.flink.core.fs.FileSystem` class.
+This abstraction provides a common set of operations and minimal 
guarantees across various types
+of file system implementations.
+
+The `FileSystem`'s set of available operations is quite limited, in order 
to suport a wide
--- End diff --

support


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3301: [FLINK-5788] [docs] Improve documentation of FileS...

2017-02-14 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/3301#discussion_r101011921
  
--- Diff: docs/internals/filesystems.md ---
@@ -0,0 +1,138 @@
+---
+title: "File Systems"
+nav-parent_id: internals
+nav-pos: 10
+---
+
+
+* Replaced by the TOC
+{:toc}
+
+Flink has its own file system abstraction via the 
`org.apache.flink.core.fs.FileSystem` class.
+This abstraction provides a common set of operations and minimal 
guarantees across various types
+of file system implementations.
+
+The `FileSystem`'s set of available operations is quite limited, in order 
to suport a wide
+range of file systems. For example, appending to or mutating existing 
files is not supported.
+
+File systems are identified by a *file system scheme*, such as `file://`, 
`hdfs://`, etc.
+
+# Implementations
+
+Flink implements the file systems directly, with the following file system 
schemes:
+
+  - `file`, which represents the machines local file system.
+
+Other file system types are accessed by an implementation that bridges to 
the suite of file systems supported by
+[Apache Hadoop](https://hadoop.apache.org/). The following is an 
incomplete list of examples:
+
+  - `hdfs`: Hadoop Distributed File System
+  - `s3`, `s3n`, and `s3a`: Amazon S3 file system
+  - `gcs`: Google Cloud Storage
+  - `maprfs`: The MapR distributed file system
+  - ...
+
+Flink loads Hadoop's file systems transparently if it finds the Hadoop 
File System classes in the class path and finds a valid
+Hadoop configuration. By default, it looks for the Hadoop configuration in 
the class path. Alternatively, one can specify a
+custom location via the configuration entry `fs.hdfs.hadoopconf`.
+
+
+# Persistence Guarantees
+
+These `FileSystem` and its `FsDataOutputStream` instances are used to 
persistently store data, both for results of applications
+and for fault tolerance and recovery. It is therefore crucial that the 
persistence semantics of these streams are well defined.
+
+## Definition of Persistence Guarantees
+
+Data written to an output stream is considered persistent, if two 
requirements are met:
+
+  1. **Visibility Requirement:** It must be guaranteed that all other 
processes, machines,
+ virtual machines, containers, etc. that are able to access the file 
see the data consistently
+ when given the absolute file path. This requirement is similar to the 
*close-to-open*
+ semantics defined by POSIX, but restricted to the file itself (by its 
absolute path).
+
+  2. **Durability Requirement:** The file system's specific 
durability/persistence requirements
+ must be met. These are specific to the particular file system. For 
example the
+ {@link LocalFileSystem} does not provide any durability guarantees 
for crashes of both
+ hardware and operating system, while replicated distributed file 
systems (like HDFS)
+ guarantee typically durability in the presence of up to concurrent 
failure or *n*
+ nodes, where *n* is the replication factor.
+
+Updates to the file's parent directory (such as that the file shows up when
+listing the directory contents) are not required to be complete for the 
data in the file stream
+to be considered persistent. This relaxation is important for file systems 
where updates to
+directory contents are only eventually consistent.
+
+The `FSDataOutputStream` has to guarantee data persistence for the written 
bytes once the call to
+`FSDataOutputStream.close()` returns.
+
+## Examples
+ 
+  - For **fault-tolerant distributed file systems**, data is considered 
persistent once 
+it has been received and acknowledged by the file system, typically by 
having been replicated
+to a quorum of machines (*durability requirement*). In addition the 
absolute file path
+must be visible to all other machines that will potentially access the 
file (*visibility requirement*).
+
+Whether data has hit non-volatile storage on the storage nodes depends 
on the specific
+guarantees of the particular file system.
+
+The metadata updates to the file's parent directory are not required 
to have reached
+a consistent state. It is permissible that some machines see the file 
when listing the parent
+directory's contents while other do not, as long as access to the file 
by its absolute path
+is possible on all nodes.
+
+  - A **local file system** must support the POSIX *close-to-open* 
semantics.
+Because the local file system does not have any fault tolerance 
guarantees, no further
+requirements exist.
+ 
+The above implies specifically that data may still 

[GitHub] flink pull request #3301: [FLINK-5788] [docs] Improve documentation of FileS...

2017-02-14 Thread alpinegizmo
Github user alpinegizmo commented on a diff in the pull request:

https://github.com/apache/flink/pull/3301#discussion_r101011563
  
--- Diff: docs/internals/filesystems.md ---
@@ -0,0 +1,138 @@
+---
+title: "File Systems"
+nav-parent_id: internals
+nav-pos: 10
+---
+
+
+* Replaced by the TOC
+{:toc}
+
+Flink has its own file system abstraction via the 
`org.apache.flink.core.fs.FileSystem` class.
+This abstraction provides a common set of operations and minimal 
guarantees across various types
+of file system implementations.
+
+The `FileSystem`'s set of available operations is quite limited, in order 
to suport a wide
+range of file systems. For example, appending to or mutating existing 
files is not supported.
+
+File systems are identified by a *file system scheme*, such as `file://`, 
`hdfs://`, etc.
+
+# Implementations
+
+Flink implements the file systems directly, with the following file system 
schemes:
+
+  - `file`, which represents the machines local file system.
+
+Other file system types are accessed by an implementation that bridges to 
the suite of file systems supported by
+[Apache Hadoop](https://hadoop.apache.org/). The following is an 
incomplete list of examples:
+
+  - `hdfs`: Hadoop Distributed File System
+  - `s3`, `s3n`, and `s3a`: Amazon S3 file system
+  - `gcs`: Google Cloud Storage
+  - `maprfs`: The MapR distributed file system
+  - ...
+
+Flink loads Hadoop's file systems transparently if it finds the Hadoop 
File System classes in the class path and finds a valid
+Hadoop configuration. By default, it looks for the Hadoop configuration in 
the class path. Alternatively, one can specify a
+custom location via the configuration entry `fs.hdfs.hadoopconf`.
+
+
+# Persistence Guarantees
+
+These `FileSystem` and its `FsDataOutputStream` instances are used to 
persistently store data, both for results of applications
+and for fault tolerance and recovery. It is therefore crucial that the 
persistence semantics of these streams are well defined.
+
+## Definition of Persistence Guarantees
+
+Data written to an output stream is considered persistent, if two 
requirements are met:
+
+  1. **Visibility Requirement:** It must be guaranteed that all other 
processes, machines,
+ virtual machines, containers, etc. that are able to access the file 
see the data consistently
+ when given the absolute file path. This requirement is similar to the 
*close-to-open*
+ semantics defined by POSIX, but restricted to the file itself (by its 
absolute path).
+
+  2. **Durability Requirement:** The file system's specific 
durability/persistence requirements
+ must be met. These are specific to the particular file system. For 
example the
+ {@link LocalFileSystem} does not provide any durability guarantees 
for crashes of both
+ hardware and operating system, while replicated distributed file 
systems (like HDFS)
+ guarantee typically durability in the presence of up to concurrent 
failure or *n*
+ nodes, where *n* is the replication factor.
+
+Updates to the file's parent directory (such as that the file shows up when
--- End diff --

(such that the file ...)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
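
For readers following the persistence discussion in the quoted docs above: the contract is that once `FSDataOutputStream.close()` returns, the written bytes satisfy the visibility and durability requirements. Below is a minimal sketch of writing a file through Flink's `org.apache.flink.core.fs.FileSystem` abstraction, assuming the 1.2-era API (`FileSystem.get(URI)` and `create(Path, boolean)`); the HDFS path is a placeholder, not taken from the PR.

{code}
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import java.nio.charset.StandardCharsets;

public class PersistentWriteExample {

    public static void main(String[] args) throws Exception {
        // The file system is resolved from the scheme of the target URI
        // (file://, hdfs://, ...); the address below is a placeholder.
        Path target = new Path("hdfs://namenode:8020/tmp/example-output");
        FileSystem fs = FileSystem.get(target.toUri());

        // Create the file, overwriting an existing one (second argument).
        try (FSDataOutputStream out = fs.create(target, true)) {
            out.write("data to persist".getBytes(StandardCharsets.UTF_8));
        }
        // After close() has returned, the bytes written above are expected to
        // meet the visibility and durability requirements quoted in the docs.
    }
}
{code}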


[jira] [Commented] (FLINK-5745) Set uncaught exception handler for Netty threads

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865612#comment-15865612
 ] 

ASF GitHub Bot commented on FLINK-5745:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/3293
  
@StephanEwen already did when #3290 got in


> Set uncaught exception handler for Netty threads
> 
>
> Key: FLINK-5745
> URL: https://issues.apache.org/jira/browse/FLINK-5745
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Ufuk Celebi
>Priority: Minor
>
> We pass a thread factory for the Netty event loop threads (see 
> {{NettyServer}} and {{NettyClient}}), but don't set an uncaught exception 
> handler. Let's add a JVM terminating handler that exits the process in case 
> of fatal errors.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3266: [FLINK-5219][TableAPI] Add non-grouped session...

2017-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3266#discussion_r101013750
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregateProcessor.scala
 ---
@@ -19,15 +19,17 @@ package org.apache.flink.table.runtime.aggregate
 
 import java.lang.Iterable
 
-import org.apache.flink.api.common.functions.RichGroupReduceFunction
+import org.apache.flink.api.common.functions.{AbstractRichFunction, 
GroupReduceFunction,
+MapPartitionFunction}
--- End diff --

do not break the line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5745) Set uncaught exception handler for Netty threads

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865717#comment-15865717
 ] 

ASF GitHub Bot commented on FLINK-5745:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/3293
  
This looks good to me now. Nico moved the `FatalExitExceptionHandler` out 
of `ExecutorThreadFactory`. I'm going to merge this later today if @StephanEwen 
has no objections.



> Set uncaught exception handler for Netty threads
> 
>
> Key: FLINK-5745
> URL: https://issues.apache.org/jira/browse/FLINK-5745
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Ufuk Celebi
>Priority: Minor
>
> We pass a thread factory for the Netty event loop threads (see 
> {{NettyServer}} and {{NettyClient}}), but don't set an uncaught exception 
> handler. Let's add a JVM terminating handler that exits the process in case 
> of fatal errors.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
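
As a rough illustration of the idea discussed here (the actual implementation is Flink's `FatalExitExceptionHandler`, which this sketch does not reproduce): a thread factory can attach an uncaught exception handler that terminates the JVM on fatal errors. The class name, exit code, and log message below are illustrative assumptions only.

{code}
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class ExitOnFatalThreadFactory implements ThreadFactory {

    private final AtomicInteger counter = new AtomicInteger();
    private final String namePrefix;

    public ExitOnFatalThreadFactory(String namePrefix) {
        this.namePrefix = namePrefix;
    }

    @Override
    public Thread newThread(Runnable runnable) {
        Thread t = new Thread(runnable, namePrefix + '-' + counter.incrementAndGet());
        // If an event loop thread dies with an uncaught throwable, log it and
        // halt the JVM instead of continuing in a potentially broken state.
        t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread thread, Throwable error) {
                System.err.println("FATAL: uncaught exception in " + thread.getName() + ": " + error);
                Runtime.getRuntime().halt(-17);
            }
        });
        return t;
    }
}
{code}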


[jira] [Updated] (FLINK-5420) Make CEP operators rescalable

2017-02-14 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-5420:
--
Summary: Make CEP operators rescalable  (was: Make keyed CEP operator 
rescalable)

> Make CEP operators rescalable
> -
>
> Key: FLINK-5420
> URL: https://issues.apache.org/jira/browse/FLINK-5420
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.2.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>
> {{AbstractKeyedCEPPatternOperator}} uses the old state snapshotting 
> interfaces that make the operator non-rescalable. The operator uses state to 
> keep track of all the in-flight keys. When a watermark arrives the buffers 
> are checked for all keys and those that can be processed are processed. We 
> could change this to use the abstract timer system instead.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (FLINK-4651) Re-register processing time timers at the WindowOperator upon recovery.

2017-02-14 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-4651.
-
Resolution: Fixed

> Re-register processing time timers at the WindowOperator upon recovery.
> ---
>
> Key: FLINK-4651
> URL: https://issues.apache.org/jira/browse/FLINK-4651
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>  Labels: windows
> Fix For: 1.1.5, 1.2.0
>
>
> Currently the {{WindowOperator}} checkpoints the processing time timers, but 
> upon recovery it does not re-register them with the {{TimeServiceProvider}}. 
> To actually reprocess them it relies on another element that will come and 
> register a new timer for a future point in time. Although this is a realistic 
> assumption in long running jobs, we can remove this assumption by 
> re-registering the restored timers with the {{TimeServiceProvider}} in the 
> {{open()}} method of the {{WindowOperator}}.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
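
A rough sketch of the fix described above: re-registering restored processing-time timers when the operator is opened, so they fire even if no further element arrives. The `TimeService` interface below is a hypothetical stand-in for Flink's internal time service provider, not the real API.

{code}
import java.util.Set;

public class TimerRestoreSketch {

    /** Hypothetical stand-in for the operator's processing-time service. */
    interface TimeService {
        void registerTimer(long timestamp);
    }

    /**
     * Called from open(): every timer restored from the checkpoint is
     * registered again with the time service.
     */
    static void reRegisterRestoredTimers(Set<Long> restoredTimers, TimeService timeService) {
        for (long timestamp : restoredTimers) {
            timeService.registerTimer(timestamp);
        }
    }
}
{code}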


[jira] [Commented] (FLINK-5566) Introduce structure to hold table and column level statistics

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865393#comment-15865393
 ] 

ASF GitHub Bot commented on FLINK-5566:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3196#discussion_r100984668
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/ColumnStats.scala
 ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.stats
+
+/**
+  * column statistics
+  *
+  * @param ndv   number of distinct values
+  * @param nullCount number of nulls
+  * @param avgLenaverage length of column values
+  * @param maxLenmax length of column values
+  * @param max   max value of column values
+  * @param min   min value of column values
+  */
+case class ColumnStats(
+ndv: Long,
+nullCount: Long,
+avgLen: Long,
+maxLen: Long,
+max: Option[Any],
+min: Option[Any]) {
--- End diff --

Yes, I think you are right @beyond1920 


> Introduce structure to hold table and column level statistics
> -
>
> Key: FLINK-5566
> URL: https://issues.apache.org/jira/browse/FLINK-5566
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: zhangjing
>
> We define two structures to hold statistics:
> 1. TableStats: contains table-level stats; currently only one element: rowCount
> 2. ColumnStats: contains column-level stats. 
> for numeric column types: including ndv, nullCount, max, min, histogram
> for string type: including ndv, nullCount, avgLen, maxLen
> for boolean: including ndv, nullCount, trueCount, falseCount
> for date/time/timestamp: including ndv, nullCount, max, min, histogram 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3196: [FLINK-5566] [Table API & SQL]Introduce structure ...

2017-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3196#discussion_r100984668
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/ColumnStats.scala
 ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.stats
+
+/**
+  * column statistics
+  *
+  * @param ndv   number of distinct values
+  * @param nullCount number of nulls
+  * @param avgLenaverage length of column values
+  * @param maxLenmax length of column values
+  * @param max   max value of column values
+  * @param min   min value of column values
+  */
+case class ColumnStats(
+ndv: Long,
+nullCount: Long,
+avgLen: Long,
+maxLen: Long,
+max: Option[Any],
+min: Option[Any]) {
--- End diff --

Yes, I think you are right @beyond1920 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
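
For illustration, the `ColumnStats` case class from the quoted diff can be instantiated from Java as sketched below. The column and its numbers are made up, and `scala-library` is assumed to be on the classpath.

{code}
import org.apache.flink.table.plan.stats.ColumnStats;
import scala.Option;

public class ColumnStatsExample {

    public static void main(String[] args) {
        // Hypothetical stats for an integer column: 85 distinct values,
        // 5 nulls, fixed 4-byte width, values between 18 and 90.
        ColumnStats stats = new ColumnStats(
                85L, 5L, 4L, 4L,
                Option.apply((Object) 90),   // max
                Option.apply((Object) 18));  // min

        System.out.println(stats);  // ColumnStats(85,5,4,4,Some(90),Some(18))
    }
}
{code}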


[GitHub] flink issue #3302: [FLINK-5710] Add ProcTime() function to indicate StreamSQ...

2017-02-14 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3302
  
Thanks for reopening the PR! 
I made a few comments on #3271 before and after it was closed that should 
be addressed.

Thanks, Fabian


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2509: [FLINK-4280][kafka-connector] Explicit start posit...

2017-02-14 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2509#discussion_r100988931
  
--- Diff: 
flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 ---
@@ -45,10 +45,7 @@
 
 import java.io.File;
 import java.net.BindException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
+import java.util.*;
--- End diff --

No, we are currently not checking the tests


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-5316) Make the GenericWriteAheadSink backwards compatible.

2017-02-14 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas reassigned FLINK-5316:
-

Assignee: (was: Kostas Kloudas)

> Make the GenericWriteAheadSink backwards compatible.
> 
>
> Key: FLINK-5316
> URL: https://issues.apache.org/jira/browse/FLINK-5316
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Affects Versions: 1.2.0
>Reporter: Kostas Kloudas
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5566) Introduce structure to hold table and column level statistics

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865409#comment-15865409
 ] 

ASF GitHub Bot commented on FLINK-5566:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3196
  
Thanks for the update @beyond1920!
PR is good to merge.


> Introduce structure to hold table and column level statistics
> -
>
> Key: FLINK-5566
> URL: https://issues.apache.org/jira/browse/FLINK-5566
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: zhangjing
>
> We define two structures to hold statistics:
> 1. TableStats: contains table-level stats; currently only one element: rowCount
> 2. ColumnStats: contains column-level stats. 
> for numeric column types: including ndv, nullCount, max, min, histogram
> for string type: including ndv, nullCount, avgLen, maxLen
> for boolean: including ndv, nullCount, trueCount, falseCount
> for date/time/timestamp: including ndv, nullCount, max, min, histogram 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

2017-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3252#discussion_r100990608
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
 ---
@@ -33,10 +34,18 @@ object EventTimeExtractor extends 
SqlFunction("ROWTIME", SqlKind.OTHER_FUNCTION,
 SqlMonotonicity.INCREASING
 }
 
-case class RowTime() extends LeafExpression {
+case class TimeIndicator() extends LeafExpression {
--- End diff --

Should be `RowTime` (`ProcTime` will be added later).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5624) Support tumbling window on streaming tables in the SQL API

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865499#comment-15865499
 ] 

ASF GitHub Bot commented on FLINK-5624:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3252#discussion_r100995825
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
 ---
@@ -159,9 +152,25 @@ class LogicalWindowAggregateRule
 }
 false
   }
+
+  private def rewriteTimeIndicatorOperators(agg: LogicalAggregate, 
groupExprIdx: Int) = {
+val project = 
agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
+val newProjectExpr = mutable.ArrayBuffer[RexNode]()
+newProjectExpr.appendAll(project.getChildExps)
+val rexBuilder = agg.getCluster.getRexBuilder
+newProjectExpr(groupExprIdx) = rexBuilder.makeTimestampLiteral(
+  DataStreamAggregateRule.TIMESTAMP_ZERO, 3)
+val newProject = project.copy(project.getTraitSet, project.getInput,
+  newProjectExpr, project.getRowType)
+
+agg.copy(agg.getTraitSet, 
List(newProject)).asInstanceOf[LogicalAggregate]
--- End diff --

Can we create a `LogicalAggregate` with an adapted `groupSet` here? 
I think adding the `windowingGroupSet` to the `LogicalWindowAggregate` is 
not a very nice solution. It would be better if we could keep the existing code 
as it is without introducing workarounds for the SQL case.


> Support tumbling window on streaming tables in the SQL API
> --
>
> Key: FLINK-5624
> URL: https://issues.apache.org/jira/browse/FLINK-5624
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> This is a follow up of FLINK-4691.
> FLINK-4691 adds supports for group-windows for streaming tables. This jira 
> proposes to expose the functionality in the SQL layer via the {{GROUP BY}} 
> clauses, as described in 
> http://calcite.apache.org/docs/stream.html#tumbling-windows.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5624) Support tumbling window on streaming tables in the SQL API

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865498#comment-15865498
 ] 

ASF GitHub Bot commented on FLINK-5624:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3252#discussion_r100990608
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
 ---
@@ -33,10 +34,18 @@ object EventTimeExtractor extends 
SqlFunction("ROWTIME", SqlKind.OTHER_FUNCTION,
 SqlMonotonicity.INCREASING
 }
 
-case class RowTime() extends LeafExpression {
+case class TimeIndicator() extends LeafExpression {
--- End diff --

Should be `RowTime` (`ProcTime` will be added later).


> Support tumbling window on streaming tables in the SQL API
> --
>
> Key: FLINK-5624
> URL: https://issues.apache.org/jira/browse/FLINK-5624
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>
> This is a follow up of FLINK-4691.
> FLINK-4691 adds supports for group-windows for streaming tables. This jira 
> proposes to expose the functionality in the SQL layer via the {{GROUP BY}} 
> clauses, as described in 
> http://calcite.apache.org/docs/stream.html#tumbling-windows.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

2017-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3252#discussion_r100991050
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -225,12 +231,29 @@ abstract class StreamTableEnvironment(
 // decorrelate
 val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
 
+val prePlanner = createHepPlanner
--- End diff --

I'll merge PR #3101 later today; it adds a normalization phase before the 
optimization phase, based on a HepPlanner.
Could you integrate your changes with #3101?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

2017-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3252#discussion_r100995825
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
 ---
@@ -159,9 +152,25 @@ class LogicalWindowAggregateRule
 }
 false
   }
+
+  private def rewriteTimeIndicatorOperators(agg: LogicalAggregate, 
groupExprIdx: Int) = {
+val project = 
agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
+val newProjectExpr = mutable.ArrayBuffer[RexNode]()
+newProjectExpr.appendAll(project.getChildExps)
+val rexBuilder = agg.getCluster.getRexBuilder
+newProjectExpr(groupExprIdx) = rexBuilder.makeTimestampLiteral(
+  DataStreamAggregateRule.TIMESTAMP_ZERO, 3)
+val newProject = project.copy(project.getTraitSet, project.getInput,
+  newProjectExpr, project.getRowType)
+
+agg.copy(agg.getTraitSet, 
List(newProject)).asInstanceOf[LogicalAggregate]
--- End diff --

Can we create a `LogicalAggregate` with an adapted `groupSet` here? 
I think adding the `windowingGroupSet` to the `LogicalWindowAggregate` is 
not a very nice solution. It would be better if we could keep the existing code 
as it is without introducing workarounds for the SQL case.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #3291: [FLINK-5762] Protect initializeState() and open() by the ...

2017-02-14 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3291
  
Merging this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5762) Protect initializeState() and open() by the same lock.

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865560#comment-15865560
 ] 

ASF GitHub Bot commented on FLINK-5762:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/3291
  
Merging this...


> Protect initializeState() and open() by the same lock.
> --
>
> Key: FLINK-5762
> URL: https://issues.apache.org/jira/browse/FLINK-5762
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> Currently the initializeState() of all operators in a task is called without 
> the checkpoint lock, and before the open(). This may lead to problematic 
> situations such as the following:
> In the case that we retrieve timers from a checkpoint, e.g. WindowOperator 
> and (future) CEP, if we re-register them in the initializeState(), then if 
> they fire before the open() of the downstream operators is called, we will 
> have a task failure, as the downstream channels are not open.
> To avoid this, we can put the initializeState() in the same lock as the 
> open(), and the two operations will happen while being protected by the same 
> lock, which also keeps timers from firing.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
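
A highly simplified sketch of the proposed fix: run initializeState() and open() for the whole operator chain while holding the checkpoint lock. The `Operator` interface is a hypothetical stand-in for Flink's stream operator classes; this is not the actual StreamTask code.

{code}
import java.util.List;

public class OperatorInitSketch {

    /** Hypothetical minimal operator interface standing in for Flink's StreamOperator. */
    interface Operator {
        void initializeState() throws Exception;
        void open() throws Exception;
    }

    private final Object checkpointLock = new Object();

    /**
     * Both initializeState() and open() run under the checkpoint lock, so
     * restored timers cannot fire before the downstream operators are open.
     */
    void initializeAndOpen(List<Operator> operatorChain) throws Exception {
        synchronized (checkpointLock) {
            for (Operator op : operatorChain) {
                op.initializeState();
            }
            for (Operator op : operatorChain) {
                op.open();
            }
        }
    }
}
{code}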


[jira] [Commented] (FLINK-5723) Use "Used" instead of "Initial" to make taskmanager tag more readable

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865571#comment-15865571
 ] 

ASF GitHub Bot commented on FLINK-5723:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3274
  
+1 from my side.


> Use "Used" instead of "Initial" to make taskmanager tag more readable
> -
>
> Key: FLINK-5723
> URL: https://issues.apache.org/jira/browse/FLINK-5723
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Tao Wang
>Priority: Trivial
>
> Now in the JobManager web frontend, the used memory of task managers is presented 
> as "Initial" in the table header, which, according to the code, actually means "memory used".
> I'd like to change it to be more readable, even if it is a trivial change.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] flink issue #3274: [FLINK-5723][UI]Use Used instead of Initial to make taskm...

2017-02-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/3274
  
+1 from my side.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3298: [FLINK-5672] add special cases for a local setup i...

2017-02-14 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3298#discussion_r101008339
  
--- Diff: flink-dist/src/main/flink-bin/bin/stop-cluster.sh ---
@@ -25,14 +25,30 @@ bin=`cd "$bin"; pwd`
 # Stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) 
when available
 readSlaves
 
-command -v pdsh >/dev/null 2>&1
-if [[ $? -ne 0 ]]; then
+# all-local setup?
+all_localhost=1
+for slave in ${SLAVES[@]}; do
+if [[ "$slave" != "localhost" ]]; then
+all_localhost=0
+break
+fi
+done
+
+if [[ ${all_localhost} -eq 1 ]]; then
+# all-local setup
 for slave in ${SLAVES[@]}; do
-ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l 
\"${FLINK_BIN_DIR}/taskmanager.sh\" stop &"
+"${FLINK_BIN_DIR}"/taskmanager.sh stop
 done
 else
-PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, 
; echo "${SLAVES[*]}") \
-"nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" stop"
+command -v pdsh >/dev/null 2>&1
+if [[ $? -ne 0 ]]; then
+for slave in ${SLAVES[@]}; do
+ssh -n $FLINK_SSH_OPTS $slave -- "nohup /bin/bash -l 
\"${FLINK_BIN_DIR}/taskmanager.sh\" stop &"
+done
+else
+PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w 
$(IFS=, ; echo "${SLAVES[*]}") \
+"nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" stop"
--- End diff --

You're right, but it's probably more readable to keep the commands in there 
instead of sharing them; also, it has been that way before ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-5672) Job fails with java.lang.IllegalArgumentException: port out of range:-1

2017-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15865581#comment-15865581
 ] 

ASF GitHub Bot commented on FLINK-5672:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/3298#discussion_r101008195
  
--- Diff: flink-dist/src/main/flink-bin/bin/stop-cluster.sh ---
@@ -25,14 +25,30 @@ bin=`cd "$bin"; pwd`
 # Stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) 
when available
 readSlaves
 
-command -v pdsh >/dev/null 2>&1
-if [[ $? -ne 0 ]]; then
+# all-local setup?
+all_localhost=1
+for slave in ${SLAVES[@]}; do
+if [[ "$slave" != "localhost" ]]; then
+all_localhost=0
+break
+fi
+done
--- End diff --

Makes sense - since we already go through the slaves once there, we can do 
this check on the fly.


> Job fails with java.lang.IllegalArgumentException: port out of range:-1
> ---
>
> Key: FLINK-5672
> URL: https://issues.apache.org/jira/browse/FLINK-5672
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Reporter: Timo Walther
>
> I started the JobManager with {{start-local.sh}} and started another 
> TaskManager with {{taskmanager.sh start}}. My job is a Table API job with a 
> {{orderBy}} (range partitioning with parallelism 2).
> The job fails with the following exception:
> {code}
> java.lang.IllegalArgumentException: port out of range:-1
>   at java.net.InetSocketAddress.checkPort(InetSocketAddress.java:143)
>   at java.net.InetSocketAddress.(InetSocketAddress.java:188)
>   at 
> org.apache.flink.runtime.io.network.ConnectionID.(ConnectionID.java:47)
>   at 
> org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor.fromEdges(InputChannelDeploymentDescriptor.java:124)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.createDeploymentDescriptor(ExecutionVertex.java:627)
>   at 
> org.apache.flink.runtime.executiongraph.Execution.deployToSlot(Execution.java:358)
>   at 
> org.apache.flink.runtime.executiongraph.Execution$1.apply(Execution.java:284)
>   at 
> org.apache.flink.runtime.executiongraph.Execution$1.apply(Execution.java:279)
>   at 
> org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:259)
>   at akka.dispatch.OnComplete.internal(Future.scala:248)
>   at akka.dispatch.OnComplete.internal(Future.scala:245)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>   at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutor.execute(Executors.java:56)
>   at 
> scala.concurrent.impl.ExecutionContextImpl.execute(ExecutionContextImpl.scala:122)
>   at 
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>   at 
> scala.concurrent.impl.Promise$KeptPromise.onComplete(Promise.scala:333)
>   at 
> org.apache.flink.runtime.concurrent.impl.FlinkFuture.handleAsync(FlinkFuture.java:256)
>   at 
> org.apache.flink.runtime.concurrent.impl.FlinkFuture.handle(FlinkFuture.java:270)
>   at 
> org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:279)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:479)
>   at 
> org.apache.flink.runtime.executiongraph.Execution$5.call(Execution.java:525)
>   at 
> org.apache.flink.runtime.executiongraph.Execution$5.call(Execution.java:521)
>   at akka.dispatch.Futures$$anonfun$future$1.apply(Future.scala:95)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

