[jira] [Updated] (FLINK-7928) Extend the field in ResourceProfile for precisely calculating the resource of a task manager

2017-10-25 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7928:

Labels: flip-6  (was: )

> Extend the field in ResourceProfile for precisely calculating the resource of 
> a task manager
> ------------------------------------------------------------------------------
>
> Key: FLINK-7928
> URL: https://issues.apache.org/jira/browse/FLINK-7928
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager, ResourceManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> ResourceProfile records all the resource requirements for a slot. It is 
> generated by the JobMaster and then passed to the ResourceManager with the 
> slot request. 
> A task in the slot needs three parts of resource: 
> 1. The resource for the operators, as specified by the user-defined 
> ResourceSpec.
> 2. The resource for the operators to communicate with their upstreams, for 
> example, the resource for buffer pools.
> 3. The resource for the operators to communicate with their downstreams, 
> same as above.
> So ResourceProfile should contain three parts of resource: the first part 
> comes from the ResourceSpec, and the other two parts are generated by the 
> JobMaster.
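
For illustration, a minimal sketch of what a three-part ResourceProfile could 
look like. All field and method names below are hypothetical and not taken 
from the actual FLINK-7928 patch:

{code:java}
// Hypothetical sketch of the proposed three-part ResourceProfile;
// field and method names are illustrative only.
public class ResourceProfile implements java.io.Serializable {

    /** Part 1: the resource for the operators, from the user-defined ResourceSpec. */
    private final double cpuCores;
    private final int heapMemoryInMB;

    /** Part 2: memory for communicating with upstream tasks (e.g. input buffer pools). */
    private final int inputNetworkMemoryInMB;

    /** Part 3: memory for communicating with downstream tasks, analogous to part 2. */
    private final int outputNetworkMemoryInMB;

    public ResourceProfile(double cpuCores, int heapMemoryInMB,
            int inputNetworkMemoryInMB, int outputNetworkMemoryInMB) {
        this.cpuCores = cpuCores;
        this.heapMemoryInMB = heapMemoryInMB;
        this.inputNetworkMemoryInMB = inputNetworkMemoryInMB;
        this.outputNetworkMemoryInMB = outputNetworkMemoryInMB;
    }

    /** Total memory the ResourceManager must budget when sizing a task manager for this slot. */
    public int getTotalMemoryInMB() {
        return heapMemoryInMB + inputNetworkMemoryInMB + outputNetworkMemoryInMB;
    }
}
{code}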



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7928) Extend the field in ResourceProfile for precisely calculating the resource of a task manager

2017-10-25 Thread shuai.xu (JIRA)
shuai.xu created FLINK-7928:
---

 Summary: Extend the field in ResourceProfile for precisely 
calculating the resource of a task manager
 Key: FLINK-7928
 URL: https://issues.apache.org/jira/browse/FLINK-7928
 Project: Flink
  Issue Type: Improvement
  Components: JobManager, ResourceManager
Reporter: shuai.xu
Assignee: shuai.xu


ResourceProfile records all the resource requirements for a slot. It is 
generated by the JobMaster and then passed to the ResourceManager with the 
slot request. 
A task in the slot needs three parts of resource: 
1. The resource for the operators, as specified by the user-defined 
ResourceSpec.
2. The resource for the operators to communicate with their upstreams, for 
example, the resource for buffer pools.
3. The resource for the operators to communicate with their downstreams, 
same as above.
So ResourceProfile should contain three parts of resource: the first part 
comes from the ResourceSpec, and the other two parts are generated by the 
JobMaster.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7884) Port CheckpointConfigHandler to new REST endpoint

2017-10-25 Thread Hai Zhou UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou UTC+8 closed FLINK-7884.
-
Resolution: Fixed

> Port CheckpointConfigHandler to new REST endpoint
> -
>
> Key: FLINK-7884
> URL: https://issues.apache.org/jira/browse/FLINK-7884
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
> Fix For: 1.5.0
>
>
> Port *CheckpointConfigHandler* to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7893) Port CheckpointStatsDetailsSubtasksHandler to new REST endpoint

2017-10-25 Thread Hai Zhou UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou UTC+8 closed FLINK-7893.
-
Resolution: Fixed

> Port CheckpointStatsDetailsSubtasksHandler to new REST endpoint
> ---
>
> Key: FLINK-7893
> URL: https://issues.apache.org/jira/browse/FLINK-7893
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST
>Reporter: Hai Zhou UTC+8
> Fix For: 1.5.0
>
>
> Port *CheckpointStatsDetailsSubtasksHandler* to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7885) Port CheckpointStatsDetailsHandler to new REST endpoint

2017-10-25 Thread Hai Zhou UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou UTC+8 closed FLINK-7885.
-
Resolution: Fixed

> Port CheckpointStatsDetailsHandler to new REST endpoint
> ---
>
> Key: FLINK-7885
> URL: https://issues.apache.org/jira/browse/FLINK-7885
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
> Fix For: 1.5.0
>
>
> Port *CheckpointStatsDetailsHandler* to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7892) Port CheckpointStatsHandler to new REST endpoint

2017-10-25 Thread Hai Zhou UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou UTC+8 closed FLINK-7892.
-
Resolution: Fixed

> Port CheckpointStatsHandler to new REST endpoint
> 
>
> Key: FLINK-7892
> URL: https://issues.apache.org/jira/browse/FLINK-7892
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
> Fix For: 1.5.0
>
>
> Port *CheckpointStatsHandler* to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7900) Add a Rich KeySelector

2017-10-25 Thread Hai Zhou UTC+8 (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou UTC+8 closed FLINK-7900.
-
Resolution: Won't Fix

> Add a Rich KeySelector
> --
>
> Key: FLINK-7900
> URL: https://issues.apache.org/jira/browse/FLINK-7900
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Critical
>
> Currently we only have a `KeySelector` Function; maybe we should add a 
> `RichKeySelector` RichFunction, so that users can read configuration 
> information to build the key selector they need.
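
For illustration, a minimal sketch of what such a rich key selector could look 
like, modeled on Flink's existing AbstractRichFunction pattern. Since the issue 
was closed as "Won't Fix", no such class exists in Flink; the sketch below is 
hypothetical:

{code:java}
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.java.functions.KeySelector;

// Hypothetical sketch only -- FLINK-7900 was closed as "Won't Fix".
// Subclasses would inherit open()/close() and getRuntimeContext() from
// AbstractRichFunction, e.g. to read configuration in open() before
// getKey() is first called.
public abstract class RichKeySelector<IN, KEY>
        extends AbstractRichFunction
        implements KeySelector<IN, KEY> {

    @Override
    public abstract KEY getKey(IN value) throws Exception;
}
{code}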



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7666) ContinuousFileReaderOperator swallows chained watermarks

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219878#comment-16219878
 ] 

ASF GitHub Bot commented on FLINK-7666:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4900
  
I have one quick question, but maybe I am missing something big here: wouldn't 
it be possible to simply run `quiesceAndAwaitPending()` before `Operator::close()`, 
avoiding the locking/flag checking without running into exceptions? If we ensure 
that the task is no longer in the running state, the "forgotten" timers can no 
longer be reflected in any checkpoint/savepoint, and I think there is no 
guarantee of consistent output after canceling a job anyway?


> ContinuousFileReaderOperator swallows chained watermarks
> 
>
> Key: FLINK-7666
> URL: https://issues.apache.org/jira/browse/FLINK-7666
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.3.2
>Reporter: Ufuk Celebi
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.4.0
>
>
> I use event time and read from a (finite) file. I assign watermarks right 
> after the {{ContinuousFileReaderOperator}} with parallelism 1.
> {code}
> env
>   .readFile(new TextInputFormat(...), ...)
>   .setParallelism(1)
>   .assignTimestampsAndWatermarks(...)
>   .setParallelism(1)
>   .map()...
> {code}
> The watermarks I assign never progress through the pipeline.
> I can work around this by inserting a {{shuffle()}} after the file reader or 
> starting a new chain at the assigner:
> {code}
> env
>   .readFile(new TextInputFormat(...), ...)
>   .setParallelism(1)
>   .shuffle() 
>   .assignTimestampsAndWatermarks(...)
>   .setParallelism(1)
>   .map()...
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4900: [FLINK-7666] Close TimeService after closing operators.

2017-10-25 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4900
  
I have one quick question, but maybe I am missing something big here: wouldn't 
it be possible to simply run `quiesceAndAwaitPending()` before `Operator::close()`, 
avoiding the locking/flag checking without running into exceptions? If we ensure 
that the task is no longer in the running state, the "forgotten" timers can no 
longer be reflected in any checkpoint/savepoint, and I think there is no 
guarantee of consistent output after canceling a job anyway?
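
For concreteness, a sketch of the shutdown ordering being suggested. This is 
illustrative, not the actual StreamTask code; it assumes `timerService` and 
`operatorChain` fields like those in Flink's StreamTask, and the 
`quiesceAndAwaitPending()` method referenced above:

{code:java}
// Illustrative sketch of "quiesce the timers before closing the operators".
private void closeAllOperators() throws Exception {
    // 1) stop accepting new timers and wait for in-flight ones to finish
    timerService.quiesceAndAwaitPending();

    // 2) only then close the operators: no "forgotten" timer can fire against
    //    an already-closed operator, so no extra locking/flag checking is needed
    for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
        if (operator != null) {
            operator.close();
        }
    }
}
{code}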


---


[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219854#comment-16219854
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r147031787
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/FileUtils.java ---
@@ -243,11 +245,19 @@ else if (directory.exists()) {
 * @throws IOException if the delete operation fails
 */
public static boolean deletePathIfEmpty(FileSystem fileSystem, Path 
path) throws IOException {
-   FileStatus[] fileStatuses = null;
+   final FileStatus[] fileStatuses;
 
try {
fileStatuses = fileSystem.listStatus(path);
-   } catch (Exception ignored) {}
+   }
+   catch (FileNotFoundException e) {
+   // path already deleted
+   return true;
+   }
+   catch (Exception e) {
+   // could not access directory, cannot delete
--- End diff --

should we log a warning here?


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-25 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r147031787
  
--- Diff: flink-core/src/main/java/org/apache/flink/util/FileUtils.java ---
@@ -243,11 +245,19 @@ else if (directory.exists()) {
 * @throws IOException if the delete operation fails
 */
public static boolean deletePathIfEmpty(FileSystem fileSystem, Path 
path) throws IOException {
-   FileStatus[] fileStatuses = null;
+   final FileStatus[] fileStatuses;
 
try {
fileStatuses = fileSystem.listStatus(path);
-   } catch (Exception ignored) {}
+   }
+   catch (FileNotFoundException e) {
+   // path already deleted
+   return true;
+   }
+   catch (Exception e) {
+   // could not access directory, cannot delete
--- End diff --

should we log a warning here?
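
For concreteness, a sketch of the method with the suggested warning added. The 
SLF4J logger, the return-false-on-error behavior, and the trailing delete logic 
are assumptions about the parts elided from the diff, not the actual patch:

{code:java}
import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class FileUtilsSketch {

    private static final Logger LOG = LoggerFactory.getLogger(FileUtilsSketch.class);

    /** Variant of deletePathIfEmpty that logs the failure instead of silently swallowing it. */
    public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws IOException {
        final FileStatus[] fileStatuses;
        try {
            fileStatuses = fileSystem.listStatus(path);
        }
        catch (FileNotFoundException e) {
            // path already deleted
            return true;
        }
        catch (Exception e) {
            // could not access directory, cannot delete -- now surfaced as a warning
            LOG.warn("Could not list contents of {} to decide whether to delete it.", path, e);
            return false;
        }
        // only delete the path if it is actually empty
        return (fileStatuses == null || fileStatuses.length == 0) && fileSystem.delete(path, false);
    }
}
{code}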


---


[jira] [Commented] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column

2017-10-25 Thread Shuyi Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219771#comment-16219771
 ] 

Shuyi Chen commented on FLINK-7923:
---

We probably need to fix Calcite first (likely in Calcite release 1.15), then 
upgrade the Calcite dependency in Flink, and then fix this in Flink.

> SQL parser exception when accessing subfields of a Composite element in an 
> Object Array type column
> ---
>
> Key: FLINK-7923
> URL: https://issues.apache.org/jira/browse/FLINK-7923
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Rong Rong
>
> Accessing a subfield such as:
> {code:SQL}
> SELECT 
>   a[1].f0 
> FROM 
>   MyTable
> {code}
> will cause a problem. 
> See the following test sample for more details:
> https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146993120
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
 ---
@@ -48,75 +53,249 @@
 
private final ClassLoader cl = getClass().getClassLoader();
 
-   private final String backendKey = CoreOptions.STATE_BACKEND.key();
+   private final String backendKey = 
CheckpointingOptions.STATE_BACKEND.key();
 
// ------------------------------------------------------------------------
+   //  defaults
+   // ------------------------------------------------------------------------
 
@Test
public void testNoStateBackendDefined() throws Exception {
-   assertNull(AbstractStateBackend.loadStateBackendFromConfig(new 
Configuration(), cl, null));
+   assertNull(StateBackendLoader.loadStateBackendFromConfig(new 
Configuration(), cl, null));
}
 
@Test
public void testInstantiateMemoryBackendByDefault() throws Exception {
-   StateBackend backend = AbstractStateBackend
-   .loadStateBackendFromConfigOrCreateDefault(new 
Configuration(), cl, null);
+   StateBackend backend =
+   
StateBackendLoader.fromApplicationOrConfigOrDefault(null, new Configuration(), 
cl, null);
 
assertTrue(backend instanceof MemoryStateBackend);
}
 
@Test
-   public void testLoadMemoryStateBackend() throws Exception {
-   // we configure with the explicit string (rather than 
AbstractStateBackend#X_STATE_BACKEND_NAME)
-   // to guard against config-breaking changes of the name 
+   public void testApplicationDefinedHasPrecedence() throws Exception {
+   final StateBackend appBackend = 
Mockito.mock(StateBackend.class);
+
final Configuration config = new Configuration();
config.setString(backendKey, "jobmanager");
 
-   StateBackend backend = AbstractStateBackend
-   .loadStateBackendFromConfigOrCreateDefault(new 
Configuration(), cl, null);
+   StateBackend backend = 
StateBackendLoader.fromApplicationOrConfigOrDefault(appBackend, config, cl, 
null);
+   assertEquals(appBackend, backend);
+   }
 
-   assertTrue(backend instanceof MemoryStateBackend);
+   // ------------------------------------------------------------------------
+   //  Memory State Backend
+   // ------------------------------------------------------------------------
+
+   /**
+* Validates loading a memory state backend from the cluster 
configuration.
+*/
+   @Test
+   public void testLoadMemoryStateBackendNoParameters() throws Exception {
+   // we configure with the explicit string (rather than 
AbstractStateBackend#X_STATE_BACKEND_NAME)
+   // to guard against config-breaking changes of the name
+
+   final Configuration config1 = new Configuration();
+   config1.setString(backendKey, "jobmanager");
+
+   final Configuration config2 = new Configuration();
+   config2.setString(backendKey, 
MemoryStateBackendFactory.class.getName());
+
+   StateBackend backend1 = 
StateBackendLoader.loadStateBackendFromConfig(config1, cl, null);
+   StateBackend backend2 = 
StateBackendLoader.loadStateBackendFromConfig(config2, cl, null);
+
+   assertTrue(backend1 instanceof MemoryStateBackend);
+   assertTrue(backend2 instanceof MemoryStateBackend);
+   }
+
+   /**
+* Validates loading a memory state backend with additional parameters 
from the cluster configuration.
+*/
+   @Test
+   public void testLoadMemoryStateWithParameters() throws Exception {
+   final String checkpointDir = new 
Path(tmp.newFolder().toURI()).toString();
+   final String savepointDir = new 
Path(tmp.newFolder().toURI()).toString();
+   final Path expectedCheckpointPath = new Path(checkpointDir);
+   final Path expectedSavepointPath = new Path(savepointDir);
+
+   // we configure with the explicit string (rather than 
AbstractStateBackend#X_STATE_BACKEND_NAME)
+   // to guard against config-breaking changes of the name
+
+   final Configuration config1 = new Configuration();
+   config1.setString(backendKey, "jobmanager");
+   config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir);
+   config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, 
savepointDir);
+
+   final Configuration config2 = new Configuration();
+   config2.setString(backendKey, 

[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219561#comment-16219561
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146988381
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ConfigurableStateBackend.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+/**
+ *An interface for state backends that pick up additional parameters from 
a configuration.
+ */
+public interface ConfigurableStateBackend {
+
+   /**
+* Creates a variant of the state backend that applies additional 
configuration parameters.
+*
+* Settings that were directly done on the original state backend 
object in the application
+* program typically have precedence over settings picked up from the 
configuration.
+*
+* If no configuration is applied, or if the method directly applies 
configuration values to
+* the (mutable) state backend object, this method may return the 
original state backend object.
+* Otherwise it typically returns a modified copy.
+*
+* @param config The configuration to pick the values from. 
+* @return A copy of th
--- End diff --

Sentence is trailing off.


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219564#comment-16219564
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146985691
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+/**
+ * A collection of all configuration options that relate to checkpoints
+ * and savepoints.
+ */
+public class CheckpointingOptions {
+
+   // ------------------------------------------------------------------------
+   //  general checkpoint and state backend options
+   // ------------------------------------------------------------------------
+
+   public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
+   .key("state.backend")
+   .noDefaultValue();
+
+   /** The maximum number of completed checkpoint instances to retain. */
+   public static final ConfigOption<Integer> MAX_RETAINED_CHECKPOINTS = ConfigOptions
+   .key("state.checkpoints.num-retained")
+   .defaultValue(1);
+
+   // ------------------------------------------------------------------------
+   //  Options specific to the file-system-based state backends
+   // ------------------------------------------------------------------------
+
+   /** The default directory for savepoints. Used by the state backends that write
+* savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */
+   public static final ConfigOption<String> SAVEPOINT_DIRECTORY = ConfigOptions
+   .key("state.savepoints.dir")
+   .noDefaultValue()
+   .withDeprecatedKeys("savepoints.state.backend.fs.dir");
+
+   /** The default directory used for checkpoints. Used by the state backends that write
+* checkpoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */
+   public static final ConfigOption<String> CHECKPOINTS_DIRECTORY = ConfigOptions
+   .key("state.checkpoints.dir")
+   .noDefaultValue();
+
+   /** Option whether the heap-based key/value data structures should use an asynchronous
+* snapshot method. Used by MemoryStateBackend and FsStateBackend. */
+   public static final ConfigOption<Boolean> HEAP_KV_ASYNC_SNAPSHOTS = ConfigOptions
+   .key("state.backend.heap.async")
--- End diff --

Off-topic: The fact that we have "FsStateBackend" and "MemoryStateBackend" 
is confusing every user. We should only have "HeapStateBackend" and 
"RocksDBStateBackend", which both checkpoint to a DFS. (And the current 
"MemoryStateBackend" behaviour could be a switch on "HeapStateBackend"). But 
I'm afraid it's too late for that.  


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146990927
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.net.URI;
+
+/**
+ * A base class for all state backends that store their metadata (and 
data) in files.
+ * Examples that inherit from this are the {@link FsStateBackend}, the
+ * {@link org.apache.flink.runtime.state.memory.MemoryStateBackend 
MemoryStateBackend}, or the
--- End diff --

Nit: is this true for `MemoryStateBackend`? At the very least it's weird. 
😅 


---


[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219562#comment-16219562
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146987131
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 ---
@@ -229,29 +229,31 @@ public static ExecutionGraph buildGraph(
metrics);
 
// The default directory for externalized checkpoints
-   String externalizedCheckpointsDir = 
jobManagerConfig.getString(CoreOptions.CHECKPOINTS_DIRECTORY);
+   String externalizedCheckpointsDir = 
jobManagerConfig.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
 
-   // load the state backend for checkpoint metadata.
-   // if specified in the application, use from there, 
otherwise load from configuration
-   final StateBackend metadataBackend;
+   // load the state backend from the application settings
+   final StateBackend applicationConfiguredBackend;
+   final SerializedValue<StateBackend> serializedAppConfigured = snapshotSettings.getDefaultStateBackend();
 
-   final SerializedValue<StateBackend> applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend();
-   if (applicationConfiguredBackend != null) {
+   if (serializedAppConfigured == null) {
+   applicationConfiguredBackend = null;
+   }
+   else {
try {
-   metadataBackend = 
applicationConfiguredBackend.deserializeValue(classLoader);
+   applicationConfiguredBackend = 
serializedAppConfigured.deserializeValue(classLoader);
} catch (IOException | ClassNotFoundException 
e) {
-   throw new JobExecutionException(jobId, 
"Could not instantiate configured state backend.", e);
+   throw new JobExecutionException(jobId, 
+   "Could not deserialize 
application-defined state backend.", e);
}
+   }
 
-   log.info("Using application-defined state 
backend for checkpoint/savepoint metadata: {}.",
-   metadataBackend);
-   } else {
-   try {
-   metadataBackend = AbstractStateBackend
-   
.loadStateBackendFromConfigOrCreateDefault(jobManagerConfig, classLoader, log);
-   } catch (IllegalConfigurationException | 
IOException | DynamicCodeLoadingException e) {
-   throw new JobExecutionException(jobId, 
"Could not instantiate configured state backend", e);
-   }
+   final StateBackend rootBackend;
+   try {
+   rootBackend = 
StateBackendLoader.fromApplicationOrConfigOrDefault(
--- End diff --

nit: This is only defined in the later commit "[FLINK-5823] [checkpoints] 
State backends define checkpoint and savepoint directories, improved 
configuration", so technically the commit that adds this code is broken.


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146991686
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 ---
@@ -18,54 +18,92 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.ConfigurableStateBackend;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.net.URI;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * The file state backend is a state backend that stores the state of 
streaming jobs in a file system.
+ * This state backend holds the working state in the memory (JVM heap) of 
the TaskManagers.
+ * The state backend checkpoints state as files to a file system (hence 
the backend's name).
+ *
+ * Each checkpoint individually will store all its files in a 
subdirectory that includes the
+ * checkpoint number, such as {@code 
hdfs://namenode:port/flink-checkpoints/chk-17/}.
+ *
+ * State Size Considerations
+ *
+ * Working state is kept on the TaskManager heap. If a TaskManager 
executes multiple
+ * tasks concurrently (if the TaskManager has multiple slots, or if 
slot-sharing is used)
+ * then the aggregate state of all tasks needs to fit into that 
TaskManager's memory.
+ *
+ * This state backend stores small state chunks directly with the 
metadata, to avoid creating
+ * many small files. The threshold for that is configurable. When 
increasing this threshold, the
+ * size of the checkpoint metadata increases. The checkpoint metadata of 
all retained completed
+ * checkpoints needs to fit into the JobManager's heap memory. This is 
typically not a problem,
+ * unless the threshold {@link #getMinFileSizeThreshold()} is increased 
significantly.
+ *
+ * Persistence Guarantees
  *
- * The state backend has one core directory into which it puts all 
checkpoint data. Inside that
- * directory, it creates a directory per job, inside which each checkpoint 
gets a directory, with
- * files for each state, for example:
+ * Checkpoints from this state backend are as persistent and available as the 
filesystem that is written to.
+ * If the file system is a persistent distributed file system, this state 
backend supports
+ * highly available setups. The backend additionally supports savepoints 
and externalized checkpoints.
  *
- * {@code 
hdfs://namenode:port/flink-checkpoints//chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8
 }
+ * Configuration
+ *
+ * As for all state backends, this backend can either be configured within 
the application (by creating
+ * the backend with the respective constructor parameters and setting it 
on the execution environment)
+ * or by specifying it in the Flink configuration.
+ *
+ * If the state backend was specified in the application, it may pick 
up additional configuration
+ * parameters from the Flink configuration. For example, if the backend is 
configured in the application
+ * without a default savepoint directory, it will pick up a default 
savepoint directory specified in the
+ * Flink configuration of the running job/cluster. That behavior is 
implemented via the
+ * {@link #configure(Configuration)} method.
  */
-public class FsStateBackend extends AbstractStateBackend {
+@PublicEvolving
+public class FsStateBackend extends AbstractFileStateBackend implements 
ConfigurableStateBackend {
--- End diff --

Why isn't `AbstractFileStateBackend` configurable?


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146992430
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
 ---
@@ -18,45 +18,41 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StateBackendFactory;
 
-import java.io.IOException;
+import java.net.URI;
 
 /**
- * A factory that creates an {@link 
org.apache.flink.runtime.state.filesystem.FsStateBackend}
- * from a configuration.
+ * A factory that creates an {@link FsStateBackend} from a configuration.
  */
+@PublicEvolving
 public class FsStateBackendFactory implements StateBackendFactory<FsStateBackend> {
-   
-   /** The key under which the config stores the directory where 
checkpoints should be stored */
-   public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = 
"state.backend.fs.checkpointdir";
-
-   /** The key under which the config stores the threshold for state to be 
store in memory,
-* rather than in files */
-   public static final String MEMORY_THRESHOLD_CONF_KEY = 
"state.backend.fs.memory-threshold";
-
 
@Override
public FsStateBackend createFromConfig(Configuration config) throws 
IllegalConfigurationException {
--- End diff --

Wouldn't it make sense to do all of this in `configure()`? The factories 
are essentially useless now and would only instantiate the backend and then 
call `configure()` with the config.
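
For illustration, a sketch of the "instantiate, then configure()" factory shape 
being suggested. The one-argument FsStateBackend constructor exists, but the 
assumption that `configure(Configuration)` returns an FsStateBackend and can 
pick up all remaining parameters is hypothetical:

{code:java}
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;

// Hypothetical sketch: the factory only instantiates the backend with its one
// mandatory parameter and lets configure() pick up everything else
// (memory threshold, savepoint directory, ...).
public class FsStateBackendFactorySketch {

    public FsStateBackend createFromConfig(Configuration config) throws IllegalConfigurationException {
        final String checkpointDir = config.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
        if (checkpointDir == null) {
            throw new IllegalConfigurationException(
                "Cannot create the file system state backend: the configuration does not specify the "
                    + "checkpoint directory '" + CheckpointingOptions.CHECKPOINTS_DIRECTORY.key() + "'");
        }
        FsStateBackend backend = new FsStateBackend(checkpointDir);
        // assumed here to return the configured FsStateBackend variant
        return backend.configure(config);
    }
}
{code}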


---


[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219568#comment-16219568
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146992430
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackendFactory.java
 ---
@@ -18,45 +18,41 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StateBackendFactory;
 
-import java.io.IOException;
+import java.net.URI;
 
 /**
- * A factory that creates an {@link 
org.apache.flink.runtime.state.filesystem.FsStateBackend}
- * from a configuration.
+ * A factory that creates an {@link FsStateBackend} from a configuration.
  */
+@PublicEvolving
 public class FsStateBackendFactory implements StateBackendFactory<FsStateBackend> {
-   
-   /** The key under which the config stores the directory where 
checkpoints should be stored */
-   public static final String CHECKPOINT_DIRECTORY_URI_CONF_KEY = 
"state.backend.fs.checkpointdir";
-
-   /** The key under which the config stores the threshold for state to be 
store in memory,
-* rather than in files */
-   public static final String MEMORY_THRESHOLD_CONF_KEY = 
"state.backend.fs.memory-threshold";
-
 
@Override
public FsStateBackend createFromConfig(Configuration config) throws 
IllegalConfigurationException {
--- End diff --

Wouldn't it make sense to do all of this in `configure()`? The factories 
are essentially useless now and would only instantiate the backend and then 
call `configure()` with the config.


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219563#comment-16219563
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146984727
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+/**
+ * A collection of all configuration options that relate to checkpoints
+ * and savepoints.
+ */
+public class CheckpointingOptions {
+
+   // ------------------------------------------------------------------------
+   //  general checkpoint and state backend options
+   // ------------------------------------------------------------------------
+
+   public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
+   .key("state.backend")
+   .noDefaultValue();
+
+   /** The maximum number of completed checkpoint instances to retain. */
+   public static final ConfigOption<Integer> MAX_RETAINED_CHECKPOINTS = ConfigOptions
+   .key("state.checkpoints.num-retained")
+   .defaultValue(1);
+
+   // ------------------------------------------------------------------------
+   //  Options specific to the file-system-based state backends
+   // ------------------------------------------------------------------------
+
+   /** The default directory for savepoints. Used by the state backends that write
+* savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */
--- End diff --

nit: is this true for `MemoryStateBackend`? Same for the options below.


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219565#comment-16219565
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146991686
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 ---
@@ -18,54 +18,92 @@
 
 package org.apache.flink.runtime.state.filesystem;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.ConfigurableStateBackend;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.net.URI;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * The file state backend is a state backend that stores the state of 
streaming jobs in a file system.
+ * This state backend holds the working state in the memory (JVM heap) of 
the TaskManagers.
+ * The state backend checkpoints state as files to a file system (hence 
the backend's name).
+ *
+ * Each checkpoint individually will store all its files in a 
subdirectory that includes the
+ * checkpoint number, such as {@code 
hdfs://namenode:port/flink-checkpoints/chk-17/}.
+ *
+ * State Size Considerations
+ *
+ * Working state is kept on the TaskManager heap. If a TaskManager 
executes multiple
+ * tasks concurrently (if the TaskManager has multiple slots, or if 
slot-sharing is used)
+ * then the aggregate state of all tasks needs to fit into that 
TaskManager's memory.
+ *
+ * This state backend stores small state chunks directly with the 
metadata, to avoid creating
+ * many small files. The threshold for that is configurable. When 
increasing this threshold, the
+ * size of the checkpoint metadata increases. The checkpoint metadata of 
all retained completed
+ * checkpoints needs to fit into the JobManager's heap memory. This is 
typically not a problem,
+ * unless the threshold {@link #getMinFileSizeThreshold()} is increased 
significantly.
+ *
+ * Persistence Guarantees
  *
- * The state backend has one core directory into which it puts all 
checkpoint data. Inside that
- * directory, it creates a directory per job, inside which each checkpoint 
gets a directory, with
- * files for each state, for example:
+ * Checkpoints from this state backend are as persistent and available as the 
filesystem that is written to.
+ * If the file system is a persistent distributed file system, this state 
backend supports
+ * highly available setups. The backend additionally supports savepoints 
and externalized checkpoints.
  *
- * {@code 
hdfs://namenode:port/flink-checkpoints//chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8
 }
+ * Configuration
+ *
+ * As for all state backends, this backend can either be configured within 
the application (by creating
+ * the backend with the respective constructor parameters and setting it 
on the execution environment)
+ * or by specifying it in the Flink configuration.
+ *
+ * If the state backend was specified in the application, it may pick 
up additional configuration
+ * parameters from the Flink configuration. For example, if the backend is 
configured in the application
+ * without a default savepoint directory, it will pick up a default 
savepoint directory specified in the
+ * Flink configuration of the running job/cluster. That behavior is 
implemented via the
+ * {@link #configure(Configuration)} method.
  */
-public class FsStateBackend extends AbstractStateBackend {
+@PublicEvolving
+public class FsStateBackend extends AbstractFileStateBackend implements 

[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219560#comment-16219560
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146985767
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+/**
+ * A collection of all configuration options that relate to checkpoints
+ * and savepoints.
+ */
+public class CheckpointingOptions {
+
+   // ------------------------------------------------------------------------
+   //  general checkpoint and state backend options
+   // ------------------------------------------------------------------------
+
+   public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
+   .key("state.backend")
+   .noDefaultValue();
+
+   /** The maximum number of completed checkpoint instances to retain. */
+   public static final ConfigOption<Integer> MAX_RETAINED_CHECKPOINTS = ConfigOptions
+   .key("state.checkpoints.num-retained")
+   .defaultValue(1);
+
+   // ------------------------------------------------------------------------
+   //  Options specific to the file-system-based state backends
+   // ------------------------------------------------------------------------
+
+   /** The default directory for savepoints. Used by the state backends that write
+* savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */
+   public static final ConfigOption<String> SAVEPOINT_DIRECTORY = ConfigOptions
+   .key("state.savepoints.dir")
+   .noDefaultValue()
+   .withDeprecatedKeys("savepoints.state.backend.fs.dir");
+
+   /** The default directory used for checkpoints. Used by the state backends that write
+* checkpoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */
+   public static final ConfigOption<String> CHECKPOINTS_DIRECTORY = ConfigOptions
+   .key("state.checkpoints.dir")
+   .noDefaultValue();
+
+   /** Option whether the heap-based key/value data structures should use an asynchronous
+* snapshot method. Used by MemoryStateBackend and FsStateBackend. */
+   public static final ConfigOption<Boolean> HEAP_KV_ASYNC_SNAPSHOTS = ConfigOptions
+   .key("state.backend.heap.async")
--- End diff --

Mentioning it because we have no "heap" backend, technically.


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219566#comment-16219566
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146990927
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackend.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.net.URI;
+
+/**
+ * A base class for all state backends that store their metadata (and 
data) in files.
+ * Examples that inherit from this are the {@link FsStateBackend}, the
+ * {@link org.apache.flink.runtime.state.memory.MemoryStateBackend 
MemoryStateBackend}, or the
--- End diff --

Nit: is this true for `MemoryStateBackend`? At the very least it's weird.  


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219567#comment-16219567
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146993120
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendLoadingTest.java
 ---
@@ -48,75 +53,249 @@
 
private final ClassLoader cl = getClass().getClassLoader();
 
-   private final String backendKey = CoreOptions.STATE_BACKEND.key();
+   private final String backendKey = 
CheckpointingOptions.STATE_BACKEND.key();
 
// ------------------------------------------------------------------------
+   //  defaults
+   // ------------------------------------------------------------------------
 
@Test
public void testNoStateBackendDefined() throws Exception {
-   assertNull(AbstractStateBackend.loadStateBackendFromConfig(new 
Configuration(), cl, null));
+   assertNull(StateBackendLoader.loadStateBackendFromConfig(new 
Configuration(), cl, null));
}
 
@Test
public void testInstantiateMemoryBackendByDefault() throws Exception {
-   StateBackend backend = AbstractStateBackend
-   .loadStateBackendFromConfigOrCreateDefault(new 
Configuration(), cl, null);
+   StateBackend backend =
+   
StateBackendLoader.fromApplicationOrConfigOrDefault(null, new Configuration(), 
cl, null);
 
assertTrue(backend instanceof MemoryStateBackend);
}
 
@Test
-   public void testLoadMemoryStateBackend() throws Exception {
-   // we configure with the explicit string (rather than 
AbstractStateBackend#X_STATE_BACKEND_NAME)
-   // to guard against config-breaking changes of the name 
+   public void testApplicationDefinedHasPrecedence() throws Exception {
+   final StateBackend appBackend = 
Mockito.mock(StateBackend.class);
+
final Configuration config = new Configuration();
config.setString(backendKey, "jobmanager");
 
-   StateBackend backend = AbstractStateBackend
-   .loadStateBackendFromConfigOrCreateDefault(new 
Configuration(), cl, null);
+   StateBackend backend = 
StateBackendLoader.fromApplicationOrConfigOrDefault(appBackend, config, cl, 
null);
+   assertEquals(appBackend, backend);
+   }
 
-   assertTrue(backend instanceof MemoryStateBackend);
+   // ------------------------------------------------------------------------
+   //  Memory State Backend
+   // ------------------------------------------------------------------------
+
+   /**
+* Validates loading a memory state backend from the cluster 
configuration.
+*/
+   @Test
+   public void testLoadMemoryStateBackendNoParameters() throws Exception {
+   // we configure with the explicit string (rather than 
AbstractStateBackend#X_STATE_BACKEND_NAME)
+   // to guard against config-breaking changes of the name
+
+   final Configuration config1 = new Configuration();
+   config1.setString(backendKey, "jobmanager");
+
+   final Configuration config2 = new Configuration();
+   config2.setString(backendKey, 
MemoryStateBackendFactory.class.getName());
+
+   StateBackend backend1 = 
StateBackendLoader.loadStateBackendFromConfig(config1, cl, null);
+   StateBackend backend2 = 
StateBackendLoader.loadStateBackendFromConfig(config2, cl, null);
+
+   assertTrue(backend1 instanceof MemoryStateBackend);
+   assertTrue(backend2 instanceof MemoryStateBackend);
+   }
+
+   /**
+* Validates loading a memory state backend with additional parameters 
from the cluster configuration.
+*/
+   @Test
+   public void testLoadMemoryStateWithParameters() throws Exception {
+   final String checkpointDir = new 
Path(tmp.newFolder().toURI()).toString();
+   final String savepointDir = new 
Path(tmp.newFolder().toURI()).toString();
+   final Path expectedCheckpointPath = new Path(checkpointDir);
+   final Path expectedSavepointPath = new Path(savepointDir);
+
+   // we configure with the explicit string (rather than 
AbstractStateBackend#X_STATE_BACKEND_NAME)
+   // to guard against config-breaking changes of the name
+
+   final Configuration config1 = new Configuration();
+   config1.setString(backendKey, "jobmanager");
+   

[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219559#comment-16219559
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146987476
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -116,6 +117,10 @@
 * accessing this don't block the job manager actor and run 
asynchronously. */
private final CompletedCheckpointStore completedCheckpointStore;
 
+   /** The root checkpoint state backend, which is responsible for 
initializing the
+* checkpoint, storing the metadata, and cleaning up the checkpoint */
+   private final StateBackend checkpointStateBackend;
--- End diff --

nit: Are there other backends than "checkpoint" backend?


> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146985691
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+/**
+ * A collection of all configuration options that relate to checkpoints
+ * and savepoints.
+ */
+public class CheckpointingOptions {
+
+   // ------------------------------------------------------------------
+   //  general checkpoint and state backend options
+   // ------------------------------------------------------------------
+
+   public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
+       .key("state.backend")
+       .noDefaultValue();
+
+   /** The maximum number of completed checkpoint instances to retain. */
+   public static final ConfigOption<Integer> MAX_RETAINED_CHECKPOINTS = ConfigOptions
+       .key("state.checkpoints.num-retained")
+       .defaultValue(1);
+
+   // ------------------------------------------------------------------
+   //  Options specific to the file-system-based state backends
+   // ------------------------------------------------------------------
+
+   /** The default directory for savepoints. Used by the state backends that write
+    * savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */
+   public static final ConfigOption<String> SAVEPOINT_DIRECTORY = ConfigOptions
+       .key("state.savepoints.dir")
+       .noDefaultValue()
+       .withDeprecatedKeys("savepoints.state.backend.fs.dir");
+
+   /** The default directory used for checkpoints. Used by the state backends that write
+    * checkpoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */
+   public static final ConfigOption<String> CHECKPOINTS_DIRECTORY = ConfigOptions
+       .key("state.checkpoints.dir")
+       .noDefaultValue();
+
+   /** Option whether the heap-based key/value data structures should use an asynchronous
+    * snapshot method. Used by MemoryStateBackend and FsStateBackend. */
+   public static final ConfigOption<Boolean> HEAP_KV_ASYNC_SNAPSHOTS = ConfigOptions
+       .key("state.backend.heap.async")
--- End diff --

Off-topic: The fact that we have "FsStateBackend" and "MemoryStateBackend" 
is confusing every user. We should only have "HeapStateBackend" and 
"RocksDBStateBackend", which both checkpoint to a DFS. (And the current 
"MemoryStateBackend" behaviour could be a switch on "HeapStateBackend"). But 
I'm afraid it's too late for that. 😩 


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146987131
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 ---
@@ -229,29 +229,31 @@ public static ExecutionGraph buildGraph(
metrics);
 
// The default directory for externalized checkpoints
-   String externalizedCheckpointsDir = 
jobManagerConfig.getString(CoreOptions.CHECKPOINTS_DIRECTORY);
+   String externalizedCheckpointsDir = 
jobManagerConfig.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
 
-   // load the state backend for checkpoint metadata.
-   // if specified in the application, use from there, 
otherwise load from configuration
-   final StateBackend metadataBackend;
+   // load the state backend from the application settings
+   final StateBackend applicationConfiguredBackend;
+   final SerializedValue<StateBackend> serializedAppConfigured = snapshotSettings.getDefaultStateBackend();
 
-   final SerializedValue<StateBackend> applicationConfiguredBackend = snapshotSettings.getDefaultStateBackend();
-   if (applicationConfiguredBackend != null) {
+   if (serializedAppConfigured == null) {
+   applicationConfiguredBackend = null;
+   }
+   else {
try {
-   metadataBackend = 
applicationConfiguredBackend.deserializeValue(classLoader);
+   applicationConfiguredBackend = 
serializedAppConfigured.deserializeValue(classLoader);
} catch (IOException | ClassNotFoundException 
e) {
-   throw new JobExecutionException(jobId, 
"Could not instantiate configured state backend.", e);
+   throw new JobExecutionException(jobId, 
+   "Could not deserialize 
application-defined state backend.", e);
}
+   }
 
-   log.info("Using application-defined state 
backend for checkpoint/savepoint metadata: {}.",
-   metadataBackend);
-   } else {
-   try {
-   metadataBackend = AbstractStateBackend
-   
.loadStateBackendFromConfigOrCreateDefault(jobManagerConfig, classLoader, log);
-   } catch (IllegalConfigurationException | 
IOException | DynamicCodeLoadingException e) {
-   throw new JobExecutionException(jobId, 
"Could not instantiate configured state backend", e);
-   }
+   final StateBackend rootBackend;
+   try {
+   rootBackend = 
StateBackendLoader.fromApplicationOrConfigOrDefault(
--- End diff --

nit: This is only defined in the later commit "[FLINK-5823] [checkpoints] 
State backends define checkpoint and savepoint directories, improved 
configuration", so technically the commit that adds this code is broken.


---
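
For context, the diff above replaces the ad-hoc loading logic with 
StateBackendLoader.fromApplicationOrConfigOrDefault(...), which resolves the 
backend in a fixed order: an application-defined backend wins over the cluster 
configuration, which wins over a built-in default. A minimal sketch of that 
order (import paths and the concrete default are assumptions here, not the 
actual Flink implementation):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;

class BackendResolutionSketch {
    // 1) application-defined backend, 2) 'state.backend' from the cluster
    // config, 3) a built-in default (assumed here to be MemoryStateBackend).
    static StateBackend resolve(StateBackend appBackend, Configuration config, ClassLoader cl)
            throws Exception {
        if (appBackend != null) {
            return appBackend;
        }
        StateBackend fromConfig = StateBackendLoader.loadStateBackendFromConfig(config, cl, null);
        return fromConfig != null ? fromConfig : new MemoryStateBackend();
    }
}
{code}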


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146984727
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+/**
+ * A collection of all configuration options that relate to checkpoints
+ * and savepoints.
+ */
+public class CheckpointingOptions {
+
+   // ------------------------------------------------------------------
+   //  general checkpoint and state backend options
+   // ------------------------------------------------------------------
+
+   public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
+       .key("state.backend")
+       .noDefaultValue();
+
+   /** The maximum number of completed checkpoint instances to retain. */
+   public static final ConfigOption<Integer> MAX_RETAINED_CHECKPOINTS = ConfigOptions
+       .key("state.checkpoints.num-retained")
+       .defaultValue(1);
+
+   // ------------------------------------------------------------------
+   //  Options specific to the file-system-based state backends
+   // ------------------------------------------------------------------
+
+   /** The default directory for savepoints. Used by the state backends that write
+    * savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */
--- End diff --

nit: is this true for `MemoryStateBackend`? Same for the options below.


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146987476
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -116,6 +117,10 @@
 * accessing this don't block the job manager actor and run 
asynchronously. */
private final CompletedCheckpointStore completedCheckpointStore;
 
+   /** The root checkpoint state backend, which is responsible for 
initializing the
+* checkpoint, storing the metadata, and cleaning up the checkpoint */
+   private final StateBackend checkpointStateBackend;
--- End diff --

nit: Are there other backends than "checkpoint" backend?


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146988381
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ConfigurableStateBackend.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+/**
+ * An interface for state backends that pick up additional parameters from a configuration.
+ */
+public interface ConfigurableStateBackend {
+
+   /**
+* Creates a variant of the state backend that applies additional 
configuration parameters.
+*
+* Settings that were directly done on the original state backend 
object in the application
+* program typically have precedence over setting picked up from the 
configuration.
+*
+* If no configuration is applied, or if the method directly applies 
configuration values to
+* the (mutable) state backend object, this method may return the 
original state backend object.
+* Otherwise it typically returns a modified copy.
+*
+* @param config The configuration to pick the values from. 
+* @return A copy of th
--- End diff --

Sentence is trailing off.


---


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4907#discussion_r146985767
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration;
+
+/**
+ * A collection of all configuration options that relate to checkpoints
+ * and savepoints.
+ */
+public class CheckpointingOptions {
+
+   // ------------------------------------------------------------------
+   //  general checkpoint and state backend options
+   // ------------------------------------------------------------------
+
+   public static final ConfigOption<String> STATE_BACKEND = ConfigOptions
+       .key("state.backend")
+       .noDefaultValue();
+
+   /** The maximum number of completed checkpoint instances to retain. */
+   public static final ConfigOption<Integer> MAX_RETAINED_CHECKPOINTS = ConfigOptions
+       .key("state.checkpoints.num-retained")
+       .defaultValue(1);
+
+   // ------------------------------------------------------------------
+   //  Options specific to the file-system-based state backends
+   // ------------------------------------------------------------------
+
+   /** The default directory for savepoints. Used by the state backends that write
+    * savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */
+   public static final ConfigOption<String> SAVEPOINT_DIRECTORY = ConfigOptions
+       .key("state.savepoints.dir")
+       .noDefaultValue()
+       .withDeprecatedKeys("savepoints.state.backend.fs.dir");
+
+   /** The default directory used for checkpoints. Used by the state backends that write
+    * checkpoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend). */
+   public static final ConfigOption<String> CHECKPOINTS_DIRECTORY = ConfigOptions
+       .key("state.checkpoints.dir")
+       .noDefaultValue();
+
+   /** Option whether the heap-based key/value data structures should use an asynchronous
+    * snapshot method. Used by MemoryStateBackend and FsStateBackend. */
+   public static final ConfigOption<Boolean> HEAP_KV_ASYNC_SNAPSHOTS = ConfigOptions
+       .key("state.backend.heap.async")
--- End diff --

Mentioning it because we have no "heap" backend, technically.


---


[GitHub] flink pull request #4827: [FLINK-7840] [build] Shade netty in akka

2017-10-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4827#discussion_r146986377
  
--- Diff: flink-runtime/pom.xml ---
@@ -427,17 +427,42 @@ under the License.
shade


+   <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
--- End diff --

Why exactly do we need this? AFAIK you can shade transitive dependencies 
without promoting them.


---


[jira] [Commented] (FLINK-7840) Shade Akka's Netty Dependency

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219519#comment-16219519
 ] 

ASF GitHub Bot commented on FLINK-7840:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4827#discussion_r146986377
  
--- Diff: flink-runtime/pom.xml ---
@@ -427,17 +427,42 @@ under the License.
shade


+   <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
--- End diff --

Why exactly do we need this? AFAIK you can shade transitive dependencies 
without promoting them.


> Shade Akka's Netty Dependency
> -
>
> Key: FLINK-7840
> URL: https://issues.apache.org/jira/browse/FLINK-7840
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.4.0
>
>
> In order to avoid clashes between different Netty versions we should shade 
> Akka's Netty away.
> These dependency version clashes manifest themselves in very subtle ways, 
> like occasional deadlocks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7840) Shade Akka's Netty Dependency

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219511#comment-16219511
 ] 

ASF GitHub Bot commented on FLINK-7840:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4827#discussion_r146985560
  
--- Diff: flink-test-utils-parent/flink-test-utils/pom.xml ---
@@ -117,6 +124,41 @@ under the License.
true
true

+
+   <plugin>
+       <groupId>org.apache.maven.plugins</groupId>
+       <artifactId>maven-shade-plugin</artifactId>
+       <executions>
+           <execution>
+               <id>shade-flink</id>
+               <phase>package</phase>
+               <goals>
+                   <goal>shade</goal>
+               </goals>
+               <configuration>
+                   <artifactSet>
+                       <includes>
+                           <include>io.netty:netty</include>
+                       </includes>
+                   </artifactSet>
+                   <relocations>
+                       <relocation>
+                           <pattern>org.jboss.netty</pattern>
+                           <shadedPattern>org.apache.flink.shaded.testutils.org.jboss.netty</shadedPattern>
+                       </relocation>
+                   </relocations>
+               </configuration>
+           </execution>
+       </executions>
+   </plugin>
--- End diff --

Could you explain in more detail why we need to shade this here? Are we hiding 
some transitive netty dependency?


> Shade Akka's Netty Dependency
> -
>
> Key: FLINK-7840
> URL: https://issues.apache.org/jira/browse/FLINK-7840
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.4.0
>
>
> In order to avoid clashes between different Netty versions we should shade 
> Akka's Netty away.
> These dependency version clashes manifest themselves in very subtle ways, 
> like occasional deadlocks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4827: [FLINK-7840] [build] Shade netty in akka

2017-10-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4827#discussion_r146985560
  
--- Diff: flink-test-utils-parent/flink-test-utils/pom.xml ---
@@ -117,6 +124,41 @@ under the License.
true
true

+
+   <plugin>
+       <groupId>org.apache.maven.plugins</groupId>
+       <artifactId>maven-shade-plugin</artifactId>
+       <executions>
+           <execution>
+               <id>shade-flink</id>
+               <phase>package</phase>
+               <goals>
+                   <goal>shade</goal>
+               </goals>
+               <configuration>
+                   <artifactSet>
+                       <includes>
+                           <include>io.netty:netty</include>
+                       </includes>
+                   </artifactSet>
+                   <relocations>
+                       <relocation>
+                           <pattern>org.jboss.netty</pattern>
+                           <shadedPattern>org.apache.flink.shaded.testutils.org.jboss.netty</shadedPattern>
+                       </relocation>
+                   </relocations>
+               </configuration>
+           </execution>
+       </executions>
+   </plugin>
--- End diff --

Could you explain in more detail why we need to shade this here? Are we hiding 
some transitive netty dependency?


---


[jira] [Updated] (FLINK-7877) Fix compilation against the Hadoop 3 beta1 release

2017-10-25 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-7877:
--
Labels: build  (was: )

> Fix compilation against the Hadoop 3 beta1 release
> --
>
> Key: FLINK-7877
> URL: https://issues.apache.org/jira/browse/FLINK-7877
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>  Labels: build
>
> When compiling against hadoop 3.0.0-beta1, I got:
> {code}
> [ERROR] 
> /mnt/disk2/a/flink/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java:[224,16]
>  org.apache.flink.yarn.UtilsTest.TestingContainer is not abstract and does 
> not override abstract method 
> setExecutionType(org.apache.hadoop.yarn.api.records.ExecutionType) in 
> org.apache.hadoop.yarn.api.records.Container
> {code}
> There may be other Hadoop API(s) that need adjustment.
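
A sketch of the missing override, based only on the compiler error above 
(Hadoop 3 may add further abstract methods to Container that need similar 
no-op stubs):

{code:java}
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ExecutionType;

// The test stub must override the abstract method added in Hadoop 3.
abstract class TestingContainerSketch extends Container {
    @Override
    public void setExecutionType(ExecutionType executionType) {
        // no-op: the test container does not track an execution type
    }
}
{code}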



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7927) Different Netty Versions in dependencies of flink-runtime make it impossible to use 3rd party libraries using netty

2017-10-25 Thread Claudius Eisele (JIRA)
Claudius Eisele created FLINK-7927:
--

 Summary: Different Netty Versions in dependencies of flink-runtime 
make it impossible to use 3rd party libraries using netty
 Key: FLINK-7927
 URL: https://issues.apache.org/jira/browse/FLINK-7927
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.3.2
 Environment: * Windows 10 x64
* Java 1.8
Reporter: Claudius Eisele


I am trying to use Google PubSub (google-cloud-pubsub 0.26.0-beta) in a Flink 
streaming job, but I receive the following error when executing it, so 
unfortunately it is currently not possible to use PubSub in a Flink streaming job:
{code:java}
...
10/25/2017 22:38:02 Source: Custom Source -> Map(1/1) switched to RUNNING
10/25/2017 22:38:03 Source: Custom Source -> Map(1/1) switched to FAILED
java.lang.IllegalStateException: Expected the service InnerService [FAILED] to 
be RUNNING, but the service has FAILED
at 
com.google.common.util.concurrent.AbstractService.checkCurrentState(AbstractService.java:328)
at 
com.google.common.util.concurrent.AbstractService.awaitRunning(AbstractService.java:266)
at 
com.google.api.core.AbstractApiService.awaitRunning(AbstractApiService.java:97)


Caused by: java.lang.IllegalArgumentException: Jetty ALPN/NPN has not been 
properly configured.
at 
io.grpc.netty.GrpcSslContexts.selectApplicationProtocolConfig(GrpcSslContexts.java:159)
at io.grpc.netty.GrpcSslContexts.configure(GrpcSslContexts.java:136)
at io.grpc.netty.GrpcSslContexts.configure(GrpcSslContexts.java:124)
at io.grpc.netty.GrpcSslContexts.forClient(GrpcSslContexts.java:94)
at 
io.grpc.netty.NettyChannelBuilder$NettyTransportFactory$DefaultNettyTransportCreationParamsFilterFactory.(NettyChannelBuilder.java:525)
at 
io.grpc.netty.NettyChannelBuilder$NettyTransportFactory$DefaultNettyTransportCreationParamsFilterFactory.(NettyChannelBuilder.java:518)
at 
io.grpc.netty.NettyChannelBuilder$NettyTransportFactory.(NettyChannelBuilder.java:457)
at 
io.grpc.netty.NettyChannelBuilder.buildTransportFactory(NettyChannelBuilder.java:326)
at 
io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:315)
at 
com.google.api.gax.grpc.InstantiatingChannelProvider.createChannel(InstantiatingChannelProvider.java:131)
at 
com.google.api.gax.grpc.InstantiatingChannelProvider.getChannel(InstantiatingChannelProvider.java:116)
at com.google.cloud.pubsub.v1.Subscriber.doStart(Subscriber.java:246)
at 
com.google.api.core.AbstractApiService$InnerService.doStart(AbstractApiService.java:149)
at 
com.google.common.util.concurrent.AbstractService.startAsync(AbstractService.java:211)
at 
com.google.api.core.AbstractApiService.startAsync(AbstractApiService.java:121)
at com.google.cloud.pubsub.v1.Subscriber.startAsync(Subscriber.java:235)
... 7 more
{code}

I reported this problem to the Google Cloud Java library, but the problem seems 
to lie more in Flink or its dependencies such as Akka, because flink-runtime 
pulls in a lot of netty dependencies with different versions:
* Apache Zookeeper (flink-runtime dependency) has \--- 
io.netty:netty:3.7.0.Final -> 3.8.0.Final
* Flakka (flink-runtime dependency) has io.netty:netty:3.8.0.Final
* Flink-Runtime has io.netty:netty-all:4.0.27.Final

In my case, Google Cloud PubSub has io.grpc:grpc-netty:1.6.1

Additional information on the issue in combination with Google Cloud PubSub can 
be found here:
https://github.com/GoogleCloudPlatform/google-cloud-java/issues/2398
https://github.com/grpc/grpc-java/issues/3025
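
A quick way to see which jars actually provide the conflicting Netty classes 
on a given classpath (a debugging sketch, not taken from the issue):

{code:java}
public class NettyProbe {
    public static void main(String[] args) {
        ClassLoader cl = NettyProbe.class.getClassLoader();
        // Jar providing Netty 3.x (org.jboss.netty), pulled in via Zookeeper/Flakka.
        System.out.println(cl.getResource("org/jboss/netty/channel/Channel.class"));
        // Jar providing Netty 4.x (io.netty), pulled in via flink-runtime and gRPC.
        System.out.println(cl.getResource("io/netty/channel/Channel.class"));
    }
}
{code}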



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-6402) Consider removing annotation for REAPER_THREAD_LOCK in SafetyNetCloseableRegistry#doRegister()

2017-10-25 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu resolved FLINK-6402.
---
Resolution: Later

> Consider removing annotation for REAPER_THREAD_LOCK in 
> SafetyNetCloseableRegistry#doRegister()
> --
>
> Key: FLINK-6402
> URL: https://issues.apache.org/jira/browse/FLINK-6402
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Reporter: Ted Yu
>Priority: Minor
>
> Here is related code:
> {code}
> PhantomDelegatingCloseableRef phantomRef = new 
> PhantomDelegatingCloseableRef(
> wrappingProxyCloseable,
> this,
> REAPER_THREAD.referenceQueue);
> {code}
> Instantiation of REAPER_THREAD can be made visible via its constructor.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7775) Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs

2017-10-25 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-7775:
--
Description: 
{code}
  public int getNumberOfCachedJobs() {
return jobRefCounters.size();
  }
{code}
The method is not used.
We should remove it.

  was:
{code}
  public int getNumberOfCachedJobs() {
return jobRefCounters.size();
  }
{code}
The method is not used.

We should remove it.


> Remove unreferenced method PermanentBlobCache#getNumberOfCachedJobs
> ---
>
> Key: FLINK-7775
> URL: https://issues.apache.org/jira/browse/FLINK-7775
> Project: Flink
>  Issue Type: Task
>  Components: Local Runtime
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   public int getNumberOfCachedJobs() {
> return jobRefCounters.size();
>   }
> {code}
> The method is not used.
> We should remove it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7542) Some tests in AggregateITCase fail for some Time Zones

2017-10-25 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-7542.

   Resolution: Fixed
Fix Version/s: 1.3.3
   1.4.0

Fixed for 1.3.3 with 0891a6f5accd02557e4f00c3c473db1a9a473585
Fixed for 1.4.0 with babee277204b6b1edcfe9c7c76348254019b2dd3

> Some tests in AggregateITCase fail for some Time Zones
> --
>
> Key: FLINK-7542
> URL: https://issues.apache.org/jira/browse/FLINK-7542
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.2
>Reporter: Usman Younas
>Assignee: Fabian Hueske
> Fix For: 1.4.0, 1.3.3
>
>
> In {{org.apache.flink.table.runtime.batch.sql.AggregateITCase}}, two tests, 
> 1. testTumbleWindowAggregate and 
> 2. testHopWindowAggregate, 
> are failing for some time zones.
> The bug can be reproduced by changing the machine's time zone to
> Time Zone: Central Daylight Time
> Closest City: Houston-United States
> I think the problem is with Timestamp.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-6697) Add batch multi-window support

2017-10-25 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-6697.

Resolution: Implemented

Implemented for 1.4.0 with babee277204b6b1edcfe9c7c76348254019b2dd3

> Add batch multi-window support
> --
>
> Key: FLINK-6697
> URL: https://issues.apache.org/jira/browse/FLINK-6697
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Timo Walther
>Assignee: Fabian Hueske
>Priority: Critical
> Fix For: 1.4.0
>
>
> Multiple consecutive windows on batch are not tested yet, and I think they are 
> also not supported, because the syntax is not yet defined for batch.
> The following should be supported:
> {code}
> val t = table
> .window(Tumble over 2.millis on 'rowtime as 'w)
> .groupBy('w)
> .select('w.rowtime as 'rowtime, 'int.count as 'int)
> .window(Tumble over 4.millis on 'rowtime as 'w2)
> .groupBy('w2)
> .select('w2.rowtime, 'w2.end, 'int.count)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-6584) Support multiple consecutive windows in SQL

2017-10-25 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-6584.

Resolution: Implemented

Implemented for 1.4.0 with 2ad8f7eb5690755e2a9e7e93181cd34e1093f23a

> Support multiple consecutive windows in SQL
> ---
>
> Key: FLINK-6584
> URL: https://issues.apache.org/jira/browse/FLINK-6584
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Right now, the Table API supports multiple consecutive windows as follows:
> {code}
> val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 
> 'bigdec, 'string)
> val t = table
>   .window(Tumble over 2.millis on 'rowtime as 'w)
>   .groupBy('w)
>   .select('w.rowtime as 'rowtime, 'int.count as 'int)
>   .window(Tumble over 4.millis on 'rowtime as 'w2)
>   .groupBy('w2)
>   .select('w2.rowtime, 'w2.end, 'int.count)
> {code}
> Similar behavior should be supported by the SQL API as well. We need to 
> introduce a new auxiliary group function, but this should happen in sync with 
> Apache Calcite.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6697) Add batch multi-window support

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219429#comment-16219429
 ] 

ASF GitHub Bot commented on FLINK-6697:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4796


> Add batch multi-window support
> --
>
> Key: FLINK-6697
> URL: https://issues.apache.org/jira/browse/FLINK-6697
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Timo Walther
>Assignee: Fabian Hueske
>Priority: Critical
> Fix For: 1.4.0
>
>
> Multiple consecutive windows on batch are not tested yet, and I think they are 
> also not supported, because the syntax is not yet defined for batch.
> The following should be supported:
> {code}
> val t = table
> .window(Tumble over 2.millis on 'rowtime as 'w)
> .groupBy('w)
> .select('w.rowtime as 'rowtime, 'int.count as 'int)
> .window(Tumble over 4.millis on 'rowtime as 'w2)
> .groupBy('w2)
> .select('w2.rowtime, 'w2.end, 'int.count)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

2017-10-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4199


---


[GitHub] flink pull request #4796: [FLINK-6697] [table] Add support for window.rowtim...

2017-10-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4796


---


[jira] [Commented] (FLINK-6584) Support multiple consecutive windows in SQL

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219428#comment-16219428
 ] 

ASF GitHub Bot commented on FLINK-6584:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4199


> Support multiple consecutive windows in SQL
> ---
>
> Key: FLINK-6584
> URL: https://issues.apache.org/jira/browse/FLINK-6584
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Right now, the Table API supports multiple consecutive windows as follows:
> {code}
> val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 
> 'bigdec, 'string)
> val t = table
>   .window(Tumble over 2.millis on 'rowtime as 'w)
>   .groupBy('w)
>   .select('w.rowtime as 'rowtime, 'int.count as 'int)
>   .window(Tumble over 4.millis on 'rowtime as 'w2)
>   .groupBy('w2)
>   .select('w2.rowtime, 'w2.end, 'int.count)
> {code}
> Similar behavior should be supported by the SQL API as well. We need to 
> introduce a new auxiliary group function, but this should happen in sync with 
> Apache Calcite.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7926) Bug in Hybrid Hash Join: Request to spill a partition with less than two buffers.

2017-10-25 Thread David Dreyfus (JIRA)
David Dreyfus created FLINK-7926:


 Summary: Bug in Hybrid Hash Join: Request to spill a partition 
with less than two buffers.
 Key: FLINK-7926
 URL: https://issues.apache.org/jira/browse/FLINK-7926
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Affects Versions: 1.3.2
 Environment: standalone execution on MacBook Pro
in flink-conf.yaml, taskmanager.numberOfTaskSlots changed from 1 to 3.
taskmanager.heap.mb = 1024
taskmanager.memory.preallocate = false
taskmanager.numberOfTaskSlots = 3
Reporter: David Dreyfus


The following exception is thrown as the number of tasks increases.
{code:java}
10/25/2017 14:26:16 LeftOuterJoin(Join at 
with(JoinOperatorSetsBase.java:232))(1/1) switched to FAILED
java.lang.RuntimeException: Bug in Hybrid Hash Join: Request to spill a 
partition with less than two buffers.
at 
org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:302)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1231)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:1053)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:978)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:938)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
at 
org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator.callWithNextKey(NonReusingBuildSecondHashJoinIterator.java:114)
at 
org.apache.flink.runtime.operators.AbstractOuterJoinDriver.run(AbstractOuterJoinDriver.java:160)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)

10/25/2017 14:26:16 Job execution switched to status FAILING.
java.lang.RuntimeException: Bug in Hybrid Hash Join: Request to spill a 
partition with less than two buffers.
at 
org.apache.flink.runtime.operators.hash.HashPartition.spillPartition(HashPartition.java:302)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.spillPartition(MutableHashTable.java:1231)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.insertBucketEntry(MutableHashTable.java:1053)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.insertIntoTable(MutableHashTable.java:978)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:938)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:631)
at 
org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
at 
org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator.callWithNextKey(NonReusingBuildSecondHashJoinIterator.java:114)
at 
org.apache.flink.runtime.operators.AbstractOuterJoinDriver.run(AbstractOuterJoinDriver.java:160)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
{code}

I run with the following command:

{code:java}
flink run -c com.northbay.union.Union3 
./FlinkWordCount/target/FlinkWordCount-1.0-SNAPSHOT.jar --left 
/Users/user/Documents/Flink/Quickstart/Files/manysmall --right 
/Users/user/Documents/Flink/Quickstart/Files/manysmall --output 
/tmp/test6d_nomatch --output2 /tmp/test6d --filecount 50
{code}

The files submitted are all CSV (int, string, short)

This is the code (split it into 3 separate files before using). The idea 
behind this test is to compare (hash-join) pairs of files and combine their 
results.

{code:java}
package com.northbay.hashcount;

public class DeviceRecord1 {
public int device;
public String fingerprint;
public short dma;
public boolean match;
public DeviceRecord1() {

}
public DeviceRecord1(DeviceRecord old) {
this.device = old.device;
this.fingerprint = old.fingerprint;
this.dma = old.dma;
this.match = false;
}
public DeviceRecord1(int device, String fingerprint, short dma) {
this.device = device;
this.fingerprint = fingerprint;
this.dma = dma;
this.match = 

[jira] [Commented] (FLINK-5823) Store Checkpoint Root Metadata in StateBackend (not in HA custom store)

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5823?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219294#comment-16219294
 ] 

ASF GitHub Bot commented on FLINK-5823:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/4907

[FLINK-5823] [checkpoints] State Backends also handle Checkpoint Metadata 
(part 1)

This is an incremental (first part) rebuild of #3522 on the latest master.

For ease of review, broken down into small chunks.

## Part 1: Application-defined State Backends pick up additional values 
from the configuration

We need to keep supporting the scenario of setting a state backend in the 
user program, but configuring parameters like the checkpoint directory in the 
cluster config. To support that, state backends may implement an additional 
interface which lets them pick up configuration values from the cluster 
configuration.

This also makes testing the checkpoint / savepoint configuration much 
easier.
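
As an illustration of that mechanism, a minimal sketch (not Flink's actual 
code; the method signature is assumed, since only the interface's JavaDoc is 
quoted in this thread):

{code:java}
import org.apache.flink.configuration.Configuration;

// Assumed shape of the configuration hook; the real interface is
// org.apache.flink.runtime.state.ConfigurableStateBackend.
interface ConfigurableBackendSketch {
    ConfigurableBackendSketch configure(Configuration config);
}

// A backend whose checkpoint directory falls back to the cluster config
// when the application did not set one; application settings keep
// precedence, as the quoted JavaDoc requires.
final class FsBackendSketch implements ConfigurableBackendSketch {
    private final String checkpointDir; // null if not set in the application

    FsBackendSketch(String checkpointDir) {
        this.checkpointDir = checkpointDir;
    }

    @Override
    public FsBackendSketch configure(Configuration config) {
        if (checkpointDir != null) {
            return this; // nothing to pick up; may return the original object
        }
        // 'state.checkpoints.dir' is the key quoted from CheckpointingOptions above.
        return new FsBackendSketch(config.getString("state.checkpoints.dir", null));
    }
}
{code}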

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink backend

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4907.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4907


commit 8055cc9d84e206abd00cbc339c6cc390c53f2afe
Author: Stephan Ewen 
Date:   2017-10-24T17:46:07Z

[hotfix] [hdfs] Avoid reparsing URIs in Hadoop File Status conversion

commit 96068b93d4c332a9b8cfba06ecc9d42804a79f92
Author: Stephan Ewen 
Date:   2017-10-24T19:20:44Z

[hotfix] [streaming] Move SerializedCheckpointData to proper scope for 
MessageAcknowledgingSourceBase

commit e098cc6ac8d77a027aefb6c1bcee41f6214ed74c
Author: Stephan Ewen 
Date:   2017-10-25T09:56:59Z

[hotfix] [runtime] Minor optimization in CheckpointMetrics

commit 8a6b78a8dacadb5476c6bf452fbdab352d0ab908
Author: Stephan Ewen 
Date:   2017-10-25T11:30:51Z

[hotfix] [checkpoints] fix warnings and make minor improvements to 
CheckpointCoordinatorTest and SharedStateRegistry

commit aa09d82fd57d2636f54ec2a56e457f19d8ec9490
Author: Stephan Ewen 
Date:   2017-10-25T11:46:34Z

[hotfix] [core] Fix FileUtils.deletePathIfEmpty

commit 15be2602d4d44e29e756cd558130f2303d82fe8b
Author: Stephan Ewen 
Date:   2017-10-25T17:04:58Z

[hotfix] [checkpoints] Remove incorrect 'Serializable' from 
StateBackendFactory

commit 1438df297bd820c07787563ddcfd2fda8773387f
Author: Stephan Ewen 
Date:   2017-10-25T12:16:37Z

[FLINK-7924] [checkpoints] Fix incorrect names of checkpoint options

Checkpoint options are incorrectly always called 'FULL_CHECKPOINT' when, in 
fact, checkpoints may always be incremental; only savepoints have to be full 
and self-contained.

Initially, we planned to add options for multiple checkpoint kinds, like 
checkpoints that were forced to be full, and checkpoints that were incremental. 
That is not necessary at this point.

commit 6b792bf72fdb0c17625b1fe97c52791d550a74e4
Author: Stephan Ewen 
Date:   2017-10-25T15:30:14Z

[FLINK-7925] [checkpoints] Add CheckpointingOptions

The CheckpointingOptions consolidate all checkpointing and state 
backend-related
settings that were previously split across different classes.

commit c4ce5522d8052ae8af8134bac4ea74bb5a929027
Author: Stephan Ewen 
Date:   2017-10-25T11:23:46Z

[FLINK-5823] [checkpoints] Pass state backend to checkpoint coordinator

commit 337e354bbe198ce6ab68d23cec93bc0c81bbbfaf
Author: Stephan Ewen 
Date:   2017-10-25T15:32:17Z

[hotfix] [core] Fix broken JavaDoc links in ConfigConstants

commit 0522ac34146aecfad2dbd9c806e6f7be182d00fd
Author: Stephan Ewen 
Date:   2017-10-25T17:04:10Z

[FLINK-5823] [checkpoints] State backends define checkpoint and savepoint 
directories, improved configuration




> Store Checkpoint Root Metadata in StateBackend (not in HA custom store)
> ---
>
> Key: FLINK-5823
> URL: https://issues.apache.org/jira/browse/FLINK-5823
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4907: [FLINK-5823] [checkpoints] State Backends also han...

2017-10-25 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/4907

[FLINK-5823] [checkpoints] State Backends also handle Checkpoint Metadata 
(part 1)

This is an incremental (first part) rebuild of #3522 on the latest master.

For ease of review, broken down into small chunks.

## Part 1: Application-defined State Backends pick up additional values 
from the configuration

We need to keep supporting the scenario of setting a state backend in the 
user program, but configuring parameters like the checkpoint directory in the 
cluster config. To support that, state backends may implement an additional 
interface which lets them pick up configuration values from the cluster 
configuration.

This also makes testing the checkpoint / savepoint configuration much 
easier.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink backend

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4907.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4907


commit 8055cc9d84e206abd00cbc339c6cc390c53f2afe
Author: Stephan Ewen 
Date:   2017-10-24T17:46:07Z

[hotfix] [hdfs] Avoid reparsing URIs in Hadoop File Status conversion

commit 96068b93d4c332a9b8cfba06ecc9d42804a79f92
Author: Stephan Ewen 
Date:   2017-10-24T19:20:44Z

[hotfix] [streaming] Move SerializedCheckpointData to proper scope for 
MessageAcknowledgingSourceBase

commit e098cc6ac8d77a027aefb6c1bcee41f6214ed74c
Author: Stephan Ewen 
Date:   2017-10-25T09:56:59Z

[hotfix] [runtime] Minor optimization in CheckpointMetrics

commit 8a6b78a8dacadb5476c6bf452fbdab352d0ab908
Author: Stephan Ewen 
Date:   2017-10-25T11:30:51Z

[hotfix] [checkpoints] fix warnings and make minor improvements to 
CheckpointCoordinatorTest and SharedStateRegistry

commit aa09d82fd57d2636f54ec2a56e457f19d8ec9490
Author: Stephan Ewen 
Date:   2017-10-25T11:46:34Z

[hotfix] [core] Fix FileUtils.deletePathIfEmpty

commit 15be2602d4d44e29e756cd558130f2303d82fe8b
Author: Stephan Ewen 
Date:   2017-10-25T17:04:58Z

[hotfix] [checkpoints] Remove incorrect 'Serializable' from 
StateBackendFactory

commit 1438df297bd820c07787563ddcfd2fda8773387f
Author: Stephan Ewen 
Date:   2017-10-25T12:16:37Z

[FLINK-7924] [checkpoints] Fix incorrect names of checkpoint options

Checkpoint options are incorrectly always called 'FULL_CHECKPOINT' when, in 
fact, checkpoints may always be incremental; only savepoints have to be full 
and self-contained.

Initially, we planned to add options for multiple checkpoint kinds, like 
checkpoints that were forced to be full, and checkpoints that were incremental. 
That is not necessary at this point.

commit 6b792bf72fdb0c17625b1fe97c52791d550a74e4
Author: Stephan Ewen 
Date:   2017-10-25T15:30:14Z

[FLINK-7925] [checkpoints] Add CheckpointingOptions

The CheckpointingOptions consolidate all checkpointing and state 
backend-related
settings that were previously split across different classes.

commit c4ce5522d8052ae8af8134bac4ea74bb5a929027
Author: Stephan Ewen 
Date:   2017-10-25T11:23:46Z

[FLINK-5823] [checkpoints] Pass state backend to checkpoint coordinator

commit 337e354bbe198ce6ab68d23cec93bc0c81bbbfaf
Author: Stephan Ewen 
Date:   2017-10-25T15:32:17Z

[hotfix] [core] Fix broken JavaDoc links in ConfigConstants

commit 0522ac34146aecfad2dbd9c806e6f7be182d00fd
Author: Stephan Ewen 
Date:   2017-10-25T17:04:10Z

[FLINK-5823] [checkpoints] State backends define checkpoint and savepoint 
directories, improved configuration




---


[jira] [Created] (FLINK-7924) Fix incorrect names of checkpoint options

2017-10-25 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-7924:
---

 Summary: Fix incorrect names of checkpoint options
 Key: FLINK-7924
 URL: https://issues.apache.org/jira/browse/FLINK-7924
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.4.0


Checkpoint options are incorrectly always called 'FULL_CHECKPOINT' when, in 
fact, checkpoints may always be incremental; only savepoints have to be full 
and self-contained.

Initially, we planned to add options for multiple checkpoint kinds, like 
checkpoints that were forced to be full, and checkpoints that were incremental. 
That is not necessary at this point.
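
A sketch of the rename this implies (names here are taken from the issue text, 
not from the actual commit):

{code:java}
enum CheckpointTypeSketch {
    CHECKPOINT, // may be full or incremental; decided by the state backend
    SAVEPOINT   // always full and self-contained
}
{code}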



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7925) Add CheckpointingOptions

2017-10-25 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-7925:
---

 Summary:  Add CheckpointingOptions
 Key: FLINK-7925
 URL: https://issues.apache.org/jira/browse/FLINK-7925
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.4.0


The CheckpointingOptions should consolidate all checkpointing and state 
backend-related settings that were previously split across different classes.
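
A minimal sketch of how such consolidated options might be used (option names 
are taken from the diffs quoted above; the 'filesystem' value and the 
String/Integer generics are assumptions):

{code:java}
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;

public class CheckpointingConfigSketch {
    public static void main(String[] args) {
        Configuration config = new Configuration();

        // Select the backend and its directories via the consolidated keys.
        config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
        config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "hdfs:///flink/checkpoints");
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, "hdfs:///flink/savepoints");
        config.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 3);

        System.out.println(config);
    }
}
{code}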



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4905: [FLINK-7920] Make MiniClusterConfiguration immutable

2017-10-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4905
  
Looks good, +1 to merge this


---


[jira] [Commented] (FLINK-7920) Make MiniClusterConfiguration immutable

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219268#comment-16219268
 ] 

ASF GitHub Bot commented on FLINK-7920:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4905
  
Looks good, +1 to merge this


> Make MiniClusterConfiguration immutable
> ---
>
> Key: FLINK-7920
> URL: https://issues.apache.org/jira/browse/FLINK-7920
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> The {{MiniClusterConfiguration}} should be made immutable.
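
As a sketch of what "immutable" could look like here (field and method names 
are illustrative, not the actual class):

{code:java}
public final class MiniClusterConfigSketch {
    private final int numTaskManagers;
    private final int numSlotsPerTaskManager;

    private MiniClusterConfigSketch(int numTaskManagers, int numSlotsPerTaskManager) {
        this.numTaskManagers = numTaskManagers;
        this.numSlotsPerTaskManager = numSlotsPerTaskManager;
    }

    public int getNumTaskManagers() { return numTaskManagers; }
    public int getNumSlotsPerTaskManager() { return numSlotsPerTaskManager; }

    // All mutation happens on the builder; the built object never changes.
    public static final class Builder {
        private int numTaskManagers = 1;
        private int numSlotsPerTaskManager = 1;

        public Builder setNumTaskManagers(int n) { this.numTaskManagers = n; return this; }
        public Builder setNumSlotsPerTaskManager(int n) { this.numSlotsPerTaskManager = n; return this; }

        public MiniClusterConfigSketch build() {
            return new MiniClusterConfigSketch(numTaskManagers, numSlotsPerTaskManager);
        }
    }
}
{code}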



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7666) ContinuousFileReaderOperator swallows chained watermarks

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219239#comment-16219239
 ] 

ASF GitHub Bot commented on FLINK-7666:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/4900
  
Perfect! Merging this...


> ContinuousFileReaderOperator swallows chained watermarks
> 
>
> Key: FLINK-7666
> URL: https://issues.apache.org/jira/browse/FLINK-7666
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.3.2
>Reporter: Ufuk Celebi
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.4.0
>
>
> I use event time and read from a (finite) file. I assign watermarks right 
> after the {{ContinuousFileReaderOperator}} with parallelism 1.
> {code}
> env
>   .readFile(new TextInputFormat(...), ...)
>   .setParallelism(1)
>   .assignTimestampsAndWatermarks(...)
>   .setParallelism(1)
>   .map()...
> {code}
> The watermarks I assign never progress through the pipeline.
> I can work around this by inserting a {{shuffle()}} after the file reader or 
> starting a new chain at the assigner:
> {code}
> env
>   .readFile(new TextInputFormat(...), ...)
>   .setParallelism(1)
>   .shuffle() 
>   .assignTimestampsAndWatermarks(...)
>   .setParallelism(1)
>   .map()...
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4900: [FLINK-7666] Close TimeService after closing operators.

2017-10-25 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/4900
  
Perfect! Merging this...


---


[jira] [Updated] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column

2017-10-25 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-7923:
-
Component/s: Table API & SQL

> SQL parser exception when accessing subfields of a Composite element in an 
> Object Array type column
> ---
>
> Key: FLINK-7923
> URL: https://issues.apache.org/jira/browse/FLINK-7923
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Rong Rong
>
> An access expression such as:
> {code:SQL}
> SELECT 
>   a[1].f0 
> FROM 
>   MyTable
> {code}
> will cause a problem. 
> See the following test sample for more details:
> https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column

2017-10-25 Thread Rong Rong (JIRA)
Rong Rong created FLINK-7923:


 Summary: SQL parser exception when accessing subfields of a 
Composite element in an Object Array type column
 Key: FLINK-7923
 URL: https://issues.apache.org/jira/browse/FLINK-7923
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.4.0
Reporter: Rong Rong


An access expression such as:
{code:SQL}
SELECT 
  a[1].f0 
FROM 
  MyTable
{code}
will cause a problem. 

See the following test sample for more details:
https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7922) leastRestrictive in FlinkTypeFactory does not resolve composite type correctly

2017-10-25 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-7922:
-
Description: 
FlinkTypeFactory does not override the following function correctly:
{code:java}
def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
  //... 
}
{code}
dealing with SQL such as:
{code:sql}
CASE 
  WHEN  THEN 
 
  ELSE 
NULL 
END
{code}
will trigger runtime exception. 

See following test sample for more details:
https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170

  was:
FlinkTypeFactory does not override the following function correctly:
{code:java}
def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
  //... 
}
{code}

to deal with situations like
{code:sql}
CASE 
  WHEN  THEN 
 
  ELSE 
NULL 
END
{code}
will trigger runtime exception. 

See following test sample for more details:
https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170


> leastRestrictive in FlinkTypeFactory does not resolve composite type correctly
> --
>
> Key: FLINK-7922
> URL: https://issues.apache.org/jira/browse/FLINK-7922
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> FlinkTypeFactory does not override the following function correctly:
> {code:java}
> def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
>   //... 
> }
> {code}
> dealing with SQL such as:
> {code:sql}
> CASE 
>   WHEN  THEN 
>  
>   ELSE 
> NULL 
> END
> {code}
> will trigger runtime exception. 
> See following test sample for more details:
> https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7922) leastRestrictive in FlinkTypeFactory does not resolve composite type correctly

2017-10-25 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-7922:
-
Description: 
FlinkTypeFactory does not override the following function correctly:
{code:scala}
def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
  //... 
}
{code}

to deal with situations like
{code:sql}
CASE 
  WHEN  THEN 
 
  ELSE 
NULL 
END
{code}
will trigger runtime exception. 

See following test sample for more details:
https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170

  was:
FlinkTypeFactory does not override the following function correctly:
`leastRestrictive(types: util.List[RelDataType]): RelDataType = { //... }` 
to deal with situations like
```
CASE 
  WHEN  THEN 
 
  ELSE 
NULL 
END
```
will trigger runtime exception


> leastRestrictive in FlinkTypeFactory does not resolve composite type correctly
> --
>
> Key: FLINK-7922
> URL: https://issues.apache.org/jira/browse/FLINK-7922
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Rong Rong
>
> FlinkTypeFactory does not override the following function correctly:
> {code:scala}
> def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
>   //... 
> }
> {code}
> to deal with situations like
> {code:sql}
> CASE 
>   WHEN  THEN 
>  
>   ELSE 
> NULL 
> END
> {code}
> will trigger runtime exception. 
> See following test sample for more details:
> https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7922) leastRestrictive in FlinkTypeFactory does not resolve composite type correctly

2017-10-25 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-7922:
-
Description: 
FlinkTypeFactory does not override the following function correctly:
{code:java}
def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
  //... 
}
{code}

to deal with situations like
{code:sql}
CASE 
  WHEN  THEN 
 
  ELSE 
NULL 
END
{code}
will trigger runtime exception. 

See following test sample for more details:
https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170

  was:
FlinkTypeFactory does not override the following function correctly:
{code:scala}
def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
  //... 
}
{code}

to deal with situations like
{code:sql}
CASE 
  WHEN  THEN 
 
  ELSE 
NULL 
END
{code}
will trigger runtime exception. 

See following test sample for more details:
https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170


> leastRestrictive in FlinkTypeFactory does not resolve composite type correctly
> --
>
> Key: FLINK-7922
> URL: https://issues.apache.org/jira/browse/FLINK-7922
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> FlinkTypeFactory does not override the following function correctly:
> {code:java}
> def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
>   //... 
> }
> {code}
> to deal with situations like
> {code:sql}
> CASE 
>   WHEN  THEN 
>  
>   ELSE 
> NULL 
> END
> {code}
> will trigger runtime exception. 
> See following test sample for more details:
> https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7922) leastRestrictive in FlinkTypeFactory does not resolve composite type correctly

2017-10-25 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-7922:
-
Affects Version/s: 1.4.0

> leastRestrictive in FlinkTypeFactory does not resolve composite type correctly
> --
>
> Key: FLINK-7922
> URL: https://issues.apache.org/jira/browse/FLINK-7922
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> FlinkTypeFactory does not override the following function correctly:
> {code:scala}
> def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
>   //... 
> }
> {code}
> to deal with situations like
> {code:sql}
> CASE 
>   WHEN  THEN 
>  
>   ELSE 
> NULL 
> END
> {code}
> will trigger runtime exception. 
> See following test sample for more details:
> https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7922) leastRestrictive in FlinkTypeFactory does not resolve composite type correctly

2017-10-25 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-7922:
-
Component/s: Table API & SQL

> leastRestrictive in FlinkTypeFactory does not resolve composite type correctly
> --
>
> Key: FLINK-7922
> URL: https://issues.apache.org/jira/browse/FLINK-7922
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> FlinkTypeFactory does not override the following function correctly:
> {code:scala}
> def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
>   //... 
> }
> {code}
> to deal with situations like
> {code:sql}
> CASE 
>   WHEN  THEN 
>  
>   ELSE 
> NULL 
> END
> {code}
> which will trigger a runtime exception. 
> See the following test sample for more details:
> https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7922) leastRestrictive in FlinkTypeFactory does not resolve composite type correctly

2017-10-25 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong reassigned FLINK-7922:


Assignee: Rong Rong

> leastRestrictive in FlinkTypeFactory does not resolve composite type correctly
> --
>
> Key: FLINK-7922
> URL: https://issues.apache.org/jira/browse/FLINK-7922
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> FlinkTypeFactory does not override the following function correctly:
> {code:scala}
> def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
>   //... 
> }
> {code}
> to deal with situations like
> {code:sql}
> CASE 
>   WHEN  THEN 
>  
>   ELSE 
> NULL 
> END
> {code}
> which will trigger a runtime exception. 
> See the following test sample for more details:
> https://github.com/walterddr/flink/commit/a5f2affc9bbbd50f06200f099c90597e519e9170



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7922) leastRestrictive in FlinkTypeFactory does not resolve composite type correctly

2017-10-25 Thread Rong Rong (JIRA)
Rong Rong created FLINK-7922:


 Summary: leastRestrictive in FlinkTypeFactory does not resolve 
composite type correctly
 Key: FLINK-7922
 URL: https://issues.apache.org/jira/browse/FLINK-7922
 Project: Flink
  Issue Type: Bug
Reporter: Rong Rong


FlinkTypeFactory does not override the following function correctly:
{code}
def leastRestrictive(types: util.List[RelDataType]): RelDataType = {
  //... 
}
{code}
to deal with situations like
{code:sql}
CASE 
  WHEN  THEN 
 
  ELSE 
NULL 
END
{code}
which will trigger a runtime exception.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7666) ContinuousFileReaderOperator swallows chained watermarks

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219203#comment-16219203
 ] 

ASF GitHub Bot commented on FLINK-7666:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4900
  
I see, okay, the status checking in `trigger()` together with the tight 
locking contract with StreamTask fixes that.

My feeling is that this is a workaround to support another non-ideal 
design, but it should work as a temporary fix, because it is not possible to 
rework the file monitoring source before the next release.

Good with me to merge this...


> ContinuousFileReaderOperator swallows chained watermarks
> 
>
> Key: FLINK-7666
> URL: https://issues.apache.org/jira/browse/FLINK-7666
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.3.2
>Reporter: Ufuk Celebi
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.4.0
>
>
> I use event time and read from a (finite) file. I assign watermarks right 
> after the {{ContinuousFileReaderOperator}} with parallelism 1.
> {code}
> env
>   .readFile(new TextInputFormat(...), ...)
>   .setParallelism(1)
>   .assignTimestampsAndWatermarks(...)
>   .setParallelism(1)
>   .map()...
> {code}
> The watermarks I assign never progress through the pipeline.
> I can work around this by inserting a {{shuffle()}} after the file reader or 
> starting a new chain at the assigner:
> {code}
> env
>   .readFile(new TextInputFormat(...), ...)
>   .setParallelism(1)
>   .shuffle() 
>   .assignTimestampsAndWatermarks(...)
>   .setParallelism(1)
>   .map()...
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4900: [FLINK-7666] Close TimeService after closing operators.

2017-10-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4900
  
I see, okay, the status checking in `trigger()` together with the tight 
locking contract with StreamTask fixes that.

My feeling is that this is a workaround to support another non-ideal 
design, but it should work as a temporary fix, because it is not possible to 
rework the file monitoring source before the next release.

Good with me to merge this...


---


[jira] [Commented] (FLINK-7846) Remove guava shading from ES2 connector

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219200#comment-16219200
 ] 

ASF GitHub Bot commented on FLINK-7846:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4902
  
Looks good, +1 to merge this


> Remove guava shading from ES2 connector
> ---
>
> Key: FLINK-7846
> URL: https://issues.apache.org/jira/browse/FLINK-7846
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.4.0
>
>
> The ElasticSearch 2 connector pom has a shading configuration for guava. The 
> only user of guava is the elasticsearch dependency, which is not included in 
> the jar, making the shading pointless.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7914) Expose Akka gated interval as user option

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219197#comment-16219197
 ] 

ASF GitHub Bot commented on FLINK-7914:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4903
  
Change looks good.
Is `50 ms` also Akka's default value?
Out of curiosity, what triggered the need to introduce this option?


> Expose Akka gated interval as user option
> -
>
> Key: FLINK-7914
> URL: https://issues.apache.org/jira/browse/FLINK-7914
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>
> Whenever Akka loses its connection to a remote {{ActorSystem}} it gates the 
> corresponding address. The default value is {{5 s}}. Especially for tests 
> this can be too high. Therefore, I propose to expose this option to the user 
> via the {{AkkaOptions}} and to set it to {{50 ms}} by default.
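
For reference, exposing such a setting usually amounts to adding a {{ConfigOption}}. A minimal sketch follows; the option name matches the PR title, but the key string and the Long millisecond type are assumptions, not confirmed Flink code:
{code:java}
import org.apache.flink.configuration.ConfigOption;

import static org.apache.flink.configuration.ConfigOptions.key;

// Sketch only: "akka.retry-gate-closed-for" is an assumed key string;
// the option name follows the PR title.
public class AkkaOptionsSketch {
    public static final ConfigOption<Long> RETRY_GATE_CLOSED_FOR =
            key("akka.retry-gate-closed-for")
            .defaultValue(50L); // gate interval in milliseconds, 50 ms as proposed
}
{code}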



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4902: [FLINK-7846] [elasticsearch] Remove unnecessary guava sha...

2017-10-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4902
  
Looks good, +1 to merge this


---


[GitHub] flink issue #4903: [FLINK-7914] Introduce AkkaOptions.RETRY_GATE_CLOSED_FOR

2017-10-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4903
  
Change looks good.
Is `50 ms` also Akka's default value?
Out of curiosity, what triggered the need to introduce this option?


---


[jira] [Updated] (FLINK-7921) Flink downloads link redirects to Spark downloads page

2017-10-25 Thread Anil Kumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anil Kumar updated FLINK-7921:
--
Description: 
On the Quickstart 
[page|https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/setup_quickstart.html]
 of Flink, there's a download page link under the *Download and Unpack* tab which 
redirects to the Spark downloads page instead of Flink's.

Issue: the redirect goes to port 80 (http) instead of port 443 (https). 
Once the download link is changed to https, it works fine.

Current download link: http://flink.apache.org/downloads.html
Required link: https://flink.apache.org/downloads.html

  was:
On the Quickstart 
[page|https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/setup_quickstart.html]
 of Flink, there's a download page link under the *Download and Unpack* tab which 
redirects to the Spark downloads page instead of Flink's.

The problem is that the redirect goes to port 80 (http) instead of port 
443 (https). Once the download link is changed to https, it works fine.

Current download link: http://flink.apache.org/downloads.html
Required link: https://flink.apache.org/downloads.html


> Flink downloads link redirects to Spark downloads page
> -
>
> Key: FLINK-7921
> URL: https://issues.apache.org/jira/browse/FLINK-7921
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.3.2
>Reporter: Anil Kumar
>
> On the Quickstart 
> [page|https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/setup_quickstart.html]
>  of Flink, there's a download page link under the *Download and Unpack* tab which 
> redirects to the Spark downloads page instead of Flink's.
> Issue: the redirect goes to port 80 (http) instead of port 443 (https). 
> Once the download link is changed to https, it works fine.
> Current download link: http://flink.apache.org/downloads.html
> Required link: https://flink.apache.org/downloads.html



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7921) Flink downloads link redirects to Spark downloads page

2017-10-25 Thread Anil Kumar (JIRA)
Anil Kumar created FLINK-7921:
-

 Summary: Flink downloads link redirects to Spark downloads page
 Key: FLINK-7921
 URL: https://issues.apache.org/jira/browse/FLINK-7921
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.3.2
Reporter: Anil Kumar


On the Quickstart 
[page|https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/setup_quickstart.html]
 of Flink, there's a download page link under the *Download and Unpack* tab which 
redirects to the Spark downloads page instead of Flink's.

The problem is that the redirect goes to port 80 (http) instead of port 
443 (https). Once the download link is changed to https, it works fine.

Current download link: http://flink.apache.org/downloads.html
Required link: https://flink.apache.org/downloads.html



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7502) PrometheusReporter improvements

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219154#comment-16219154
 ] 

ASF GitHub Bot commented on FLINK-7502:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4586
  
I can't think of a better solution either.


> PrometheusReporter improvements
> ---
>
> Key: FLINK-7502
> URL: https://issues.apache.org/jira/browse/FLINK-7502
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Maximilian Bode
>Assignee: Maximilian Bode
>Priority: Minor
> Fix For: 1.4.0
>
>
> * do not throw exceptions on metrics being registered a second time
> * allow port ranges for setups where multiple reporters are on the same host 
> (e.g. one TaskManager and one JobManager)
> * do not use nanohttpd anymore; there is now a minimal http server included 
> in the [Prometheus JVM client|https://github.com/prometheus/client_java]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4586: [FLINK-7502] [metrics] Improve PrometheusReporter

2017-10-25 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4586
  
I can't think of a better solution either.


---


[jira] [Commented] (FLINK-7824) Put the queryable state jars in the opt folder.

2017-10-25 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219075#comment-16219075
 ] 

Kostas Kloudas commented on FLINK-7824:
---

PR https://github.com/apache/flink/pull/4906.

> Put the queryable state jars in the opt folder.
> ---
>
> Key: FLINK-7824
> URL: https://issues.apache.org/jira/browse/FLINK-7824
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.4.0
>
>
> In 1.4, to enable queryable state, the user has to put the required jars 
> in the lib folder. The first step before *putting* the jars is to *find* them 
> in the distribution. So these jars should be located in the `opt` 
> folder.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7540) Akka hostnames are not normalised consistently

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219056#comment-16219056
 ] 

ASF GitHub Bot commented on FLINK-7540:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4812
  
Thanks for the review @aljoscha. Rebasing and once Travis gives green 
light, I'll merge this PR.


> Akka hostnames are not normalised consistently
> --
>
> Key: FLINK-7540
> URL: https://issues.apache.org/jira/browse/FLINK-7540
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, YARN
>Affects Versions: 1.3.1, 1.4.0, 1.3.2
>Reporter: Tong Yan Ou
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: patch
> Fix For: 1.4.0, 1.3.3
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> In {{NetUtils.unresolvedHostToNormalizedString()}} we lowercase hostnames, while 
> Akka seems to preserve the uppercase/lowercase distinctions when starting the 
> Actor. This leads to problems because other parts (for example 
> {{JobManagerRetriever}}) cannot find the actor, leading to a nonfunctional 
> cluster.
> h1. Original Issue Text
> Hostnames in my Hadoop cluster look like these: “DSJ-RTB-4T-177”, 
> “DSJ-signal-900G-71”
> When using the following command:
> ./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 
> ~/flink-1.3.1/examples/batch/WordCount.jar --input 
> /user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result 
>  
> Or
> ./bin/yarn-session.sh -d -jm 6144  -tm 12288 -qu xl_trip -s 24 -n 5 -nm 
> "flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip"
> There will be some exceptions at Command line interface:
> java.lang.RuntimeException: Unable to get ClusterClient status from 
> Application Client
> at 
> org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243)
> …
> Caused by: org.apache.flink.util.FlinkException: Could not connect to the 
> leading JobManager. Please check that the JobManager is running.
> h4. Then the job fails , starting the yarn-session is the same.
> The exceptions of the application log:
> 2017-08-10 17:36:10,334 WARN  
> org.apache.flink.runtime.webmonitor.JobManagerRetriever   - Failed to 
> retrieve leader gateway and port.
> akka.actor.ActorNotFound: Actor not found for: 
> ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), 
> Path(/user/jobmanager)]
> …
> 2017-08-10 17:36:10,837 ERROR org.apache.flink.yarn.YarnFlinkResourceManager  
>   - Resource manager could not register at JobManager
> akka.pattern.AskTimeoutException: Ask timed out on 
> [ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), 
> Path(/user/jobmanager)]] after [1 ms]
> And I found some differences in actor System:
> 2017-08-10 17:35:56,791 INFO  org.apache.flink.yarn.YarnJobManager
>   - Starting JobManager at 
> akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager.
> 2017-08-10 17:35:56,880 INFO  org.apache.flink.yarn.YarnJobManager
>   - JobManager 
> akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager was granted 
> leadership with leader session ID Some(----).
> 2017-08-10 17:36:00,312 INFO  
> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Web frontend 
> listening at 0:0:0:0:0:0:0:0:54921
> 2017-08-10 17:36:00,312 INFO  
> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Starting with 
> JobManager akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager on port 
> 54921
> 2017-08-10 17:36:00,313 INFO  
> org.apache.flink.runtime.webmonitor.JobManagerRetriever   - New leader 
> reachable under 
> akka.tcp://flink@dsj-signal-4t-248:65082/user/jobmanager:----.
> The JobManager is  “akka.tcp://flink@DSJ-signal-4T-248:65082” and the 
> JobManagerRetriever is “akka.tcp://flink@dsj-signal-4t-248:65082”
> The hostname of JobManagerRetriever’s actor is lowercase.
> And I read source code,
> In class NetUtils, the unresolvedHostToNormalizedString(String host) method at 
> line 127:
>   public static String unresolvedHostToNormalizedString(String host) {
>     // Return loopback interface address if host is null.
>     // This represents the behavior of {@code InetAddress.getByName} and RFC 3330.
>     if (host == null) {
>       host = InetAddress.getLoopbackAddress().getHostAddress();
>     } else {
>       host = host.trim().toLowerCase();
>     }
>     ...
>   }
> It turns the hostname into lowercase.
> Therefore, JobManagerRetriever certainly cannot find the JobManager's 
> ActorSystem.
> Then I removed the call to the toLowerCase() method in the source code.
> 
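
As an editorial aside, the mismatch the reporter describes boils down to a few lines; the following is a self-contained illustration, not Flink code:
{code:java}
// One side lowercases the hostname (as NetUtils does), the other preserves
// the original casing (as Akka does), so the actor lookup can never match.
public class HostnameMismatchDemo {

    static String normalize(String host) {
        // mirrors the NetUtils behavior quoted above
        return host.trim().toLowerCase();
    }

    public static void main(String[] args) {
        String akkaSide = "DSJ-signal-4T-248";      // casing under which Akka registered the actor
        String retrieverSide = normalize(akkaSide); // casing under which the retriever looks it up
        System.out.println(akkaSide.equals(retrieverSide)); // prints: false
    }
}
{code}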

[GitHub] flink issue #4812: [FLINK-7540] Apply consistent hostname normalization

2017-10-25 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4812
  
Thanks for the review @aljoscha. Rebasing and once Travis gives green 
light, I'll merge this PR.


---


[jira] [Commented] (FLINK-7908) Restructure the QS module to reduce client deps.

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219054#comment-16219054
 ] 

ASF GitHub Bot commented on FLINK-7908:
---

GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/4906

[FLINK-7908][FLINK-7824][QS] Restructure QS packages and put QS jars in 
opt/.

## What is the purpose of the change

Make the Queryable State more usable by:
1) Reducing the dependencies the `client` jar (used by the user) 
transitively brings.
2) Putting the core and client jars in the `opt/` folder. From there, to 
activate QS the user has to put the core jar in the lib/ folder before starting 
the cluster, and use the client jar as a dependency of his/her program.

## Brief change log

Creating the `core` and `client` modules in the queryable state dir and 
moving classes around.
In addition, the `client` module now becomes a dependency of 
`flink-runtime`.

Places the jars corresponding to the aforementioned modules in the `opt/` dir.

## Verifying this change

This change is a simple restructuring of the queryable state module, no 
classes are introduced or deleted.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): yes, adds the 
`flink-queryable-state-client` dependency to `flink-runtime`.

## Documentation

  - Does this pull request introduce a new feature? NO, but it needs 
documentation, which is PENDING.

R @aljoscha 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink qs-restructure

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4906.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4906


commit 057f14d833e56fa731262384a636af13f08623d5
Author: kkloudas 
Date:   2017-10-24T10:16:08Z

[FLINK-7908][QS] Restructure the queryable state module.

The QS module is split into core and client. The core should
be put in the lib folder to enable queryable state, while the
client is the one that the user will program against. The
reason for the restructuring is mainly to remove the dependency
on the flink-runtime from the user's program.

commit dc9d5964bcdd3269fe01e8fe2ab7be74d90dd22c
Author: kkloudas 
Date:   2017-10-24T14:12:27Z

[FLINK-7824][QS] Put the QS modules in the opt folder.

Now the user can find the jars in the opt/ folder and
he can activate QS by putting the core jar in the lib/
folder and programming against the client jar.




> Restructure the QS module to reduce client deps.
> 
>
> Key: FLINK-7908
> URL: https://issues.apache.org/jira/browse/FLINK-7908
> Project: Flink
>  Issue Type: Sub-task
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4906: [FLINK-7908][FLINK-7824][QS] Restructure QS packag...

2017-10-25 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/4906

[FLINK-7908][FLINK-7824][QS] Restructure QS packages and put QS jars in 
opt/.

## What is the purpose of the change

Make the Queryable State more usable by:
1) Reducing the dependencies the `client` jar (used by the user) 
transitively brings.
2) Putting the core and client jars in the `opt/` folder. From there, to 
activate QS the user has to put the core jar in the lib/ folder before starting 
the cluster, and use the client jar as a dependency of his/her program.

## Brief change log

Creating the `core` and `client` modules in the queryable state dir and 
moving classes around.
In addition, the `client` module now becomes a dependency of 
`flink-runtime`.

Places the jars corresponding to the aforementioned modules in the `opt/` dir.

## Verifying this change

This change is a simple restructuring of the queryable state module, no 
classes are introduced or deleted.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): yes, adds the 
`flink-queryable-state-client` dependency to `flink-runtime`.

## Documentation

  - Does this pull request introduce a new feature? NO, but it needs 
documentation, which is PENDING.

R @aljoscha 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink qs-restructure

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4906.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4906


commit 057f14d833e56fa731262384a636af13f08623d5
Author: kkloudas 
Date:   2017-10-24T10:16:08Z

[FLINK-7908][QS] Restructure the queryable state module.

The QS module is split into core and client. The core should
be put in the lib folder to enable queryable state, while the
client is the one that the user will program against. The
reason for the restructuring is mainly to remove the dependency
on the flink-runtime from the user's program.

commit dc9d5964bcdd3269fe01e8fe2ab7be74d90dd22c
Author: kkloudas 
Date:   2017-10-24T14:12:27Z

[FLINK-7824][QS] Put the QS modules in the opt folder.

Now the user can find the jars in the opt/ folder and
he can activate QS by putting the core jar in the lib/
folder and programming against the client jar.




---


[jira] [Commented] (FLINK-7920) Make MiniClusterConfiguration immutable

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16219041#comment-16219041
 ] 

ASF GitHub Bot commented on FLINK-7920:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4905

[FLINK-7920] Make MiniClusterConfiguration immutable

## What is the purpose of the change

Makes the `MiniClusterConfiguration` immutable.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
makeMiniClusterConfigurationImmutable

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4905.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4905


commit 4c614d988599d3223d81a04f80a8ceb41a0b9e48
Author: Till Rohrmann 
Date:   2017-10-25T15:31:45Z

[FLINK-7920] Make MiniClusterConfiguration immutable




> Make MiniClusterConfiguration immutable
> ---
>
> Key: FLINK-7920
> URL: https://issues.apache.org/jira/browse/FLINK-7920
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> The {{MiniClusterConfiguration}} should be made immutable.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4905: [FLINK-7920] Make MiniClusterConfiguration immutab...

2017-10-25 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4905

[FLINK-7920] Make MiniClusterConfiguration immutable

## What is the purpose of the change

Makes the `MiniClusterConfiguration` immutable.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
makeMiniClusterConfigurationImmutable

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4905.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4905


commit 4c614d988599d3223d81a04f80a8ceb41a0b9e48
Author: Till Rohrmann 
Date:   2017-10-25T15:31:45Z

[FLINK-7920] Make MiniClusterConfiguration immutable




---


[jira] [Created] (FLINK-7920) Make MiniClusterConfiguration immutable

2017-10-25 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-7920:


 Summary: Make MiniClusterConfiguration immutable
 Key: FLINK-7920
 URL: https://issues.apache.org/jira/browse/FLINK-7920
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.4.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Priority: Minor


The {{MiniClusterConfiguration}} should be made immutable.
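
A minimal sketch of the usual recipe for such a change (final fields, no setters, a builder for convenience); the class and field names here are illustrative, not Flink's actual API:
{code:java}
// Illustrative only: shows the immutability pattern, not Flink's real fields.
public final class ImmutableConfigSketch {

    private final int numTaskManagers;
    private final int numSlotsPerTaskManager;

    private ImmutableConfigSketch(int numTaskManagers, int numSlotsPerTaskManager) {
        this.numTaskManagers = numTaskManagers;
        this.numSlotsPerTaskManager = numSlotsPerTaskManager;
    }

    public int getNumTaskManagers() { return numTaskManagers; }

    public int getNumSlotsPerTaskManager() { return numSlotsPerTaskManager; }

    public static final class Builder {
        private int numTaskManagers = 1;
        private int numSlotsPerTaskManager = 1;

        public Builder setNumTaskManagers(int n) { numTaskManagers = n; return this; }

        public Builder setNumSlotsPerTaskManager(int n) { numSlotsPerTaskManager = n; return this; }

        public ImmutableConfigSketch build() {
            // all values are fixed at build time; the built object cannot change
            return new ImmutableConfigSketch(numTaskManagers, numSlotsPerTaskManager);
        }
    }
}
{code}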



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7919) Join with Solution Set fails with NPE if Solution Set has no entry

2017-10-25 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-7919:
-
Description: 
A job with a delta iteration fails hard with an NPE in the solution set join if 
the solution set has no entry for the join key of the probe side.

The following program reproduces the problem:

{code}
DataSet<Tuple2<Long, Integer>> values = env.fromElements(
  Tuple2.of(1L, 1), Tuple2.of(2L, 1), Tuple2.of(3L, 1));

DeltaIteration<Tuple2<Long, Integer>, Tuple2<Long, Integer>> di = values
  .iterateDelta(values, 5, 0);

DataSet<Tuple2<Long, Integer>> loop = di.getWorkset()
  .map(new MapFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
    @Override
    public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
      // modifying the key to join on a non-existing solution set key
      return Tuple2.of(value.f0 + 1, 1);
    }
  })
  .join(di.getSolutionSet()).where(0).equalTo(0)
  .with(new JoinFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
    @Override
    public Tuple2<Long, Integer> join(
        Tuple2<Long, Integer> first,
        Tuple2<Long, Integer> second) throws Exception {
      return Tuple2.of(first.f0, first.f1 + second.f1);
    }
  });

DataSet<Tuple2<Long, Integer>> result = di.closeWith(loop, loop);
result.print();
{code}

It doesn't matter whether the solution set is managed or not. 

The problem is caused by the solution set hash table prober returning a 
{{null}} value if the solution set does not contain a value for the probe-side 
key. 
The join operator does not check whether the return value is {{null}} but 
immediately tries to create a copy using a {{TypeSerializer}}. This copy fails 
with an NPE.

I propose to check for {{null}} and call the join function with {{null}} on the 
solution set side. This gives the join OUTER JOIN semantics.
Since the code was previously failing with an NPE, it is safe to forward the 
{{null}} into the {{JoinFunction}}. 

However, users must be aware that the solution set value may be {{null}} and we 
need to update the documentation (JavaDocs + website) to describe the behavior.
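
A {{JoinFunction}} written for these semantics would look like the following minimal sketch (an editorial illustration matching the example above; the class name is invented):
{code}
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// Sketch of a JoinFunction under the proposed OUTER JOIN semantics:
// the solution-set side may be null when there is no entry for the probe key.
public class NullTolerantJoin
    implements JoinFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>> {

  @Override
  public Tuple2<Long, Integer> join(Tuple2<Long, Integer> workset, Tuple2<Long, Integer> solution) {
    // solution == null: the solution set has no entry for this key
    int existing = (solution == null) ? 0 : solution.f1;
    return Tuple2.of(workset.f0, workset.f1 + existing);
  }
}
{code}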

  was:
A job with a delta iteration fails hard with an NPE in the solution set join if 
the solution set has no entry for the join key of the probe side.

The following program reproduces the problem:

{code}
DataSet<Tuple2<Long, Integer>> values = env.fromElements(
  Tuple2.of(1L, 1), Tuple2.of(2L, 1), Tuple2.of(3L, 1));

DeltaIteration<Tuple2<Long, Integer>, Tuple2<Long, Integer>> di = values
  .iterateDelta(values, 5, 0);

DataSet<Tuple2<Long, Integer>> loop = di.getWorkset()
  .map(new MapFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
    @Override
    public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
      // modifying the key to join on a non-existing solution set key
      return Tuple2.of(value.f0 + 1, 1);
    }
  })
  .join(di.getSolutionSet()).where(0).equalTo(0)
  .with(new JoinFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
    @Override
    public Tuple2<Long, Integer> join(
        Tuple2<Long, Integer> first,
        Tuple2<Long, Integer> second) throws Exception {
      return Tuple2.of(first.f0, first.f1 + second.f1);
    }
  });

DataSet<Tuple2<Long, Integer>> result = di.closeWith(loop, loop);
result.print();
{code}

It doesn't matter whether the solution set is managed or not. 

The problem is caused by the solution set hash table prober returning a 
{{null}} value if the solution set does not contain a value for the probe-side 
key. 
The join operator does not check whether the return value is {{null}} but 
immediately tries to create a copy using a {{TypeSerializer}}. This copy fails 
with an NPE.

There are two solutions:
1. Check for {{null}} and do not call the join function (INNER join semantics)
2. Check for {{null}} and call the join function with {{null}} on the solution 
set side (OUTER join semantics)

Either way, the chosen behavior should be documented.


> Join with Solution Set fails with NPE if Solution Set has no entry
> --
>
> Key: FLINK-7919
> URL: https://issues.apache.org/jira/browse/FLINK-7919
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, Local Runtime
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Fabian Hueske
>
> A job with a delta iteration fails hard with an NPE in the solution set join 
> if the solution set has no entry for the join key of the probe side.
> The following program reproduces the problem:
> {code}
> DataSet<Tuple2<Long, Integer>> values = env.fromElements(
>   Tuple2.of(1L, 1), Tuple2.of(2L, 1), Tuple2.of(3L, 1));
> DeltaIteration<Tuple2<Long, Integer>, Tuple2<Long, Integer>> di = values
>   

[jira] [Commented] (FLINK-7502) PrometheusReporter improvements

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16218989#comment-16218989
 ] 

ASF GitHub Bot commented on FLINK-7502:
---

Github user mbode commented on the issue:

https://github.com/apache/flink/pull/4586
  
I mean in general it is probably not the best thing to just rely on the 
port not being available as a consensus mechanism for deciding who should claim which 
port. Then again, I could not think of a straightforward way to coordinate 
across Job/TaskManagers without using something external, which would probably 
get too involved for a metrics-related component – maybe there is something I 
am missing?
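
For illustration, the "first free port in the range wins" approach under discussion boils down to something like this sketch (names are illustrative, not the reporter code):
{code:java}
import java.io.IOException;
import java.net.ServerSocket;

// Sketch: each reporter walks the configured range and claims the first
// port it can bind; the bind itself is the only coordination mechanism.
public class PortRangePicker {

    public static ServerSocket bindFirstFree(int from, int to) throws IOException {
        for (int port = from; port <= to; port++) {
            try {
                return new ServerSocket(port); // bound: this process owns the port now
            } catch (IOException e) {
                // port already taken (e.g. by another reporter), try the next one
            }
        }
        throw new IOException("No free port in range " + from + "-" + to);
    }
}
{code}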


> PrometheusReporter improvements
> ---
>
> Key: FLINK-7502
> URL: https://issues.apache.org/jira/browse/FLINK-7502
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Maximilian Bode
>Assignee: Maximilian Bode
>Priority: Minor
> Fix For: 1.4.0
>
>
> * do not throw exceptions on metrics being registered a second time
> * allow port ranges for setups where multiple reporters are on the same host 
> (e.g. one TaskManager and one JobManager)
> * do not use nanohttpd anymore; there is now a minimal http server included 
> in the [Prometheus JVM client|https://github.com/prometheus/client_java]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4586: [FLINK-7502] [metrics] Improve PrometheusReporter

2017-10-25 Thread mbode
Github user mbode commented on the issue:

https://github.com/apache/flink/pull/4586
  
I mean in general it is probably not the best thing to just rely on the 
port not being available as a consensus mechanism for deciding who should claim which 
port. Then again, I could not think of a straightforward way to coordinate 
across Job/TaskManagers without using something external, which would probably 
get too involved for a metrics-related component – maybe there is something I 
am missing?


---


[jira] [Created] (FLINK-7919) Join with Solution Set fails with NPE if Solution Set has no entry

2017-10-25 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-7919:


 Summary: Join with Solution Set fails with NPE if Solution Set has 
no entry
 Key: FLINK-7919
 URL: https://issues.apache.org/jira/browse/FLINK-7919
 Project: Flink
  Issue Type: Bug
  Components: DataSet API, Local Runtime
Affects Versions: 1.3.2, 1.4.0
Reporter: Fabian Hueske


A job with a delta iteration fails hard with an NPE in the solution set join if 
the solution set has no entry for the join key of the probe side.

The following program reproduces the problem:

{code}
DataSet<Tuple2<Long, Integer>> values = env.fromElements(
  Tuple2.of(1L, 1), Tuple2.of(2L, 1), Tuple2.of(3L, 1));

DeltaIteration<Tuple2<Long, Integer>, Tuple2<Long, Integer>> di = values
  .iterateDelta(values, 5, 0);

DataSet<Tuple2<Long, Integer>> loop = di.getWorkset()
  .map(new MapFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
    @Override
    public Tuple2<Long, Integer> map(Tuple2<Long, Integer> value) throws Exception {
      // modifying the key to join on a non-existing solution set key
      return Tuple2.of(value.f0 + 1, 1);
    }
  })
  .join(di.getSolutionSet()).where(0).equalTo(0)
  .with(new JoinFunction<Tuple2<Long, Integer>, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
    @Override
    public Tuple2<Long, Integer> join(
        Tuple2<Long, Integer> first,
        Tuple2<Long, Integer> second) throws Exception {
      return Tuple2.of(first.f0, first.f1 + second.f1);
    }
  });

DataSet<Tuple2<Long, Integer>> result = di.closeWith(loop, loop);
result.print();
{code}

It doesn't matter whether the solution set is managed or not. 

The problem is caused by the solution set hash table prober returning a 
{{null}} value if the solution set does not contain a value for the probe-side 
key. 
The join operator does not check whether the return value is {{null}} but 
immediately tries to create a copy using a {{TypeSerializer}}. This copy fails 
with an NPE.

There are two solutions:
1. Check for {{null}} and do not call the join function (INNER join semantics)
2. Check for {{null}} and call the join function with {{null}} on the solution 
set side (OUTER join semantics)

Either way, the chosen behavior should be documented.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4904: [hotfix] reorder the methods so they conform to th...

2017-10-25 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/4904

[hotfix] reorder the methods so they conform to their order in the interface

## What is the purpose of the change

As discussed with @aljoscha in https://github.com/apache/flink/pull/4881, 
I'm moving those unmerged changes to this PR. The purpose is to reorder the 
impl methods so they conform to their order in the interface

## Brief change log

- reorder the impl methods 

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4904.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4904


commit 7c16e92ca67f7d6fb59136ba908959d3deaef058
Author: Bowen Li 
Date:   2017-10-25T15:01:22Z

[hotfix] reorder the methods so they conform to the order in the interface




---


[GitHub] flink issue #4881: [FLINK-7864] [DataStream API] Support side-outputs in CoP...

2017-10-25 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4881
  
Sounds good. Thanks!


---


[jira] [Commented] (FLINK-7864) Support side-outputs in CoProcessFunction

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16218907#comment-16218907
 ] 

ASF GitHub Bot commented on FLINK-7864:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4881
  
Sounds good. Thanks!


> Support side-outputs in CoProcessFunction
> -
>
> Key: FLINK-7864
> URL: https://issues.apache.org/jira/browse/FLINK-7864
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Bowen Li
> Fix For: 1.4.0
>
>
> We forgot to add support for side-outputs when we added that to 
> {{ProcessFunction}}. Should be as easy as adding it to the {{Context}} and 
> wiring it in.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7838) Kafka011ProducerExactlyOnceITCase does not finish

2017-10-25 Thread Piotr Nowojski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16218850#comment-16218850
 ] 

Piotr Nowojski edited comment on FLINK-7838 at 10/25/17 3:20 PM:
-

Added a log with another deadlock example at 
org.apache.kafka.clients.producer.KafkaProducer.initTransactions, called while 
restoring state.


was (Author: pnowojski):
Add a log with another deadlock example at 
org.apache.kafka.clients.producer.KafkaProducer.initTransactions, called while 
restoring state.

> Kafka011ProducerExactlyOnceITCase does not finish
> ---
>
> Key: FLINK-7838
> URL: https://issues.apache.org/jira/browse/FLINK-7838
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
> Attachments: initTransactions_deadlock.txt, log.txt
>
>
> See attached log



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7880) flink-queryable-state-java fails with core-dump

2017-10-25 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16218876#comment-16218876
 ] 

Till Rohrmann commented on FLINK-7880:
--

Apparently, there are also other QS tests affected by this. Can we either fix 
this or make sure that only tests are affected but not the real usage of the QS 
client?

https://travis-ci.org/tillrohrmann/flink/jobs/292608461

> flink-queryable-state-java fails with core-dump
> ---
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State, Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7916) Remove NetworkStackThroughputITCase

2017-10-25 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16218859#comment-16218859
 ] 

Ufuk Celebi commented on FLINK-7916:


We also have 
{{flink-tests/src/test/java/org/apache/flink/test/state/ManualWindowSpeedITCase.java}}.
 Agree that this is an anti-pattern and vote to either remove or establish a 
"benchmark" module.

> Remove NetworkStackThroughputITCase
> ---
>
> Key: FLINK-7916
> URL: https://issues.apache.org/jira/browse/FLINK-7916
> Project: Flink
>  Issue Type: Task
>  Components: Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
> Fix For: 1.5.0
>
>
> Flink's code base contains the {{NetworkStackThroughputITCase}}, which is not 
> really a test. Moreover, it is marked as {{Ignored}}. I propose to remove this 
> test because it is more of a benchmark. We could think about creating a 
> benchmark project where we move this kind of "test".
> In general I think we should remove ignored tests if they won't be fixed 
> immediately. The danger is far too high that we forget about them and then we 
> only keep the maintenance burden. This is especially true for the above 
> mentioned test case.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7542) Some tests in AggregateITCase fail for some Time Zones

2017-10-25 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reassigned FLINK-7542:


Assignee: Fabian Hueske

> Some tests in AggregateITCase fail for some Time Zones
> --
>
> Key: FLINK-7542
> URL: https://issues.apache.org/jira/browse/FLINK-7542
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.2
>Reporter: Usman Younas
>Assignee: Fabian Hueske
>
> In {{org.apache.flink.table.runtime.batch.sql.AggregateITCase}} two tests 
> 1. testTumbleWindowAggregate and 
> 2. testHopWindowAggregate 
> are failing for some time zones.
> The bug can be reproduced by changing the time zone of the machine to
> Time Zone: Central Daylight Time
> Closest City: Houston-United States
> I think the problem is with Timestamp.
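
A short, self-contained illustration of why {{java.sql.Timestamp}}-based expected values are time-zone dependent (an editorial sketch, not taken from the failing tests):
{code:java}
import java.sql.Timestamp;
import java.util.TimeZone;

// Timestamp.valueOf() interprets the string in the JVM's default time zone,
// so the same literal maps to different epoch millis in different zones.
public class TimestampZoneDemo {

    public static void main(String[] args) {
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        long utcMillis = Timestamp.valueOf("1970-01-01 00:00:00").getTime();

        TimeZone.setDefault(TimeZone.getTimeZone("America/Chicago"));
        long chicagoMillis = Timestamp.valueOf("1970-01-01 00:00:00").getTime();

        System.out.println(utcMillis == chicagoMillis); // prints: false
    }
}
{code}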



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7380) Limit usage of Row type

2017-10-25 Thread Joshua Griffith (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16218854#comment-16218854
 ] 

Joshua Griffith commented on FLINK-7380:


I use GenericTypeInfo to build trees using delta iterations because 
RowTypeInfo doesn't support recursive definitions. I have to use Rows because I 
only have type information at runtime when I'm building the job. Perhaps this 
could be a warning rather than an exception? Alternatively, it would be useful 
if there was an easier way to dynamically build recursive type information.
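
For context, building Row type information dynamically at runtime looks roughly like the sketch below (the field layout is invented for illustration); a field that refers back to the row's own type, as a tree node's children would, is exactly what {{RowTypeInfo}} cannot express:
{code:java}
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

// Sketch: type information assembled at runtime. A truly recursive field
// (child rows of the same type) has no representation here, which is why
// the comment above falls back to GenericTypeInfo.
public class DynamicRowTypes {

    public static RowTypeInfo nodeType() {
        return new RowTypeInfo(
            new TypeInformation<?>[] {
                BasicTypeInfo.LONG_TYPE_INFO,   // node id
                BasicTypeInfo.LONG_TYPE_INFO,   // parent id (flat stand-in for a recursive link)
                BasicTypeInfo.STRING_TYPE_INFO  // payload
            },
            new String[] {"id", "parentId", "payload"});
    }
}
{code}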

> Limit usage of Row type
> ---
>
> Key: FLINK-7380
> URL: https://issues.apache.org/jira/browse/FLINK-7380
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API, DataStream API
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> The recently introduced {{Row}} type causes a lot of confusion for users. By 
> default, Rows are serialized using Kryo. We should not allow the use of 
> {{GenericTypeInfo}}. The TypeExtractor should throw an exception and 
> encourage users to provide proper field types. The Row class should be final and 
> only targeted at the intended use cases.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7838) Kafka011ProducerExactlyOnceITCase does not finish

2017-10-25 Thread Piotr Nowojski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16218850#comment-16218850
 ] 

Piotr Nowojski edited comment on FLINK-7838 at 10/25/17 3:07 PM:
-

Add a log with another deadlock example at 
org.apache.kafka.clients.producer.KafkaProducer.initTransactions, called while 
restoring state.


was (Author: pnowojski):
Deadlock on another phase.

> Kafka011ProducerExactlyOnceITCase does not finish
> ---
>
> Key: FLINK-7838
> URL: https://issues.apache.org/jira/browse/FLINK-7838
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
> Attachments: initTransactions_deadlock.txt, log.txt
>
>
> See attached log



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7918) Run AbstractTestBase tests on Flip-6 MiniCluster

2017-10-25 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-7918:


 Summary: Run AbstractTestBase tests on Flip-6 MiniCluster
 Key: FLINK-7918
 URL: https://issues.apache.org/jira/browse/FLINK-7918
 Project: Flink
  Issue Type: New Feature
  Components: Tests
Affects Versions: 1.4.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann


In order to extend our test coverage for Flip-6, we should enable all 
{{AbstractTestBase}} tests to be executed via Flip-6's {{MiniCluster}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7838) Kafka011ProducerExactlyOnceITCase does not finish

2017-10-25 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-7838:
--
Attachment: initTransactions_deadlock.txt

Deadlock on another phase.

> Kafka011ProducerExactlyOnceITCase does not finish
> ---
>
> Key: FLINK-7838
> URL: https://issues.apache.org/jira/browse/FLINK-7838
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
> Attachments: initTransactions_deadlock.txt, log.txt
>
>
> See attached log



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7913) Add support for Kafka default partitioner

2017-10-25 Thread Konstantin Lalafaryan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16218776#comment-16218776
 ] 

Konstantin Lalafaryan commented on FLINK-7913:
--

Thanks for your comment.
Yes, you are right. But I have just found out that you can use Kafka's 
default partitioner by doing the following:
{code:java}
outputStream.addSink(new FlinkKafkaProducer010<>(
    producerProperties.getProperty(TOPIC),
    new EventSerializationSchema(),
    producerProperties,
    null));
{code}
Basically you have to pass a null value for the customPartitioner argument.
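
For comparison, the alternative the issue describes, forking Kafka's partitioning logic into a {{FlinkKafkaPartitioner}}, would look roughly like this sketch (class name hypothetical; Kafka's real DefaultPartitioner uses murmur2 hashing and round-robin assignment, simplified here):
{code:java}
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// Sketch only: approximates Kafka-style partitioning inside Flink's
// partitioner interface; not a faithful fork of Kafka's DefaultPartitioner.
public class KafkaStylePartitioner<T> extends FlinkKafkaPartitioner<T> {

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (key == null) {
            // keyless record: spread across all target partitions
            return partitions[ThreadLocalRandom.current().nextInt(partitions.length)];
        }
        // keyed record: stable hash so the same key always lands on the same partition
        return partitions[(Arrays.hashCode(key) & 0x7fffffff) % partitions.length];
    }
}
{code}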



> Add support for Kafka default partitioner
> -
>
> Key: FLINK-7913
> URL: https://issues.apache.org/jira/browse/FLINK-7913
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.0
>Reporter: Konstantin Lalafaryan
>Assignee: Konstantin Lalafaryan
> Fix For: 1.5.0
>
>
> Currently Apache Flink offers only *FlinkKafkaPartitioner* 
> and just one implementation, *FlinkFixedPartitioner*. 
> In order to be able to use Kafka's default partitioner you have to create a new 
> implementation of *FlinkKafkaPartitioner* and fork the code from Kafka. 
> It would be really good to be able to define the partitioner without 
> implementing a new class.
> Thanks.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7669) Always load Flink classes through parent ClassLoader

2017-10-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16218670#comment-16218670
 ] 

ASF GitHub Bot commented on FLINK-7669:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4891
  
Merged
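
For readers following the issue: the pattern behind "always load Flink classes through the parent ClassLoader" is a child-first loader with parent-first exceptions. A generic sketch follows (class name and prefix list are illustrative, not Flink's actual implementation):
{code:java}
import java.net.URL;
import java.net.URLClassLoader;

// Sketch: user-code classes resolve child-first, but anything matching the
// parent-first prefixes (notably org.apache.flink.) always comes from the
// parent, so framework classes exist exactly once per JVM.
public class ChildFirstSketchClassLoader extends URLClassLoader {

    private static final String[] PARENT_FIRST_PREFIXES = {"java.", "scala.", "org.apache.flink."};

    public ChildFirstSketchClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        for (String prefix : PARENT_FIRST_PREFIXES) {
            if (name.startsWith(prefix)) {
                return super.loadClass(name, resolve); // delegate to the parent first
            }
        }
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                try {
                    c = findClass(name); // child-first for user classes
                } catch (ClassNotFoundException e) {
                    c = super.loadClass(name, resolve); // fall back to the parent
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }
}
{code}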


> Always load Flink classes through parent ClassLoader
> 
>
> Key: FLINK-7669
> URL: https://issues.apache.org/jira/browse/FLINK-7669
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.4.0
> Environment: - OS: macOS Sierra 
> - Oracle JDK 1.8
> - Scala 2.11.11
> - sbt 0.13.16
> - Build from trunk code at commit hash 
> {{42cc3a2a9c41dda7cf338db36b45131db9150674}}
> -- started a local flink node 
>Reporter: Raymond Tay
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Latest code pulled from trunk threw errors at runtime when i ran a job 
> against it; but when i ran the JAR against the stable version {{1.3.2}} it 
> was OK. Here is the stacktrace. 
> An exception is being thrown :
> {noformat}
> Cluster configuration: Standalone cluster with JobManager at 
> localhost/127.0.0.1:6123
> Using address localhost:6123 to connect to JobManager.
> JobManager web interface address http://localhost:8081
> Starting execution of program
> https://issues.apache.org/jira/issues/?jql=text%20~%20%22org.apache.flink.api.common.ExecutionConfig%20cannot%20be%20cast%20to%22#
> Submitting job with JobID: 05dd8e60c6fda3b96fc22ef6cf389a23. Waiting for job 
> completion.
> Connected to JobManager at 
> Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-234825544] with leader 
> session id ----.
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Failed to submit job 05dd8e60c6fda3b96fc22ef6cf389a23 
> (Flink Streaming Job)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:479)
>   at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:443)
>   at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1501)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:629)
>   at 
> org.example.streams.split.SimpleSplitStreams$.main(04splitstreams.scala:53)
>   at 
> org.example.streams.split.SimpleSplitStreams.main(04splitstreams.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:383)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:840)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:285)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1088)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1135)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1132)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:44)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to 
> submit job 05dd8e60c6fda3b96fc22ef6cf389a23 (Flink Streaming Job)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1358)
>   at 
> 
