[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519961#comment-16519961
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197329533
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects.
+ *
+ * @param <T> type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
+   private final List<TypeSerializer> originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
--- End diff --

I think this check looks like a bug.
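For context, the quoted precondition compares the method parameter's size with itself, so it can never fail. A minimal method-level sketch of the presumably intended check (names follow the quoted diff; the renamed parameter and the generics are my assumptions, not the PR code):

{code:java}
private CompositeSerializer<T> createSerializerInstanceInternal(List<TypeSerializer> newSerializers) {
	// Presumed intent: the new serializer list must have the same arity as this
	// serializer's own list. The quoted code compares the parameter with itself
	// because the parameter shadows the field of the same name.
	Preconditions.checkArgument(newSerializers.size() == this.originalSerializers.size());
	return createSerializerInstance(newSerializers);
}
{code}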


> Wrap state binder with TTL logic
> 
>
> Key: FLINK-9513
> URL: https://issues.apache.org/jira/browse/FLINK-9513
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The main idea is to wrap the user state value in a class that holds the value and 
> the expiration timestamp (possibly more metadata in the future) and to use the new 
> object as the value in the existing implementations:
> {code:java}
> class TtlValue {
>   V value;
>   long expirationTimestamp;
> }
> {code}
> The original state binder factory is wrapped with TtlStateBinder if TTL is 
> enabled:
> {code:java}
> state = ttlConfig.updateType == DISABLED ?
>  bind(binder) : bind(new TtlStateBinder(binder, timerService));
> {code}
> TtlStateBinder decorates the states produced by the original binder with TTL 
> logic wrappers and adds TtlValue serialisation logic:
> {code:java}
> TtlStateBinder {
> StateBinder binder;
> ProcessingTimeProvider timeProvider; // System.currentTimeMillis()
>  TtlValueState createValueState(valueDesc) {
>  serializer = new TtlValueSerializer(valueDesc.getSerializer);
>  ttlValueDesc = new ValueDesc(serializer, ...);
>  // or implement custom TypeInfo
>  originalStateWithTtl = binder.createValueState(valueDesc);
>      return new TtlValueState(originalStateWithTtl, timeProvider);
> }
>   // List, Map, ...
> }
> {code}
> The TTL serializer should add the expiration timestamp.
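As an illustration of that last point, here is a minimal sketch (my own, not the PR code; the class name, the field order, and the Tuple2 return type are assumptions) of a serializer that writes the wrapped user value followed by the expiration timestamp, so existing backends can store the composite value unchanged:

{code:java}
import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

// Sketch only: user value first, expiration timestamp last.
final class TtlValueSerializerSketch<V> {
    private final TypeSerializer<V> userSerializer;

    TtlValueSerializerSketch(TypeSerializer<V> userSerializer) {
        this.userSerializer = userSerializer;
    }

    void serialize(V userValue, long expirationTimestamp, DataOutputView target) throws IOException {
        userSerializer.serialize(userValue, target);
        LongSerializer.INSTANCE.serialize(expirationTimestamp, target);
    }

    Tuple2<V, Long> deserialize(DataInputView source) throws IOException {
        V userValue = userSerializer.deserialize(source);
        Long expirationTimestamp = LongSerializer.INSTANCE.deserialize(source);
        return Tuple2.of(userValue, expirationTimestamp);
    }
}
{code}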



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519968#comment-16519968
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197331741
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, 
TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
+   this.originalStateFactory = originalStateFactory;
+   this.ttlConfig = ttlConfig;
+   this.timeProvider = timeProvider;
+   this.stateFactories = createStateFactories();
+   }
+
+   private Map, StateFactory> 
createStateFactories() {
+   return Stream.of(
+   Tuple2.of(ValueStateDescriptor.class, (StateFactory) 
this::createValueState),
+   Tuple2.of(ListStateDescriptor.class, (StateFactory) 
this::createListState),
+   Tuple2.of(MapStateDescriptor.class, (StateFactory) 
this::createMapState),
+   Tuple2.of(ReducingStateDescriptor.class, (StateFactory) 
this::createReducingState),
+   Tuple2.of(AggregatingStateDescriptor.class, 
(StateFactory) this::createAggregatingState),
+   Tuple2.of(FoldingStateDescriptor.class, (StateFactory) 
this::createFoldingState)
+   ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+   }
+
+   private interface StateFactory {
+IS create(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception;
+   }
+
+   private  IS 

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519962#comment-16519962
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197331145
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, 
TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
--- End diff --

It would be better to check that the args are not null, or simply use the `@Nonnull` annotation.
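For illustration, a minimal sketch of the constructor with the suggested eager null checks, using Flink's Preconditions utility (my sketch, not the PR change):

{code:java}
private TtlStateFactory(KeyedStateFactory originalStateFactory, TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
	// Fail fast on null arguments instead of deferring the NPE to first use.
	this.originalStateFactory = Preconditions.checkNotNull(originalStateFactory);
	this.ttlConfig = Preconditions.checkNotNull(ttlConfig);
	this.timeProvider = Preconditions.checkNotNull(timeProvider);
	this.stateFactories = createStateFactories();
}
{code}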


> Wrap state binder with TTL logic
> 
>
> Key: FLINK-9513
> URL: https://issues.apache.org/jira/browse/FLINK-9513
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The main idea is to wrap the user state value in a class that holds the value and 
> the expiration timestamp (possibly more metadata in the future) and to use the new 
> object as the value in the existing implementations:
> {code:java}
> class TtlValue {
>   V value;
>   long expirationTimestamp;
> }
> {code}
> The original state binder factory is wrapped with TtlStateBinder if TTL is 
> enabled:
> {code:java}
> state = ttlConfig.updateType == DISABLED ?
>  bind(binder) : bind(new TtlStateBinder(binder, timerService));
> {code}
> TtlStateBinder decorates the states produced by the original binder with TTL 
> logic wrappers and adds TtlValue serialisation logic:
> 

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519960#comment-16519960
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197330596
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects.
+ *
+ * @param <T> type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
+   private final List<TypeSerializer> originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from, T reuse) {
+   List originalFromValues = decomposeValueInternal(from);
+   List originalReuseValues = decomposeValueInternal(reuse);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalFromValues.get(i), 
originalReuseValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public int getLength() {
+   return originalSerializers.stream().allMatch(s -> s.getLength() 
>= 0) ?
+   
originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
+   }
+
+   @Override
+   public void serialize(T record, DataOutputView target) throws 
IOException {
+   List originalValues = decomposeValueInternal(record);
+   for (int i = 0; i < originalSerializers.size(); i++) {
+   
originalSerializers.get(i).serialize(originalValues.get(i), target);
+   }
+   }
+
+   @Override
+   public T deserialize(DataInputView source) throws IOException {
+   List originalValues = new ArrayList();
+   for (TypeSerializer typeSerializer : originalSerializers) {

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519966#comment-16519966
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197331764
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, 
TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
+   this.originalStateFactory = originalStateFactory;
+   this.ttlConfig = ttlConfig;
+   this.timeProvider = timeProvider;
+   this.stateFactories = createStateFactories();
+   }
+
+   private Map, StateFactory> 
createStateFactories() {
+   return Stream.of(
+   Tuple2.of(ValueStateDescriptor.class, (StateFactory) 
this::createValueState),
+   Tuple2.of(ListStateDescriptor.class, (StateFactory) 
this::createListState),
+   Tuple2.of(MapStateDescriptor.class, (StateFactory) 
this::createMapState),
+   Tuple2.of(ReducingStateDescriptor.class, (StateFactory) 
this::createReducingState),
+   Tuple2.of(AggregatingStateDescriptor.class, 
(StateFactory) this::createAggregatingState),
+   Tuple2.of(FoldingStateDescriptor.class, (StateFactory) 
this::createFoldingState)
+   ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+   }
+
+   private interface StateFactory {
+IS create(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception;
+   }
+
+   private  IS 

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519967#comment-16519967
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197331820
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, 
TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
+   this.originalStateFactory = originalStateFactory;
+   this.ttlConfig = ttlConfig;
+   this.timeProvider = timeProvider;
+   this.stateFactories = createStateFactories();
+   }
+
+   private Map, StateFactory> 
createStateFactories() {
+   return Stream.of(
+   Tuple2.of(ValueStateDescriptor.class, (StateFactory) 
this::createValueState),
+   Tuple2.of(ListStateDescriptor.class, (StateFactory) 
this::createListState),
+   Tuple2.of(MapStateDescriptor.class, (StateFactory) 
this::createMapState),
+   Tuple2.of(ReducingStateDescriptor.class, (StateFactory) 
this::createReducingState),
+   Tuple2.of(AggregatingStateDescriptor.class, 
(StateFactory) this::createAggregatingState),
+   Tuple2.of(FoldingStateDescriptor.class, (StateFactory) 
this::createFoldingState)
+   ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+   }
+
+   private interface StateFactory {
+IS create(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception;
+   }
+
+   private  IS 

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519959#comment-16519959
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197330095
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects.
+ *
+ * @param <T> type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
+   private final List<TypeSerializer> originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from, T reuse) {
+   List originalFromValues = decomposeValueInternal(from);
+   List originalReuseValues = decomposeValueInternal(reuse);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalFromValues.get(i), 
originalReuseValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public int getLength() {
+   return originalSerializers.stream().allMatch(s -> s.getLength() 
>= 0) ?
+   
originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
+   }
+
+   @Override
+   public void serialize(T record, DataOutputView target) throws 
IOException {
+   List originalValues = decomposeValueInternal(record);
+   for (int i = 0; i < originalSerializers.size(); i++) {
+   
originalSerializers.get(i).serialize(originalValues.get(i), target);
+   }
+   }
+
+   @Override
+   public T deserialize(DataInputView source) throws IOException {
+   List originalValues = new ArrayList();
+   for (TypeSerializer typeSerializer : originalSerializers) {

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519964#comment-16519964
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197331211
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
--- End diff --

Why couldn't this be static?
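For reference, a self-contained sketch of the trade-off behind that question (a generic illustration, not the PR code): entries built from `this::createValueState` capture the instance and its fields, so the map has to be built per instance; a static map would need entries that take the instance as an explicit argument.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

class FactoryMapSketch {
    private final String config = "per-instance configuration";

    // Per-instance map: the method references capture `this`, so this field cannot be static.
    private final Map<String, Function<String, String>> perInstanceFactories = new HashMap<>();

    // Static alternative: entries take the instance explicitly at lookup time.
    private static final Map<String, BiFunction<FactoryMapSketch, String, String>> STATIC_FACTORIES =
            new HashMap<>();

    static {
        STATIC_FACTORIES.put("value", FactoryMapSketch::createValueState);
    }

    FactoryMapSketch() {
        perInstanceFactories.put("value", this::createValueState);
    }

    private String createValueState(String stateDescriptor) {
        return stateDescriptor + " created with " + config;
    }
}
{code}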


> Wrap state binder with TTL logic
> 
>
> Key: FLINK-9513
> URL: https://issues.apache.org/jira/browse/FLINK-9513
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The main idea is to wrap the user state value in a class that holds the value and 
> the expiration timestamp (possibly more metadata in the future) and to use the new 
> object as the value in the existing implementations:
> {code:java}
> class TtlValue {
>   V value;
>   long expirationTimestamp;
> }
> {code}
> The original state binder factory is wrapped with TtlStateBinder if TTL is 
> enabled:
> {code:java}
> state = ttlConfig.updateType == DISABLED ?
>  bind(binder) : bind(new TtlStateBinder(binder, timerService));
> {code}
> TtlStateBinder decorates the states produced by the original binder with TTL 
> logic wrappers and adds TtlValue serialisation logic:
> {code:java}
> TtlStateBinder {
> StateBinder binder;
> ProcessingTimeProvider timeProvider; // System.currentTimeMillis()
>  TtlValueState createValueState(valueDesc) {
>  serializer = new TtlValueSerializer(valueDesc.getSerializer);
>  ttlValueDesc = new ValueDesc(serializer, ...);
>  // or implement custom TypeInfo
>  

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519958#comment-16519958
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197329976
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects.
+ *
+ * @param <T> type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
+   private final List<TypeSerializer> originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from, T reuse) {
+   List originalFromValues = decomposeValueInternal(from);
+   List originalReuseValues = decomposeValueInternal(reuse);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalFromValues.get(i), 
originalReuseValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public int getLength() {
+   return originalSerializers.stream().allMatch(s -> s.getLength() 
>= 0) ?
+   
originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
+   }
+
+   @Override
+   public void serialize(T record, DataOutputView target) throws 
IOException {
+   List originalValues = decomposeValueInternal(record);
+   for (int i = 0; i < originalSerializers.size(); i++) {
+   
originalSerializers.get(i).serialize(originalValues.get(i), target);
+   }
+   }
+
+   @Override
+   public T deserialize(DataInputView source) throws IOException {
+   List originalValues = new ArrayList();
--- End diff --

I would suggest giving an initial size for `originalValues`, e.g. `List originalValues = new ArrayList(originalSerializers.size());`.

[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519963#comment-16519963
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197330813
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 ---
@@ -48,7 +48,8 @@
KeyedStateBackend,
Snapshotable, 
Collection>,
Closeable,
-   CheckpointListener {
+   CheckpointListener,
+   KeyedStateFactory{
--- End diff --

I think this is missing a space ` `.


> Wrap state binder with TTL logic
> 
>
> Key: FLINK-9513
> URL: https://issues.apache.org/jira/browse/FLINK-9513
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The main idea is to wrap the user state value in a class that holds the value and 
> the expiration timestamp (possibly more metadata in the future) and to use the new 
> object as the value in the existing implementations:
> {code:java}
> class TtlValue {
>   V value;
>   long expirationTimestamp;
> }
> {code}
> The original state binder factory is wrapped with TtlStateBinder if TTL is 
> enabled:
> {code:java}
> state = ttlConfig.updateType == DISABLED ?
>  bind(binder) : bind(new TtlStateBinder(binder, timerService));
> {code}
> TtlStateBinder decorates the states produced by the original binder with TTL 
> logic wrappers and adds TtlValue serialisation logic:
> {code:java}
> TtlStateBinder {
> StateBinder binder;
> ProcessingTimeProvider timeProvider; // System.currentTimeMillis()
>  TtlValueState createValueState(valueDesc) {
>  serializer = new TtlValueSerializer(valueDesc.getSerializer);
>  ttlValueDesc = new ValueDesc(serializer, ...);
>  // or implement custom TypeInfo
>  originalStateWithTtl = binder.createValueState(valueDesc);
>      return new TtlValueState(originalStateWithTtl, timeProvider);
> }
>   // List, Map, ...
> }
> {code}
> The TTL serializer should add the expiration timestamp.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9513) Wrap state binder with TTL logic

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9513?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519965#comment-16519965
 ] 

ASF GitHub Bot commented on FLINK-9513:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197331898
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, 
TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
+   this.originalStateFactory = originalStateFactory;
+   this.ttlConfig = ttlConfig;
+   this.timeProvider = timeProvider;
+   this.stateFactories = createStateFactories();
+   }
+
+   private Map, StateFactory> 
createStateFactories() {
+   return Stream.of(
+   Tuple2.of(ValueStateDescriptor.class, (StateFactory) 
this::createValueState),
+   Tuple2.of(ListStateDescriptor.class, (StateFactory) 
this::createListState),
+   Tuple2.of(MapStateDescriptor.class, (StateFactory) 
this::createMapState),
+   Tuple2.of(ReducingStateDescriptor.class, (StateFactory) 
this::createReducingState),
+   Tuple2.of(AggregatingStateDescriptor.class, 
(StateFactory) this::createAggregatingState),
+   Tuple2.of(FoldingStateDescriptor.class, (StateFactory) 
this::createFoldingState)
+   ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+   }
+
+   private interface StateFactory {
+IS create(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception;
+   }
+
+   private  IS 

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197329976
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects.
+ *
+ * @param <T> type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
+   private final List<TypeSerializer> originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from, T reuse) {
+   List originalFromValues = decomposeValueInternal(from);
+   List originalReuseValues = decomposeValueInternal(reuse);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalFromValues.get(i), 
originalReuseValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public int getLength() {
+   return originalSerializers.stream().allMatch(s -> s.getLength() 
>= 0) ?
+   
originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
+   }
+
+   @Override
+   public void serialize(T record, DataOutputView target) throws 
IOException {
+   List originalValues = decomposeValueInternal(record);
+   for (int i = 0; i < originalSerializers.size(); i++) {
+   
originalSerializers.get(i).serialize(originalValues.get(i), target);
+   }
+   }
+
+   @Override
+   public T deserialize(DataInputView source) throws IOException {
+   List originalValues = new ArrayList();
--- End diff --

I would suggest giving an initial size for `originalValues`, e.g. `List originalValues = new ArrayList(originalSerializers.size());`.


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197331145
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, 
TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
--- End diff --

It would be better to check that the args are not null, or simply use the `@Nonnull` annotation.


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197330095
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects.
+ *
+ * @param <T> type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
+   private final List<TypeSerializer> originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from, T reuse) {
+   List originalFromValues = decomposeValueInternal(from);
+   List originalReuseValues = decomposeValueInternal(reuse);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalFromValues.get(i), 
originalReuseValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public int getLength() {
+   return originalSerializers.stream().allMatch(s -> s.getLength() 
>= 0) ?
+   
originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
+   }
+
+   @Override
+   public void serialize(T record, DataOutputView target) throws 
IOException {
+   List originalValues = decomposeValueInternal(record);
+   for (int i = 0; i < originalSerializers.size(); i++) {
+   
originalSerializers.get(i).serialize(originalValues.get(i), target);
+   }
+   }
+
+   @Override
+   public T deserialize(DataInputView source) throws IOException {
+   List originalValues = new ArrayList();
+   for (TypeSerializer typeSerializer : originalSerializers) {
+   originalValues.add(typeSerializer.deserialize(source));
+   }
+   return composeValueInternal(originalValues);
+   }
+
+   @Override
+   public T deserialize(T reuse, DataInputView 

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197330813
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
 ---
@@ -48,7 +48,8 @@
KeyedStateBackend,
Snapshotable, 
Collection>,
Closeable,
-   CheckpointListener {
+   CheckpointListener,
+   KeyedStateFactory{
--- End diff --

I think a space ` ` is missing here.


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197331820
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, 
TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
+   this.originalStateFactory = originalStateFactory;
+   this.ttlConfig = ttlConfig;
+   this.timeProvider = timeProvider;
+   this.stateFactories = createStateFactories();
+   }
+
+   private Map, StateFactory> 
createStateFactories() {
+   return Stream.of(
+   Tuple2.of(ValueStateDescriptor.class, (StateFactory) 
this::createValueState),
+   Tuple2.of(ListStateDescriptor.class, (StateFactory) 
this::createListState),
+   Tuple2.of(MapStateDescriptor.class, (StateFactory) 
this::createMapState),
+   Tuple2.of(ReducingStateDescriptor.class, (StateFactory) 
this::createReducingState),
+   Tuple2.of(AggregatingStateDescriptor.class, 
(StateFactory) this::createAggregatingState),
+   Tuple2.of(FoldingStateDescriptor.class, (StateFactory) 
this::createFoldingState)
+   ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+   }
+
+   private interface StateFactory {
+IS create(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception;
+   }
+
+   private  IS createState(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception {
+   StateFactory stateFactory = 
stateFactories.get(stateDesc.getClass());
+   if (stateFactory 

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197329533
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
--- End diff --

I think this check looks like a bug.
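
For context, a minimal sketch of what that precondition presumably intends, assuming the goal is to compare the incoming serializer list against the serializer's own field (the parameter name in the quoted diff shadows the field, so the check compares the argument with itself). Type parameters are simplified here because the quoted diff strips generics:

{code:java}
import org.apache.flink.util.Preconditions;

import java.util.List;

// Sketch only: rename the parameter so the existing field is no longer shadowed,
// then check the new serializer list against it.
abstract class CompositeSerializerCheckSketch<T> {

    private final List<?> originalSerializers;

    CompositeSerializerCheckSketch(List<?> originalSerializers) {
        this.originalSerializers = Preconditions.checkNotNull(originalSerializers);
    }

    abstract CompositeSerializerCheckSketch<T> createSerializerInstance(List<?> serializers);

    CompositeSerializerCheckSketch<T> createSerializerInstanceInternal(List<?> newSerializers) {
        // compares the incoming list against the field, not against itself
        Preconditions.checkArgument(newSerializers.size() == originalSerializers.size());
        return createSerializerInstance(newSerializers);
    }
}
{code}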


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197331211
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
--- End diff --

Why couldn't this be static?
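
For readers skimming the digest, a simplified, hypothetical illustration (stand-in types, not the PR's actual StateFactory interface) of why such a lookup map is usually a per-instance field: its values are bound method references that capture the enclosing factory, whereas a static map would need values that take the factory instance explicitly.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

class FactoryMapSketch {

    // Per-instance variant, as in the PR: `this::createValueState` captures `this`.
    private final Map<Class<?>, Function<String, Object>> perInstanceFactories = new HashMap<>();

    // Static alternative: the factory instance becomes an explicit parameter.
    private static final Map<Class<?>, BiFunction<FactoryMapSketch, String, Object>> sharedFactories = new HashMap<>();

    static {
        sharedFactories.put(String.class, FactoryMapSketch::createValueState);
    }

    FactoryMapSketch() {
        perInstanceFactories.put(String.class, this::createValueState);
    }

    private Object createValueState(String descriptorName) {
        // stand-in for building a TTL-wrapped value state
        return "ttl value state for " + descriptorName;
    }
}
{code}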


---


[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197331898
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, 
TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
+   this.originalStateFactory = originalStateFactory;
+   this.ttlConfig = ttlConfig;
+   this.timeProvider = timeProvider;
+   this.stateFactories = createStateFactories();
+   }
+
+   private Map, StateFactory> 
createStateFactories() {
+   return Stream.of(
+   Tuple2.of(ValueStateDescriptor.class, (StateFactory) 
this::createValueState),
+   Tuple2.of(ListStateDescriptor.class, (StateFactory) 
this::createListState),
+   Tuple2.of(MapStateDescriptor.class, (StateFactory) 
this::createMapState),
+   Tuple2.of(ReducingStateDescriptor.class, (StateFactory) 
this::createReducingState),
+   Tuple2.of(AggregatingStateDescriptor.class, 
(StateFactory) this::createAggregatingState),
+   Tuple2.of(FoldingStateDescriptor.class, (StateFactory) 
this::createFoldingState)
+   ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+   }
+
+   private interface StateFactory {
+IS create(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception;
+   }
+
+   private  IS createState(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception {
+   StateFactory stateFactory = 
stateFactories.get(stateDesc.getClass());
+   if (stateFactory 

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197330596
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
 ---
@@ -0,0 +1,204 @@
+package org.apache.flink.api.common.typeutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Base class for composite serializers.
+ *
+ * This class serializes a list of objects
+ *
+ * @param  type of custom serialized value
+ */
+@SuppressWarnings("unchecked")
+public abstract class CompositeSerializer extends TypeSerializer {
+   private final List originalSerializers;
+
+   protected CompositeSerializer(List originalSerializers) 
{
+   Preconditions.checkNotNull(originalSerializers);
+   this.originalSerializers = originalSerializers;
+   }
+
+   protected abstract T composeValue(List values);
+
+   protected abstract List decomposeValue(T v);
+
+   protected abstract CompositeSerializer 
createSerializerInstance(List originalSerializers);
+
+   private T composeValueInternal(List values) {
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return composeValue(values);
+   }
+
+   private List decomposeValueInternal(T v) {
+   List values = decomposeValue(v);
+   Preconditions.checkArgument(values.size() == 
originalSerializers.size());
+   return values;
+   }
+
+   private CompositeSerializer 
createSerializerInstanceInternal(List originalSerializers) {
+   Preconditions.checkArgument(originalSerializers.size() == 
originalSerializers.size());
+   return createSerializerInstance(originalSerializers);
+   }
+
+   @Override
+   public CompositeSerializer duplicate() {
+   return 
createSerializerInstanceInternal(originalSerializers.stream()
+   .map(TypeSerializer::duplicate)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public boolean isImmutableType() {
+   return 
originalSerializers.stream().allMatch(TypeSerializer::isImmutableType);
+   }
+
+   @Override
+   public T createInstance() {
+   return composeValueInternal(originalSerializers.stream()
+   .map(TypeSerializer::createInstance)
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from) {
+   List originalValues = decomposeValueInternal(from);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public T copy(T from, T reuse) {
+   List originalFromValues = decomposeValueInternal(from);
+   List originalReuseValues = decomposeValueInternal(reuse);
+   return composeValueInternal(
+   IntStream.range(0, originalSerializers.size())
+   .mapToObj(i -> 
originalSerializers.get(i).copy(originalFromValues.get(i), 
originalReuseValues.get(i)))
+   .collect(Collectors.toList()));
+   }
+
+   @Override
+   public int getLength() {
+   return originalSerializers.stream().allMatch(s -> s.getLength() 
>= 0) ?
+   
originalSerializers.stream().mapToInt(TypeSerializer::getLength).sum() : -1;
+   }
+
+   @Override
+   public void serialize(T record, DataOutputView target) throws 
IOException {
+   List originalValues = decomposeValueInternal(record);
+   for (int i = 0; i < originalSerializers.size(); i++) {
+   
originalSerializers.get(i).serialize(originalValues.get(i), target);
+   }
+   }
+
+   @Override
+   public T deserialize(DataInputView source) throws IOException {
+   List originalValues = new ArrayList();
+   for (TypeSerializer typeSerializer : originalSerializers) {
+   originalValues.add(typeSerializer.deserialize(source));
+   }
+   return composeValueInternal(originalValues);
+   }
+
+   @Override
+   public T deserialize(T reuse, DataInputView 

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197331741
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, 
TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
+   this.originalStateFactory = originalStateFactory;
+   this.ttlConfig = ttlConfig;
+   this.timeProvider = timeProvider;
+   this.stateFactories = createStateFactories();
+   }
+
+   private Map, StateFactory> 
createStateFactories() {
+   return Stream.of(
+   Tuple2.of(ValueStateDescriptor.class, (StateFactory) 
this::createValueState),
+   Tuple2.of(ListStateDescriptor.class, (StateFactory) 
this::createListState),
+   Tuple2.of(MapStateDescriptor.class, (StateFactory) 
this::createMapState),
+   Tuple2.of(ReducingStateDescriptor.class, (StateFactory) 
this::createReducingState),
+   Tuple2.of(AggregatingStateDescriptor.class, 
(StateFactory) this::createAggregatingState),
+   Tuple2.of(FoldingStateDescriptor.class, (StateFactory) 
this::createFoldingState)
+   ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+   }
+
+   private interface StateFactory {
+IS create(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception;
+   }
+
+   private  IS createState(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception {
+   StateFactory stateFactory = 
stateFactories.get(stateDesc.getClass());
+   if (stateFactory 

[GitHub] flink pull request #6196: [FLINK-9513] Implement TTL state wrappers factory ...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6196#discussion_r197331764
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
 ---
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.CompositeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This state factory wraps state objects, produced by backends, with TTL 
logic.
+ */
+public class TtlStateFactory {
+   public static  IS 
createStateAndWrapWithTtlIfEnabled(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc,
+   KeyedStateFactory originalStateFactory,
+   TtlConfig ttlConfig,
+   TtlTimeProvider timeProvider) throws Exception {
+   return ttlConfig.getTtlUpdateType() == TtlUpdateType.Disabled ?
+   originalStateFactory.createState(namespaceSerializer, 
stateDesc) :
+   new TtlStateFactory(originalStateFactory, ttlConfig, 
timeProvider)
+   .createState(namespaceSerializer, stateDesc);
+   }
+
+   private final Map, StateFactory> 
stateFactories;
+
+   private final KeyedStateFactory originalStateFactory;
+   private final TtlConfig ttlConfig;
+   private final TtlTimeProvider timeProvider;
+
+   private TtlStateFactory(KeyedStateFactory originalStateFactory, 
TtlConfig ttlConfig, TtlTimeProvider timeProvider) {
+   this.originalStateFactory = originalStateFactory;
+   this.ttlConfig = ttlConfig;
+   this.timeProvider = timeProvider;
+   this.stateFactories = createStateFactories();
+   }
+
+   private Map, StateFactory> 
createStateFactories() {
+   return Stream.of(
+   Tuple2.of(ValueStateDescriptor.class, (StateFactory) 
this::createValueState),
+   Tuple2.of(ListStateDescriptor.class, (StateFactory) 
this::createListState),
+   Tuple2.of(MapStateDescriptor.class, (StateFactory) 
this::createMapState),
+   Tuple2.of(ReducingStateDescriptor.class, (StateFactory) 
this::createReducingState),
+   Tuple2.of(AggregatingStateDescriptor.class, 
(StateFactory) this::createAggregatingState),
+   Tuple2.of(FoldingStateDescriptor.class, (StateFactory) 
this::createFoldingState)
+   ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+   }
+
+   private interface StateFactory {
+IS create(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception;
+   }
+
+   private  IS createState(
+   TypeSerializer namespaceSerializer,
+   StateDescriptor stateDesc) throws Exception {
+   StateFactory stateFactory = 
stateFactories.get(stateDesc.getClass());
+   if (stateFactory 

[jira] [Commented] (FLINK-4301) Parameterize Flink version in Quickstart bash script

2018-06-21 Thread xueyu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519949#comment-16519949
 ] 

xueyu commented on FLINK-4301:
--

Does this refer to the script https://flink.apache.org/q/quickstart.sh? If so, does it mean we should modify both https://flink.apache.org/q/quickstart.sh and the documentation?
It looks like this script is not in the flink git repository.

> Parameterize Flink version in Quickstart bash script
> 
>
> Key: FLINK-4301
> URL: https://issues.apache.org/jira/browse/FLINK-4301
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Timo Walther
>Assignee: Simone Robutti
>Priority: Major
>  Labels: starter
>
> The Flink version is hard coded in the quickstart script (for Scala and 
> Java). Thus, even if a user is in the Flink 1.0 docs the scripts are 
> producing a quickstart of the Flink 1.1 release. It would be better if the 
> one-liner in the documentation would contain the version such that a Flink 
> 1.0 quickstart is built in the 1.0 documentation and 1.1 quickstart in the 
> 1.1 documentation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException

2018-06-21 Thread Youjun Yuan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Youjun Yuan updated FLINK-9630:
---
Description: 
When the Kafka topic got deleted, during the task startup process, 
Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in 
getAllPartitionsForTopics(), and it gets no chance to close the kafkaConsumer, 
resulting in a TCP connection leak (to the Kafka broker).

 

*This issue can bring down the whole Flink cluster*, because, in a default 
setup (fixedDelay with INT.MAX restart attempts), the job manager will randomly 
schedule the job to any TaskManager that has a free slot, and each attempt will 
cause that TaskManager to leak a TCP connection. Eventually almost every 
TaskManager will run out of file handles, so no TaskManager can take a 
snapshot or accept new jobs. This effectively stops the whole cluster.

 

The leak happens when StreamTask.invoke() calls openAllOperators(), then 
FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(); 
when kafkaConsumer.partitionsFor(topic) in 
KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a 
*TopicAuthorizationException,* no one catches it.

Though StreamTask.open catches the Exception and invokes the dispose() method of 
each operator, which eventually invokes FlinkKafkaConsumerBase.cancel(), it 
does not close the kafkaConsumer in partitionDiscoverer, and does not even invoke 
partitionDiscoverer.wakeup(), because the discoveryLoopThread was null.

 

Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience:

public void cancel() {
    // set ourselves as not running;
    // this would let the main discovery loop escape as soon as possible
    running = false;

    if (discoveryLoopThread != null) {
        if (partitionDiscoverer != null) {
            // we cannot close the discoverer here, as it is error-prone to concurrent access;
            // only wakeup the discoverer, the discovery loop will clean itself up after it escapes
            partitionDiscoverer.wakeup();
        }

        // the discovery loop may currently be sleeping in-between
        // consecutive discoveries; interrupt to shutdown faster
        discoveryLoopThread.interrupt();
    }

    // abort the fetcher, if there is one
    if (kafkaFetcher != null) {
        kafkaFetcher.cancel();
    }
}

 

 

  was:
when the Kafka topic got deleted, during task starting process, 
Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in 
getAllPartitionsForTopics(), and it get no chance to close the  kafkaConsumer, 
hence resulting TCP connection leak (to Kafka broker).

 

*this issue can bring down the whole Flink cluster*, because, in a default 
setup (fixedDelay with INT.MAX restart attempt), job manager will randomly 
schedule the job to any TaskManager that as free slot, and each attemp will 
cause the TaskManager to leak a TCP connection, eventually almost every 
TaskManager will run out of file handle, hence no taskmanger could make 
snaptshot, or accept new job. Effectly stops the whole cluster.

 

The leak happens when StreamTask.invoke() calls openAllOperators(), then 
FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(), 
when kafkaConsumer.partitionsFor(topic) in 
KafkaPartitionDiscoverer.getAllPartitionsForTopics() hit a 
*TopicAuthorizationException,* no one catches this.

Though StreamTask.open catches Exception and invoks the dispose() method of 
each operator, which eventaully invoke FlinkKakfaConsumerBase.cancel(), however 
it does not close the kakfaConsumer in partitionDiscoverer, not event invoke 
the partitionDiscoverer.wakeup(), because the discoveryLoopThread was null.

 

below the code of FlinkKakfaConsumerBase.cancel() for your convenience

public void cancel() {
    // set ourselves as not running;
    // this would let the main discovery loop escape as soon as possible
    running = false;

    if (discoveryLoopThread != null) {
        if (partitionDiscoverer != null) {
            // we cannot close the discoverer here, as it is error-prone to concurrent access;
            // only wakeup the discoverer, the discovery loop will clean itself up after it escapes
            partitionDiscoverer.wakeup();
        }

        // the discovery loop may currently be sleeping in-between
        // consecutive discoveries; interrupt to shutdown faster
        discoveryLoopThread.interrupt();
    }

    // abort the fetcher, if there is one
    if (kafkaFetcher != null) {
        kafkaFetcher.cancel();
    }
}
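
For illustration, a minimal sketch (not the actual fix applied in Flink) of the kind of guard that would prevent the leak described above: close the consumer when partition discovery fails, so the TCP connection to the broker is released before the exception propagates.

{code:java}
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TopicAuthorizationException;

import java.util.List;

class PartitionDiscoverySketch {

    // Sketch only: ensure the consumer is closed if partition discovery fails.
    static List<PartitionInfo> partitionsOrClose(Consumer<byte[], byte[]> consumer, String topic) {
        try {
            return consumer.partitionsFor(topic);
        } catch (TopicAuthorizationException e) {
            consumer.close(); // otherwise the TCP connection to the broker leaks
            throw e;
        }
    }
}
{code}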

 

 


> Kafka09PartitionDiscoverer cause connection leak on 
> TopicAuthorizationException
> ---
>
> Key: FLINK-9630
> URL: https://issues.apache.org/jira/browse/FLINK-9630
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka 

[jira] [Commented] (FLINK-9641) Pulsar Source Connector

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519815#comment-16519815
 ] 

ASF GitHub Bot commented on FLINK-9641:
---

GitHub user cckellogg opened a pull request:

https://github.com/apache/flink/pull/6200

[FLINK-9641] [streaming-connectors] Flink pulsar source connector

## What is the purpose of the change

This pull request adds a 
[pulsar](https://github.com/apache/incubator-pulsar) source connector which 
will enable Flink jobs to process messages from Pulsar topics.

## Brief change log
 - Add a PulsarConsumerSource connector

## Verifying this change
This change adds unit tests to verify checkpointing and batch message 
acknowledgements.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation
  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? JavaDocs



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cckellogg/flink flink-pulsar-source-connector

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6200.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6200


commit b69fb21dc82e7922f7b7e65c94c154d56e442e5e
Author: Chris 
Date:   2018-06-20T21:53:06Z

Add a simple pulsar source connector.

commit fb170c435abb2b2e09913a0430d2f73dc1edbbe1
Author: Chris 
Date:   2018-06-21T00:03:12Z

Remove metrics class and add max ack batch size.




> Pulsar Source Connector
> ---
>
> Key: FLINK-9641
> URL: https://issues.apache.org/jira/browse/FLINK-9641
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Chris Kellogg
>Priority: Minor
>  Labels: pull-request-available
>
> Pulsar (https://github.com/apache/incubator-pulsar) is a distributed pub-sub 
> messaging system currently in apache incubation. It is a very active project 
> and there are committers from various companies and good adoption. This pr 
> will add a source function to allow Flink jobs to process messages from 
> Pulsar topics.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9641) Pulsar Source Connector

2018-06-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9641:
--
Labels: pull-request-available  (was: )

> Pulsar Source Connector
> ---
>
> Key: FLINK-9641
> URL: https://issues.apache.org/jira/browse/FLINK-9641
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Chris Kellogg
>Priority: Minor
>  Labels: pull-request-available
>
> Pulsar (https://github.com/apache/incubator-pulsar) is a distributed pub-sub 
> messaging system currently in apache incubation. It is a very active project 
> and there are committers from various companies and good adoption. This pr 
> will add a source function to allow Flink jobs to process messages from 
> Pulsar topics.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6200: [FLINK-9641] [streaming-connectors] Flink pulsar s...

2018-06-21 Thread cckellogg
GitHub user cckellogg opened a pull request:

https://github.com/apache/flink/pull/6200

[FLINK-9641] [streaming-connectors] Flink pulsar source connector

## What is the purpose of the change

This pull request adds a 
[pulsar](https://github.com/apache/incubator-pulsar) source connector which 
will enable Flink jobs to process messages from Pulsar topics.

## Brief change log
 - Add a PulsarConsumerSource connector

## Verifying this change
This change adds unit tests to verify checkpointing and batch message 
acknowledgements.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation
  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? JavaDocs



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cckellogg/flink flink-pulsar-source-connector

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6200.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6200


commit b69fb21dc82e7922f7b7e65c94c154d56e442e5e
Author: Chris 
Date:   2018-06-20T21:53:06Z

Add a simple pulsar source connector.

commit fb170c435abb2b2e09913a0430d2f73dc1edbbe1
Author: Chris 
Date:   2018-06-21T00:03:12Z

Remove metrics class and add max ack batch size.




---


[jira] [Created] (FLINK-9641) Pulsar Source Connector

2018-06-21 Thread Chris Kellogg (JIRA)
Chris Kellogg created FLINK-9641:


 Summary: Pulsar Source Connector
 Key: FLINK-9641
 URL: https://issues.apache.org/jira/browse/FLINK-9641
 Project: Flink
  Issue Type: New Feature
  Components: Streaming Connectors
Reporter: Chris Kellogg


Pulsar (https://github.com/apache/incubator-pulsar) is a distributed pub-sub 
messaging system currently in apache incubation. It is a very active project 
and there are committers from various companies and good adoption. This pr will 
add a source function to allow Flink jobs to process messages from Pulsar 
topics.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197279905
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Contains utility methods for clients.
+ */
+public enum ClientUtils {
+   ;
+
+   /**
+* Uploads the user jars from the given {@link JobGraph} using the 
given {@link BlobClient},
+* and sets the appropriate blobkeys.
+*
+* @param jobGraph   jobgraph requiring user jars
+* @param blobClient client to upload jars with
+* @throws IOException if the upload fails
+*/
+   public static void uploadAndSetUserJars(JobGraph jobGraph, BlobClient 
blobClient) throws IOException {
--- End diff --

`JarRunHandler` could use this method as well.
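
A small usage sketch of the new utility, under the assumption that the submission code already has the blob server address and client configuration at hand (the `BlobClient` constructor shown is the existing Flink API; error handling is kept minimal):

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.client.ClientUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;

import java.io.IOException;
import java.net.InetSocketAddress;

class UserJarUploadSketch {

    // Sketch only: upload the job's user jars and set the corresponding blob keys.
    static void uploadUserJars(JobGraph jobGraph, InetSocketAddress blobServerAddress, Configuration config) throws IOException {
        try (BlobClient blobClient = new BlobClient(blobServerAddress, config)) {
            ClientUtils.uploadAndSetUserJars(jobGraph, blobClient);
        }
    }
}
{code}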


---


[jira] [Commented] (FLINK-9624) Move jar/artifact upload logic out of JobGraph

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519784#comment-16519784
 ] 

ASF GitHub Bot commented on FLINK-9624:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197279905
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Contains utility methods for clients.
+ */
+public enum ClientUtils {
+   ;
+
+   /**
+* Uploads the user jars from the given {@link JobGraph} using the 
given {@link BlobClient},
+* and sets the appropriate blobkeys.
+*
+* @param jobGraph   jobgraph requiring user jars
+* @param blobClient client to upload jars with
+* @throws IOException if the upload fails
+*/
+   public static void uploadAndSetUserJars(JobGraph jobGraph, BlobClient 
blobClient) throws IOException {
--- End diff --

`JarRunHandler` could use this method as well.


> Move jar/artifact upload logic out of JobGraph
> --
>
> Key: FLINK-9624
> URL: https://issues.apache.org/jira/browse/FLINK-9624
> Project: Flink
>  Issue Type: Improvement
>  Components: Job-Submission
>Affects Versions: 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The {{JobGraph}} offers utility methods for uploading jars and artifacts to 
> the BlobService.
> However, how these files are uploaded isn't a concern of the {{JobGraph}} but 
> the submission-method, like the {{RestClusterClient}}.
> These methods should be moved into a utility class.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9624) Move jar/artifact upload logic out of JobGraph

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519783#comment-16519783
 ] 

ASF GitHub Bot commented on FLINK-9624:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197279550
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -323,17 +325,18 @@ public JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoader)
(BlobServerPortResponseBody response, String 
dispatcherAddress) -> {
final int blobServerPort = response.port;
final InetSocketAddress address = new 
InetSocketAddress(dispatcherAddress, blobServerPort);
-   final List keys;
-   try {
-   log.info("Uploading jar files.");
-   keys = BlobClient.uploadFiles(address, 
flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
-   jobGraph.uploadUserArtifacts(address, 
flinkConfig);
-   } catch (IOException ioe) {
-   throw new CompletionException(new 
FlinkException("Could not upload job files.", ioe));
-   }
 
-   for (PermanentBlobKey key : keys) {
-   jobGraph.addUserJarBlobKey(key);
+   List userJars = jobGraph.getUserJars();
+   Map userArtifacts = 
jobGraph.getUserArtifacts();
--- End diff --

This entire block is effectively duplicated in several classes and could 
also be moved to `ClientUtils`, but I wasn't sure whether that would put too 
much logic into a single method.


> Move jar/artifact upload logic out of JobGraph
> --
>
> Key: FLINK-9624
> URL: https://issues.apache.org/jira/browse/FLINK-9624
> Project: Flink
>  Issue Type: Improvement
>  Components: Job-Submission
>Affects Versions: 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The {{JobGraph}} offers utility methods for uploading jars and artifacts to 
> the BlobService.
> However, how these files are uploaded isn't a concern of the {{JobGraph}} but 
> the submission-method, like the {{RestClusterClient}}.
> These methods should be moved into a utility class.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197279550
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -323,17 +325,18 @@ public JobSubmissionResult submitJob(JobGraph 
jobGraph, ClassLoader classLoader)
(BlobServerPortResponseBody response, String 
dispatcherAddress) -> {
final int blobServerPort = response.port;
final InetSocketAddress address = new 
InetSocketAddress(dispatcherAddress, blobServerPort);
-   final List keys;
-   try {
-   log.info("Uploading jar files.");
-   keys = BlobClient.uploadFiles(address, 
flinkConfig, jobGraph.getJobID(), jobGraph.getUserJars());
-   jobGraph.uploadUserArtifacts(address, 
flinkConfig);
-   } catch (IOException ioe) {
-   throw new CompletionException(new 
FlinkException("Could not upload job files.", ioe));
-   }
 
-   for (PermanentBlobKey key : keys) {
-   jobGraph.addUserJarBlobKey(key);
+   List userJars = jobGraph.getUserJars();
+   Map userArtifacts = 
jobGraph.getUserArtifacts();
--- End diff --

This entire block is effectively duplicated in several classes and could 
also be moved to `ClientUtils`, but I wasn't sure whether that would put too 
much logic into a single method.


---


[jira] [Commented] (FLINK-9624) Move jar/artifact upload logic out of JobGraph

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519781#comment-16519781
 ] 

ASF GitHub Bot commented on FLINK-9624:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197279258
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * TODO: add javadoc.
--- End diff --

missing javadoc


> Move jar/artifact upload logic out of JobGraph
> --
>
> Key: FLINK-9624
> URL: https://issues.apache.org/jira/browse/FLINK-9624
> Project: Flink
>  Issue Type: Improvement
>  Components: Job-Submission
>Affects Versions: 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The {{JobGraph}} offers utility methods for uploading jars and artifacts to 
> the BlobService.
> However, how these files are uploaded isn't a concern of the {{JobGraph}} but 
> the submission-method, like the {{RestClusterClient}}.
> These methods should be moved into a utility class.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9624) Move jar/artifact upload logic out of JobGraph

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519782#comment-16519782
 ] 

ASF GitHub Bot commented on FLINK-9624:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197279307
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -552,96 +550,33 @@ public boolean hasUsercodeJarFiles() {
return this.userJarBlobKeys;
}
 
-   /**
-* Uploads the previously added user JAR files to the job manager 
through
-* the job manager's BLOB server. The BLOB servers' address is given as 
a
-* parameter. This function issues a blocking call.
-*
-* @param blobServerAddress of the blob server to upload the jars to
-* @param blobClientConfig the blob client configuration
-* @throws IOException Thrown, if the file upload to the JobManager 
failed.
-*/
-   public void uploadUserJars(
-   InetSocketAddress blobServerAddress,
-   Configuration blobClientConfig) throws IOException {
-   if (!userJars.isEmpty()) {
-   List blobKeys = 
BlobClient.uploadFiles(
-   blobServerAddress, blobClientConfig, jobID, 
userJars);
-
-   for (PermanentBlobKey blobKey : blobKeys) {
-   if (!userJarBlobKeys.contains(blobKey)) {
-   userJarBlobKeys.add(blobKey);
-   }
-   }
-   }
-   }
-
@Override
public String toString() {
return "JobGraph(jobId: " + jobID + ")";
}
 
-   /**
-* Configures JobGraph with user specified artifacts. If the files are 
in local system it uploads them
-* to the BLOB server, otherwise it just puts metadata for future 
remote access from within task executor.
-*
-* @param blobServerAddress of the blob server to upload the files to
-* @param blobClientConfig the blob client configuration
-* @throws IOException Thrown, if the file upload to the Blob server 
failed.
-*/
-   public void uploadUserArtifacts(
-   InetSocketAddress blobServerAddress,
-   Configuration blobClientConfig) throws IOException {
-
-   Set> 
uploadToBlobServer = new HashSet<>();
-   Set> 
distributeViaDFS = new HashSet<>();
-
-   for (Map.Entry 
userArtifact : userArtifacts.entrySet()) {
-   Path filePath = new 
Path(userArtifact.getValue().filePath);
-
-   try {
-   if (filePath.getFileSystem().isDistributedFS()) 
{
-   distributeViaDFS.add(userArtifact);
-   } else {
-   uploadToBlobServer.add(userArtifact);
-   }
-
-   } catch (IOException ex) {
-   distributeViaDFS.add(userArtifact);
-   }
+   public void setUserArtifactBlobKey(String entryName, PermanentBlobKey 
blobKey) {
+   byte[] serializedBlobKey;
+   try {
+   serializedBlobKey = 
InstantiationUtil.serializeObject(blobKey);
+   } catch (IOException e) {
+   throw new FlinkRuntimeException("Could not serialize 
blobkey " + blobKey + ".", e);
}
+   userArtifacts.computeIfPresent(entryName, (key, originalEntry) 
-> new DistributedCache.DistributedCacheEntry(
+   originalEntry.filePath,
+   originalEntry.isExecutable,
+   serializedBlobKey,
+   originalEntry.isZipped
+   ));
+   }
 
-   uploadViaBlob(blobServerAddress, blobClientConfig, 
uploadToBlobServer);
-
-   for (Map.Entry 
userArtifact : distributeViaDFS) {
+   public void finalizeUserArtifactEntries() {
--- End diff --

missing test


> Move jar/artifact upload logic out of JobGraph
> --
>
> Key: FLINK-9624
> URL: https://issues.apache.org/jira/browse/FLINK-9624
> Project: Flink
>  Issue Type: Improvement
>  Components: Job-Submission
>Affects Versions: 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The {{JobGraph}} offers utility methods for uploading jars and artifacts to 
> the BlobService.
> 

[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197279258
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/client/ClientUtilsTest.java
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.client;
+
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * TODO: add javadoc.
--- End diff --

missing javadoc


---


[jira] [Updated] (FLINK-9624) Move jar/artifact upload logic out of JobGraph

2018-06-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9624:
--
Labels: pull-request-available  (was: )

> Move jar/artifact upload logic out of JobGraph
> --
>
> Key: FLINK-9624
> URL: https://issues.apache.org/jira/browse/FLINK-9624
> Project: Flink
>  Issue Type: Improvement
>  Components: Job-Submission
>Affects Versions: 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The {{JobGraph}} offers utility methods for uploading jars and artifacts to 
> the BlobService.
> However, how these files are uploaded isn't a concern of the {{JobGraph}} but 
> the submission-method, like the {{RestClusterClient}}.
> These methods should be moved into a utility class.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9624) Move jar/artifact upload logic out of JobGraph

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519779#comment-16519779
 ] 

ASF GitHub Bot commented on FLINK-9624:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6199

[FLINK-9624][runtime] Move jar/artifact upload out of jobgraph

## What is the purpose of the change

This PR moves the logic for uploading jars/artifacts from the jobgraph into 
a separate utility class usable by all submission methods.

The new `ClientUtils` class exposes 2 methods for uploading jars/artifacts 
and setting the respective blob keys on the `JobGraph`.
All existing job-submission methods were updated to use the new utilities 
and should now behave the same.

The subsumed methods in `JobGraph` were removed, but remnants of them 
remain in two added methods:
* setUserArtifactBlobKey sets the blobkey for a specific entry
* finalizeUserArtifactEntries writes the artifact entries into the 
`ExecutionConfig`
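
A rough sketch of how such an upload-and-register utility could look, based only on the 
`BlobClient.uploadFiles(...)` call visible in the removed `JobGraph` methods; the class name 
and the `JobGraph` accessors used here are illustrative and not necessarily the PR's actual 
`ClientUtils` API:

{code:java}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.jobgraph.JobGraph;

public final class ClientUploadSketch {

	private ClientUploadSketch() {
	}

	/**
	 * Uploads the user jars of the given job graph to the blob server and registers
	 * the returned blob keys on the job graph.
	 */
	public static void uploadAndRegisterJars(
			JobGraph jobGraph,
			InetSocketAddress blobServerAddress,
			Configuration blobClientConfig) throws IOException {

		List<Path> userJars = jobGraph.getUserJars(); // assumed accessor
		if (!userJars.isEmpty()) {
			List<PermanentBlobKey> blobKeys = BlobClient.uploadFiles(
				blobServerAddress, blobClientConfig, jobGraph.getJobID(), userJars);
			for (PermanentBlobKey blobKey : blobKeys) {
				jobGraph.addUserJarBlobKey(blobKey); // assumed setter on JobGraph
			}
		}
	}
}
{code}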


## Verifying this change

* `ClientUtils` is tested in `ClientUtilsTest`
* `JobGraph` changes are covered in `JobGraphTest`
* client modifications are covered by various existing tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9280_delta

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6199.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6199


commit 13e3dc7dc9c0b7205223368a460993a309cb58ad
Author: zentol 
Date:   2018-06-13T16:21:21Z

[FLINK-9624][runtime] Move jar/artifact upload out of jobgraph




> Move jar/artifact upload logic out of JobGraph
> --
>
> Key: FLINK-9624
> URL: https://issues.apache.org/jira/browse/FLINK-9624
> Project: Flink
>  Issue Type: Improvement
>  Components: Job-Submission
>Affects Versions: 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The {{JobGraph}} offers utility methods for uploading jars and artifacts to 
> the BlobService.
> However, how these files are uploaded isn't a concern of the {{JobGraph}} but 
> the submission-method, like the {{RestClusterClient}}.
> These methods should be moved into a utility class.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6199#discussion_r197279307
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java ---
@@ -552,96 +550,33 @@ public boolean hasUsercodeJarFiles() {
return this.userJarBlobKeys;
}
 
-   /**
-* Uploads the previously added user JAR files to the job manager 
through
-* the job manager's BLOB server. The BLOB servers' address is given as 
a
-* parameter. This function issues a blocking call.
-*
-* @param blobServerAddress of the blob server to upload the jars to
-* @param blobClientConfig the blob client configuration
-* @throws IOException Thrown, if the file upload to the JobManager 
failed.
-*/
-   public void uploadUserJars(
-   InetSocketAddress blobServerAddress,
-   Configuration blobClientConfig) throws IOException {
-   if (!userJars.isEmpty()) {
-   List blobKeys = 
BlobClient.uploadFiles(
-   blobServerAddress, blobClientConfig, jobID, 
userJars);
-
-   for (PermanentBlobKey blobKey : blobKeys) {
-   if (!userJarBlobKeys.contains(blobKey)) {
-   userJarBlobKeys.add(blobKey);
-   }
-   }
-   }
-   }
-
@Override
public String toString() {
return "JobGraph(jobId: " + jobID + ")";
}
 
-   /**
-* Configures JobGraph with user specified artifacts. If the files are 
in local system it uploads them
-* to the BLOB server, otherwise it just puts metadata for future 
remote access from within task executor.
-*
-* @param blobServerAddress of the blob server to upload the files to
-* @param blobClientConfig the blob client configuration
-* @throws IOException Thrown, if the file upload to the Blob server 
failed.
-*/
-   public void uploadUserArtifacts(
-   InetSocketAddress blobServerAddress,
-   Configuration blobClientConfig) throws IOException {
-
-   Set> 
uploadToBlobServer = new HashSet<>();
-   Set> 
distributeViaDFS = new HashSet<>();
-
-   for (Map.Entry 
userArtifact : userArtifacts.entrySet()) {
-   Path filePath = new 
Path(userArtifact.getValue().filePath);
-
-   try {
-   if (filePath.getFileSystem().isDistributedFS()) 
{
-   distributeViaDFS.add(userArtifact);
-   } else {
-   uploadToBlobServer.add(userArtifact);
-   }
-
-   } catch (IOException ex) {
-   distributeViaDFS.add(userArtifact);
-   }
+   public void setUserArtifactBlobKey(String entryName, PermanentBlobKey 
blobKey) {
+   byte[] serializedBlobKey;
+   try {
+   serializedBlobKey = 
InstantiationUtil.serializeObject(blobKey);
+   } catch (IOException e) {
+   throw new FlinkRuntimeException("Could not serialize 
blobkey " + blobKey + ".", e);
}
+   userArtifacts.computeIfPresent(entryName, (key, originalEntry) 
-> new DistributedCache.DistributedCacheEntry(
+   originalEntry.filePath,
+   originalEntry.isExecutable,
+   serializedBlobKey,
+   originalEntry.isZipped
+   ));
+   }
 
-   uploadViaBlob(blobServerAddress, blobClientConfig, 
uploadToBlobServer);
-
-   for (Map.Entry 
userArtifact : distributeViaDFS) {
+   public void finalizeUserArtifactEntries() {
--- End diff --

missing test


---


[GitHub] flink pull request #6199: [FLINK-9624][runtime] Move jar/artifact upload out...

2018-06-21 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6199

[FLINK-9624][runtime] Move jar/artifact upload out of jobgraph

## What is the purpose of the change

This PR moves the logic for uploading jars/artifacts from the jobgraph into 
a separate utility class usable by all submission methods.

The new `ClientUtils` class exposes 2 methods for uploading jars/artifacts 
and setting the respective blob keys on the `JobGraph`.
All existing job-submission methods were updated to use the new utilities 
and should now behave the same.

The subsumed methods in `JobGraph` were removed, but remnants of them 
remain in two added methods:
* setUserArtifactBlobKey sets the blobkey for a specific entry
* finalizeUserArtifactEntries writes the artifact entries into the 
`ExecutionConfig`


## Verifying this change

* `ClientUtils` is tested in `ClientUtilsTest`
* `JobGraph` changes are covered in `JobGraphTest`
* client modifications are covered by various existing tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9280_delta

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6199.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6199


commit 13e3dc7dc9c0b7205223368a460993a309cb58ad
Author: zentol 
Date:   2018-06-13T16:21:21Z

[FLINK-9624][runtime] Move jar/artifact upload out of jobgraph




---


[jira] [Commented] (FLINK-8468) Make the connector to take advantage of AMQP features (routing key, exchange and message properties)

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519658#comment-16519658
 ] 

ASF GitHub Bot commented on FLINK-8468:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5410
  
Taking a look once more.


> Make the connector to take advantage of AMQP features (routing key, exchange 
> and message properties)
> 
>
> Key: FLINK-8468
> URL: https://issues.apache.org/jira/browse/FLINK-8468
> Project: Flink
>  Issue Type: Improvement
>  Components: RabbitMQ Connector
>Affects Versions: 1.4.0
>Reporter: Ph.Duveau
>Priority: Major
>  Labels: pull-request-available
>
> Make the connector take advantage of AMQP features by adding a constructor 
> and an interface to implement.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8468) Make the connector to take advantage of AMQP features (routing key, exchange and message properties)

2018-06-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-8468:
--
Labels: pull-request-available  (was: )

> Make the connector to take advantage of AMQP features (routing key, exchange 
> and message properties)
> 
>
> Key: FLINK-8468
> URL: https://issues.apache.org/jira/browse/FLINK-8468
> Project: Flink
>  Issue Type: Improvement
>  Components: RabbitMQ Connector
>Affects Versions: 1.4.0
>Reporter: Ph.Duveau
>Priority: Major
>  Labels: pull-request-available
>
> Make the connector take advantage of AMQP features by adding a constructor 
> and an interface to implement.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage of AMQP ...

2018-06-21 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5410
  
Taking a look once more.


---


[jira] [Closed] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9599.
---
   Resolution: Fixed
Fix Version/s: 1.5.1

master: ae8cef3de2f790c4de834982751f4fc49359ee05

1.5: e0265062e95df7c4cbcbc78c62c735174549cd4b

> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9467) No Watermark display on Web UI

2018-06-21 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-9467:

Labels:   (was: pull-request-available)

> No Watermark display on Web UI
> --
>
> Key: FLINK-9467
> URL: https://issues.apache.org/jira/browse/FLINK-9467
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Truong Duc Kien
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> Watermark is currently not shown on the web interface, because it still 
> queries for the watermark using the old metric name `currentLowWatermark` instead 
> of the new ones `currentInputWatermark` and `currentOutputWatermark`.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9467) No Watermark display on Web UI

2018-06-21 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9467?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9467.
---
Resolution: Fixed

master: a3f290020c6d0d15d993dcdc24fa1ef98c63739b

1.5: 986a377b736c561141b66d8d1dbf207cc70317fb

> No Watermark display on Web UI
> --
>
> Key: FLINK-9467
> URL: https://issues.apache.org/jira/browse/FLINK-9467
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Truong Duc Kien
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> Watermark is currently not shown on the web interface, because it still 
> queries for the watermark using the old metric name `currentLowWatermark` instead 
> of the new ones `currentInputWatermark` and `currentOutputWatermark`.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519614#comment-16519614
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197217367
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -184,7 +206,64 @@ public void shutdown(Time timeout) {
return submitRequest(targetAddress, targetPort, httpRequest, 
responseType);
}
 
-   private  CompletableFuture 
submitRequest(String targetAddress, int targetPort, FullHttpRequest 
httpRequest, JavaType responseType) {
+   private static Request createRequest(String targetAddress, int 
targetPort, String targetUrl, HttpMethodWrapper httpMethod, ByteBuf 
jsonPayload, Collection fileUploads) throws IOException {
--- End diff --

Could we combine `targetAddress` and `targetPort` into `targetAddressPort = 
targetAddress + ':' + targetPort`?


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519616#comment-16519616
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197217460
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -184,7 +206,64 @@ public void shutdown(Time timeout) {
return submitRequest(targetAddress, targetPort, httpRequest, 
responseType);
}
 
-   private  CompletableFuture 
submitRequest(String targetAddress, int targetPort, FullHttpRequest 
httpRequest, JavaType responseType) {
+   private static Request createRequest(String targetAddress, int 
targetPort, String targetUrl, HttpMethodWrapper httpMethod, ByteBuf 
jsonPayload, Collection fileUploads) throws IOException {
--- End diff --

We could directly pass in `httpMethod.getNettyHttpMethod()` instead of the 
wrapper.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519617#comment-16519617
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197217551
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -239,6 +322,45 @@ public void shutdown(Time timeout) {
return responseFuture;
}
 
+   private interface Request {
+   void writeTo(Channel channel) throws IOException;
+   }
+
+   private static final class SimpleRequest implements Request {
+   private final HttpRequest httpRequest;
+
+   SimpleRequest(HttpRequest httpRequest) {
+   this.httpRequest = httpRequest;
+   }
+
+   @Override
+   public void writeTo(Channel channel) {
+   channel.writeAndFlush(httpRequest);
+   }
+   }
+
+   private static final class MultipartRequest implements Request {
+   private final HttpRequest httpRequest;
+   private final HttpPostRequestEncoder bodyRequestEncoder;
+
+   MultipartRequest(HttpRequest httpRequest, 
HttpPostRequestEncoder bodyRequestEncoder) {
+   this.httpRequest = httpRequest;
+   this.bodyRequestEncoder = bodyRequestEncoder;
+   }
+
+   @Override
+   public void writeTo(Channel channel) {
+   channel.writeAndFlush(httpRequest);
+   // this should never be false as we explicitly set the 
encoder to use multipart messages
+   if (bodyRequestEncoder.isChunked()) {
+   channel.writeAndFlush(bodyRequestEncoder);
+   }
+
+   // release data and remove temporary files if they were 
created
+   bodyRequestEncoder.cleanFiles();
+   }
+   }
--- End diff --

Nice, this looks now really sleek  
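
A hedged sketch of how the multipart body behind `MultipartRequest` can be assembled with 
Netty's `HttpPostRequestEncoder`: the JSON payload goes in as a body attribute and every file 
as an upload part. The attribute name, content type, and the plain (non-shaded) Netty imports 
below are assumptions, not the PR's exact code:

{code:java}
import java.io.File;
import java.util.Collection;

import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
import io.netty.handler.codec.http.multipart.HttpPostRequestEncoder;

public class MultipartSketch {

	static HttpPostRequestEncoder createBodyEncoder(
			String targetUrl,
			String jsonPayload,
			Collection<File> fileUploads) throws Exception {

		HttpRequest httpRequest =
			new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, targetUrl);
		// 'true' twice: disk-backed data factory and multipart encoding
		HttpPostRequestEncoder encoder =
			new HttpPostRequestEncoder(new DefaultHttpDataFactory(true), httpRequest, true);

		// JSON payload as an attribute ("request" is an assumed part name)
		encoder.addBodyAttribute("request", jsonPayload);
		for (File file : fileUploads) {
			encoder.addBodyFileUpload(file.getName(), file, "application/octet-stream", false);
		}
		// finalizeRequest() sets the multipart headers and returns the request that is
		// written to the channel before the (chunked) encoder, as MultipartRequest#writeTo does
		encoder.finalizeRequest();
		return encoder;
	}
}
{code}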


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6189: [FLINK-9599][rest] RestClient supports FileUploads...

2018-06-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197217551
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -239,6 +322,45 @@ public void shutdown(Time timeout) {
return responseFuture;
}
 
+   private interface Request {
+   void writeTo(Channel channel) throws IOException;
+   }
+
+   private static final class SimpleRequest implements Request {
+   private final HttpRequest httpRequest;
+
+   SimpleRequest(HttpRequest httpRequest) {
+   this.httpRequest = httpRequest;
+   }
+
+   @Override
+   public void writeTo(Channel channel) {
+   channel.writeAndFlush(httpRequest);
+   }
+   }
+
+   private static final class MultipartRequest implements Request {
+   private final HttpRequest httpRequest;
+   private final HttpPostRequestEncoder bodyRequestEncoder;
+
+   MultipartRequest(HttpRequest httpRequest, 
HttpPostRequestEncoder bodyRequestEncoder) {
+   this.httpRequest = httpRequest;
+   this.bodyRequestEncoder = bodyRequestEncoder;
+   }
+
+   @Override
+   public void writeTo(Channel channel) {
+   channel.writeAndFlush(httpRequest);
+   // this should never be false as we explicitly set the 
encoder to use multipart messages
+   if (bodyRequestEncoder.isChunked()) {
+   channel.writeAndFlush(bodyRequestEncoder);
+   }
+
+   // release data and remove temporary files if they were 
created
+   bodyRequestEncoder.cleanFiles();
+   }
+   }
--- End diff --

Nice, this looks now really sleek 👍 


---


[GitHub] flink pull request #6189: [FLINK-9599][rest] RestClient supports FileUploads...

2018-06-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197217460
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -184,7 +206,64 @@ public void shutdown(Time timeout) {
return submitRequest(targetAddress, targetPort, httpRequest, 
responseType);
}
 
-   private  CompletableFuture 
submitRequest(String targetAddress, int targetPort, FullHttpRequest 
httpRequest, JavaType responseType) {
+   private static Request createRequest(String targetAddress, int 
targetPort, String targetUrl, HttpMethodWrapper httpMethod, ByteBuf 
jsonPayload, Collection fileUploads) throws IOException {
--- End diff --

We could directly pass in `httpMethod.getNettyHttpMethod()` instead of the 
wrapper.


---


[GitHub] flink pull request #6189: [FLINK-9599][rest] RestClient supports FileUploads...

2018-06-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197217367
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -184,7 +206,64 @@ public void shutdown(Time timeout) {
return submitRequest(targetAddress, targetPort, httpRequest, 
responseType);
}
 
-   private  CompletableFuture 
submitRequest(String targetAddress, int targetPort, FullHttpRequest 
httpRequest, JavaType responseType) {
+   private static Request createRequest(String targetAddress, int 
targetPort, String targetUrl, HttpMethodWrapper httpMethod, ByteBuf 
jsonPayload, Collection fileUploads) throws IOException {
--- End diff --

Could we combine `targetAddress` and `targetPort` into `targetAddressPort = 
targetAddress + ':' + targetPort`?


---


[jira] [Updated] (FLINK-9640) Checkpointing is always aborted if any task has been finished

2018-06-21 Thread Hai Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou updated FLINK-9640:

Description: 
Steps to reproduce:
1. Build a standalone Flink cluster.
2. Submit a test job like the one below:
{code:java}
public class DemoJob {

public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();
env.setParallelism(4);
env.enableCheckpointing(3000);

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

DataStream<String> inputStream = env.addSource(new 
StringGeneratorParallelSourceFunction());

inputStream.map(String::hashCode).print();

env.execute();
}

public static class StringGeneratorParallelSourceFunction extends 
RichParallelSourceFunction<String> {
private static final Logger LOG = 
LoggerFactory.getLogger(StringGeneratorParallelSourceFunction.class);
private volatile boolean running = true;
private int index;
private int subtask_nums;

@Override
public void open(Configuration parameters) throws Exception {
index = getRuntimeContext().getIndexOfThisSubtask();
subtask_nums = getRuntimeContext().getNumberOfParallelSubtasks();
}

@Override
public void run(SourceContext<String> ctx) throws Exception {

while (running) {
String data = UUID.randomUUID().toString();
ctx.collect(data);
LOG.info("subtask_index = {}, emit string = {}", index, data);
Thread.sleep(50);
if (index == subtask_nums / 2) {
running = false;
LOG.info("subtask_index = {}, finished.", index);
}
}
}

@Override
public void cancel() {
running = false;
}
}
}
{code}

3. Observe the JobManager and TaskManager logs; the following can be found.
*taskmanager.log*
{code:java}
2018-06-21 17:05:54,144 INFO  
com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - 
subtask_index = 2, emit string = 5b0c2467-ad2e-4b53-b1a4-7a0f64560570
2018-06-21 17:05:54,151 INFO  
com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - 
subtask_index = 0, emit string = 11af78b3-59ea-467c-a267-7c2238e44ffe
2018-06-21 17:05:54,195 INFO  
com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - 
subtask_index = 2, finished.
2018-06-21 17:05:54,200 INFO  org.apache.flink.runtime.taskmanager.Task 
- Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) 
switched from RUNNING to FINISHED.
2018-06-21 17:05:54,201 INFO  org.apache.flink.runtime.taskmanager.Task 
- Freeing task resources for Source: Custom Source (3/4) 
(6b2a374bec5f31112811613537dd4fd9).
2018-06-21 17:05:54,201 INFO  org.apache.flink.runtime.taskmanager.Task 
- Ensuring all FileSystem streams are closed for task Source: 
Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) [FINISHED]
2018-06-21 17:05:54,202 INFO  org.apache.flink.yarn.YarnTaskManager 
- Un-registering task and sending final execution state FINISHED to 
JobManager for task Source: Custom Source (6b2a374bec5f31112811613537dd4fd9)
2018-06-21 17:05:54,211 INFO  
com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - 
subtask_index = 0, emit string = f29f48fd-ca53-4a96-b596-93948c09581d
{code}

*jobmanager.log*
{code:java}
2018-06-21 17:05:52,682 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed 
(2/4) (3aee8bf5103065e8b57ebd0e214141ae) switched from DEPLOYING to RUNNING.
2018-06-21 17:05:52,683 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Map (2/4) 
(de90106d04f63cb9ea531308202e233f) switched from DEPLOYING to RUNNING.
2018-06-21 17:05:54,219 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom 
Source (3/4) (6b2a374bec5f31112811613537dd4fd9) switched from RUNNING to 
FINISHED.
2018-06-21 17:05:54,224 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Map (3/4) 
(8f523afb97dc848a9578f9cae5870421) switched from RUNNING to FINISHED.
2018-06-21 17:05:54,224 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed 
(3/4) (39f76a87d20cfc491e11f0b5b08ec5c2) switched from RUNNING to FINISHED.
2018-06-21 17:05:55,069 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
triggering task Source: Custom Source (3/4) is not being executed at the 
moment. Aborting checkpoint.
2018-06-21 17:05:58,067 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 
triggering task Source: Custom Source (3/4) is not being executed at the 
moment. Aborting 
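
A common workaround, sketched below, is to keep a finite source alive after it has emitted 
all of its data, so that no task switches to FINISHED and checkpoints keep being triggered. 
This is not this ticket's fix; the emit limit and sleep intervals are illustrative only.

{code:java}
import java.util.UUID;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

public class IdlingStringSource extends RichParallelSourceFunction<String> {

	private volatile boolean running = true;

	@Override
	public void run(SourceContext<String> ctx) throws Exception {
		int emitted = 0;
		while (running && emitted < 1000) {
			ctx.collect(UUID.randomUUID().toString());
			emitted++;
			Thread.sleep(50);
		}
		// Do not return here: a returning source switches its task to FINISHED and,
		// as the JobManager log above shows, every later checkpoint is aborted.
		while (running) {
			Thread.sleep(1000);
		}
	}

	@Override
	public void cancel() {
		running = false;
	}
}
{code}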

[jira] [Updated] (FLINK-9640) Checkpointing is always aborted if any task has been finished

2018-06-21 Thread Hai Zhou (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou updated FLINK-9640:

Affects Version/s: (was: 1.3.2)

> Checkpointing is always aborted if any task has been finished
> 
>
> Key: FLINK-9640
> URL: https://issues.apache.org/jira/browse/FLINK-9640
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0, 1.4.0, 1.5.0, 1.6.0
>Reporter: Hai Zhou
>Assignee: Hai Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> Steps to reproduce:
> 1. Build a standalone Flink cluster.
> 2. Submit a test job like the one below:
> {code:java}
> public class DemoJob {
> public static void main(String[] args) throws Exception {
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.disableOperatorChaining();
> env.setParallelism(4);
> env.enableCheckpointing(3000);
> 
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> DataStream<String> inputStream = env.addSource(new 
> StringGeneratorParallelSourceFunction());
> inputStream.map(String::hashCode).print();
> env.execute();
> }
> public static class StringGeneratorParallelSourceFunction extends 
> RichParallelSourceFunction<String> {
> private static final Logger LOG = 
> LoggerFactory.getLogger(StringGeneratorParallelSourceFunction.class);
> private volatile boolean running = true;
> private int index;
> private int subtask_nums;
> @Override
> public void open(Configuration parameters) throws Exception {
> index = getRuntimeContext().getIndexOfThisSubtask();
> subtask_nums = getRuntimeContext().getNumberOfParallelSubtasks();
> }
> @Override
> public void run(SourceContext<String> ctx) throws Exception {
> while (running) {
> String data = UUID.randomUUID().toString();
> ctx.collect(data);
> LOG.info("subtask_index = {}, emit string = {}", index, data);
> Thread.sleep(50);
> if (index == subtask_nums / 2) {
> running = false;
> LOG.info("subtask_index = {}, finished.", index);
> }
> }
> }
> @Override
> public void cancel() {
> running = false;
> }
> }
> }
> {code}
> 3. Observe the JobManager and TaskManager logs; the following can be found.
> *taskmanager.log*
> {code:java}
> 2018-06-21 17:05:54,144 INFO  
> com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - 
> subtask_index = 2, emit string = 5b0c2467-ad2e-4b53-b1a4-7a0f64560570
> 2018-06-21 17:05:54,151 INFO  
> com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - 
> subtask_index = 0, emit string = 11af78b3-59ea-467c-a267-7c2238e44ffe
> 2018-06-21 17:05:54,195 INFO  
> com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - 
> subtask_index = 2, finished.
> 2018-06-21 17:05:54,200 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Source: Custom Source (3/4) 
> (6b2a374bec5f31112811613537dd4fd9) switched from RUNNING to FINISHED.
> 2018-06-21 17:05:54,201 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Freeing task resources for Source: Custom Source (3/4) 
> (6b2a374bec5f31112811613537dd4fd9).
> 2018-06-21 17:05:54,201 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Ensuring all FileSystem streams are closed for task Source: 
> Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) [FINISHED]
> 2018-06-21 17:05:54,202 INFO  org.apache.flink.yarn.YarnTaskManager   
>   - Un-registering task and sending final execution state 
> FINISHED to JobManager for task Source: Custom Source 
> (6b2a374bec5f31112811613537dd4fd9)
> 2018-06-21 17:05:54,211 INFO  
> com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - 
> subtask_index = 0, emit string = f29f48fd-ca53-4a96-b596-93948c09581d
> {code}
> *jobmanager.log*
> {code:java}
> 2018-06-21 17:05:52,682 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed 
> (2/4) (3aee8bf5103065e8b57ebd0e214141ae) switched from DEPLOYING to RUNNING.
> 2018-06-21 17:05:52,683 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Map (2/4) 
> (de90106d04f63cb9ea531308202e233f) switched from DEPLOYING to RUNNING.
> 2018-06-21 17:05:54,219 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: 
> Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) switched from RUNNING 
> to FINISHED.
> 2018-06-21 17:05:54,224 INFO  
> 

[jira] [Created] (FLINK-9640) Checkpointing is always aborted if any task has been finished

2018-06-21 Thread Hai Zhou (JIRA)
Hai Zhou created FLINK-9640:
---

 Summary: Checkpointing is always aborted if any task has been 
finished
 Key: FLINK-9640
 URL: https://issues.apache.org/jira/browse/FLINK-9640
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0, 1.3.2, 1.4.0, 1.3.0, 1.6.0
Reporter: Hai Zhou
Assignee: Hai Zhou
 Fix For: 1.6.0


Steps to reproduce:
1. Build a standalone Flink cluster.
2. Submit a test job like the one below:
{code:java}
public class DemoJob {

public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();
env.setParallelism(4);
env.enableCheckpointing(3000);

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

DataStream<String> inputStream = env.addSource(new 
StringGeneratorParallelSourceFunction());

inputStream.map(String::hashCode).print();

env.execute();
}

public static class StringGeneratorParallelSourceFunction extends 
RichParallelSourceFunction<String> {
private static final Logger LOG = 
LoggerFactory.getLogger(StringGeneratorParallelSourceFunction.class);
private volatile boolean running = true;
private int index;
private int subtask_nums;

@Override
public void open(Configuration parameters) throws Exception {
index = getRuntimeContext().getIndexOfThisSubtask();
subtask_nums = getRuntimeContext().getNumberOfParallelSubtasks();
}

@Override
public void run(SourceContext<String> ctx) throws Exception {

while (running) {
String data = UUID.randomUUID().toString();
ctx.collect(data);
LOG.info("subtask_index = {}, emit string = {}", index, data);
Thread.sleep(50);
if (index == subtask_nums / 2) {
running = false;
LOG.info("subtask_index = {}, finished.", index);
}
}
}

@Override
public void cancel() {
running = false;
}
}
}
{code}

3. Observe the JobManager and TaskManager logs; the following can be found.
*taskmanager.log*
{code:java}
2018-06-21 17:05:54,144 INFO  
com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - 
subtask_index = 2, emit string = 5b0c2467-ad2e-4b53-b1a4-7a0f64560570
2018-06-21 17:05:54,151 INFO  
com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - 
subtask_index = 0, emit string = 11af78b3-59ea-467c-a267-7c2238e44ffe
2018-06-21 17:05:54,195 INFO  
com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - 
subtask_index = 2, finished.
2018-06-21 17:05:54,200 INFO  org.apache.flink.runtime.taskmanager.Task 
- Source: Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) 
switched from RUNNING to FINISHED.
2018-06-21 17:05:54,201 INFO  org.apache.flink.runtime.taskmanager.Task 
- Freeing task resources for Source: Custom Source (3/4) 
(6b2a374bec5f31112811613537dd4fd9).
2018-06-21 17:05:54,201 INFO  org.apache.flink.runtime.taskmanager.Task 
- Ensuring all FileSystem streams are closed for task Source: 
Custom Source (3/4) (6b2a374bec5f31112811613537dd4fd9) [FINISHED]
2018-06-21 17:05:54,202 INFO  org.apache.flink.yarn.YarnTaskManager 
- Un-registering task and sending final execution state FINISHED to 
JobManager for task Source: Custom Source (6b2a374bec5f31112811613537dd4fd9)
2018-06-21 17:05:54,211 INFO  
com.yew1eb.flink.demo.DemoJob$StringGeneratorParallelSourceFunction  - 
subtask_index = 0, emit string = f29f48fd-ca53-4a96-b596-93948c09581d
{code}

*jobmanager.log*
{code:java}
2018-06-21 17:05:52,682 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed 
(2/4) (3aee8bf5103065e8b57ebd0e214141ae) switched from DEPLOYING to RUNNING.
2018-06-21 17:05:52,683 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Map (2/4) 
(de90106d04f63cb9ea531308202e233f) switched from DEPLOYING to RUNNING.
2018-06-21 17:05:54,219 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: Custom 
Source (3/4) (6b2a374bec5f31112811613537dd4fd9) switched from RUNNING to 
FINISHED.
2018-06-21 17:05:54,224 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Map (3/4) 
(8f523afb97dc848a9578f9cae5870421) switched from RUNNING to FINISHED.
2018-06-21 17:05:54,224 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- Sink: Unnamed 
(3/4) (39f76a87d20cfc491e11f0b5b08ec5c2) switched from RUNNING to FINISHED.
2018-06-21 17:05:55,069 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519551#comment-16519551
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197192870
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java 
---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Configuration of state TTL logic.
+ * TODO: builder
+ */
+public class TtlConfig {
+   private final TtlUpdateType ttlUpdateType;
+   private final TtlStateVisibility stateVisibility;
+   private final TtlTimeCharacteristic timeCharacteristic;
+   private final Time ttl;
+
+   public TtlConfig(
+   TtlUpdateType ttlUpdateType,
+   TtlStateVisibility stateVisibility,
+   TtlTimeCharacteristic timeCharacteristic,
+   Time ttl) {
+   Preconditions.checkNotNull(ttlUpdateType);
+   Preconditions.checkNotNull(stateVisibility);
+   Preconditions.checkNotNull(timeCharacteristic);
+   Preconditions.checkArgument(ttl.toMilliseconds() >= 0,
--- End diff --

Maybe we should pre check the `ttl` is not null, and I wonder does the 
`ttl.toMilliseconds() == 0` would make any sense?
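
A minimal sketch of the constructor with the suggested null check added; whether a strictly 
positive TTL should be required (i.e. whether 0 ms makes sense) is exactly the open question 
above, and the trailing field assignments are assumed from the fields shown in the diff:

{code:java}
public TtlConfig(
		TtlUpdateType ttlUpdateType,
		TtlStateVisibility stateVisibility,
		TtlTimeCharacteristic timeCharacteristic,
		Time ttl) {
	Preconditions.checkNotNull(ttlUpdateType);
	Preconditions.checkNotNull(stateVisibility);
	Preconditions.checkNotNull(timeCharacteristic);
	// suggested: fail fast on a null TTL instead of hitting an NPE in toMilliseconds()
	Preconditions.checkNotNull(ttl);
	// requiring a strictly positive duration sidesteps the "0 ms" question
	Preconditions.checkArgument(ttl.toMilliseconds() > 0,
		"TTL is expected to be a positive duration");
	this.ttlUpdateType = ttlUpdateType;
	this.stateVisibility = stateVisibility;
	this.timeCharacteristic = timeCharacteristic;
	this.ttl = ttl;
}
{code}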


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and adds TTL logic 
> using time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
>     TtlValue<V> valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are applied to state produced by the normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197192870
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java 
---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Configuration of state TTL logic.
+ * TODO: builder
+ */
+public class TtlConfig {
+   private final TtlUpdateType ttlUpdateType;
+   private final TtlStateVisibility stateVisibility;
+   private final TtlTimeCharacteristic timeCharacteristic;
+   private final Time ttl;
+
+   public TtlConfig(
+   TtlUpdateType ttlUpdateType,
+   TtlStateVisibility stateVisibility,
+   TtlTimeCharacteristic timeCharacteristic,
+   Time ttl) {
+   Preconditions.checkNotNull(ttlUpdateType);
+   Preconditions.checkNotNull(stateVisibility);
+   Preconditions.checkNotNull(timeCharacteristic);
+   Preconditions.checkArgument(ttl.toMilliseconds() >= 0,
--- End diff --

Maybe we should pre check the `ttl` is not null, and I wonder does the 
`ttl.toMilliseconds() == 0` would make any sense?


---


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519550#comment-16519550
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6189
  
I've rebased the PR.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6189: [FLINK-9599][rest] RestClient supports FileUploads

2018-06-21 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6189
  
I've rebased the PR.


---


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519539#comment-16519539
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197188305
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * @param  Type of the user value of state with TTL
+ */
+class TtlValue implements Serializable {
+   private final T userValue;
+   private final long expirationTimestamp;
--- End diff --

Okay, I see this is tricky, I agree that this should be addressed in 
another PR. We need to figure out a proper way to do that.


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and adds TTL logic 
> using time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
>     TtlValue<V> valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are applied to state produced by the normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-21 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197188305
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * @param  Type of the user value of state with TTL
+ */
+class TtlValue implements Serializable {
+   private final T userValue;
+   private final long expirationTimestamp;
--- End diff --

Okay, I see this is tricky, I agree that this should be addressed in 
another PR. We need to figure out a proper way to do that.


---


[jira] [Updated] (FLINK-9639) Order "User Configuration" Alphabetically on Flink Dashboard

2018-06-21 Thread Guilherme Nobre (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guilherme Nobre updated FLINK-9639:
---
Description: 
The *User Configuration* list is not ordered alphabetically. Would be nice to 
do so, even more for people that like me have configurations with prefixes. 
This way the configurations would be nicely ordered :)

You can access the configuration screen here: 
[flinkdashboard]/#/jobs/[jobId]/config - replace [flinkdashboard] and [jobId] 
with your own

Or just click on a job and then on the *Configuration* tab

  was:
The *User Configuration* list is not ordered alphabetically. Would be nice to 
do so, even more for people that like me have configurations with prefixes. 
This way the configurations would be nicely ordered :)

You can access the configuration screen here: 
[flinkdashboard]/#/jobs/[jobId]/config - replace [flinkdashboard] and 
with[jobId] your own

Or just click on a job and then on the *Configuration* tab


> Order "User Configuration" Alphabetically on Flink Dashboard
> 
>
> Key: FLINK-9639
> URL: https://issues.apache.org/jira/browse/FLINK-9639
> Project: Flink
>  Issue Type: Improvement
>  Components: Web Client, Webfrontend
>Affects Versions: 1.4.0
>Reporter: Guilherme Nobre
>Priority: Minor
>
> The *User Configuration* list is not ordered alphabetically. Would be nice to 
> do so, even more for people that like me have configurations with prefixes. 
> This way the configurations would be nicely ordered :)
> You can access the configuration screen here: 
> [flinkdashboard]/#/jobs/[jobId]/config - replace [flinkdashboard] and [jobId] 
> with your own
> Or just click on a job and then on the *Configuration* tab



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9639) Order "User Configuration" Alphabetically on Flink Dashboard

2018-06-21 Thread Guilherme Nobre (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guilherme Nobre updated FLINK-9639:
---
Description: 
The *User Configuration* list is not ordered alphabetically. Would be nice to 
do so, even more for people that like me have configurations with prefixes. 
This way the configurations would be nicely ordered :)

You can access the configuration screen here: 
[flinkdashboard]/#/jobs/[jobId]/config - replace [flinkdashboard] and 
with[jobId] your own

Or just click on a job and then on the *Configuration* tab

  was:
The *User Configuration* list is not ordered alphabetically. Would be nice to 
do so, even more for people that like me have configurations with prefixes. 
This way the configurations would be nicely ordered :)

 

You can access the configuration screen here: 
[flinkdashboard]/#/jobs/[jobId]/config - replace [flinkdashboard] and 
with[jobId] your own

Or just click on a job and then on the *Configuration* tab


> Order "User Configuration" Alphabetically on Flink Dashboard
> 
>
> Key: FLINK-9639
> URL: https://issues.apache.org/jira/browse/FLINK-9639
> Project: Flink
>  Issue Type: Improvement
>  Components: Web Client, Webfrontend
>Affects Versions: 1.4.0
>Reporter: Guilherme Nobre
>Priority: Minor
>
> The *User Configuration* list is not ordered alphabetically. Would be nice to 
> do so, even more for people that like me have configurations with prefixes. 
> This way the configurations would be nicely ordered :)
> You can access the configuration screen here: 
> [flinkdashboard]/#/jobs/[jobId]/config - replace [flinkdashboard] and 
> with[jobId] your own
> Or just click on a job and then on the *Configuration* tab



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9639) Order "User Configuration" Alphabetically on Flink Dashboard

2018-06-21 Thread Guilherme Nobre (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guilherme Nobre updated FLINK-9639:
---
Description: 
The *User Configuration* list is not ordered alphabetically. Would be nice to 
do so, even more for people that like me have configurations with prefixes. 
This way the configurations would be nicely ordered :)

 

You can access the configuration screen here: 
[flinkdashboard]/#/jobs/[jobId]/config - replace [flinkdashboard] and 
with[jobId] your own

Or just click on a job and then on the *Configuration* tab

  was:The *User Configuration* list is not ordered alphabetically. Would be 
nice to do so, even more for people that like me have configurations with 
prefixes. This way the configurations would be nicely ordered :)


> Order "User Configuration" Alphabetically on Flink Dashboard
> 
>
> Key: FLINK-9639
> URL: https://issues.apache.org/jira/browse/FLINK-9639
> Project: Flink
>  Issue Type: Improvement
>  Components: Web Client, Webfrontend
>Affects Versions: 1.4.0
>Reporter: Guilherme Nobre
>Priority: Minor
>
> The *User Configuration* list is not ordered alphabetically. It would be nice to 
> do so, especially for people who, like me, have configurations with prefixes. 
> This way the configurations would be nicely ordered :)
>  
> You can access the configuration screen here: 
> [flinkdashboard]/#/jobs/[jobId]/config - replace [flinkdashboard] and 
> [jobId] with your own
> Or just click on a job and then on the *Configuration* tab



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9522) Rework Flink website

2018-06-21 Thread Fabian Hueske (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-9522.

Resolution: Done
  Assignee: Fabian Hueske

Merged to flink-web/asf-site with df053d4300aec40ae670ec9ef0ca55c09c197c87

> Rework Flink website
> 
>
> Key: FLINK-9522
> URL: https://issues.apache.org/jira/browse/FLINK-9522
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>
> Flink's website (flink.apache.org) represents and informs about the Flink 
> system and the Flink community.
> I would like to propose to rework some parts of the website to improve its 
> structure, provide more valuable information about Flink, and present new 
> features.
> In particular, I propose to:
>  * Restructure the menu into three sections:
>  ## *About Flink*: Explains what Flink is and which use cases it addresses 
> (What is it? Does it solve my problem? Is it working / who uses it?)
>  ## *Users*: helps users of Flink (Download, Docs, How to get help)
>  ## *Contributors*: Community, How to contribute, Github
>  * Rework the start page: add an overview of the features, add a new figure, 
> move the powered-by users above the blog
>  * Add a detailed description of Flink discussing architectures, 
> applications, and operations. This will replace the outdated Features page.
>  * Rework the Use Cases page
>  * Add a Getting Help page pointing to the mailing lists, Stack Overflow, and 
> common error messages (moved from the FAQ)
>  * Remove the information about the inactive IRC channel
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519507#comment-16519507
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6178


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.
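
A rough sketch of the forwarding idea described above, against the Netty 4 API 
(the attribute name and handler wiring are illustrative assumptions, not the 
actual Flink code):

{code:java}
import java.io.File;
import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.AttributeKey;

// Hedged sketch: uploaded files travel to subsequent handlers as a channel
// attribute, while the JSON body is re-wrapped in a synthetic full request.
class UploadForwardingSketch {
	private static final AttributeKey<List<File>> UPLOADED_FILES = AttributeKey.valueOf("uploadedFiles");

	void forward(ChannelHandlerContext ctx, String uri, ByteBuf jsonBody, List<File> files) {
		ctx.channel().attr(UPLOADED_FILES).set(files);
		FullHttpRequest jsonOnly =
				new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, uri, jsonBody);
		ctx.fireChannelRead(jsonOnly); // downstream handlers see a plain JSON request
	}
}
{code}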



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519511#comment-16519511
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197179238
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlValue<T> implements Serializable {
+   private final T userValue;
+   private final long expirationTimestamp;
--- End diff --

This operation is a better fit for full-scan restoration from a checkpoint, with a custom 
transformation of each state entry whose expiration timestamp is optionally 
prolonged to account for downtime. The same applies to cleaning up expired state during a full scan. 
I think it should be a separate issue and PR, because how can the wrappers distinguish 
between old and new state before and after a restart?


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue<V> valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are applied to the state produced by the normal state binder in its TTL 
> wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9467) No Watermark display on Web UI

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519506#comment-16519506
 ] 

ASF GitHub Bot commented on FLINK-9467:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6152


> No Watermark display on Web UI
> --
>
> Key: FLINK-9467
> URL: https://issues.apache.org/jira/browse/FLINK-9467
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Truong Duc Kien
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> Watermark is currently not shown on the web interface,  because it still 
> queries for watermark using the old metric name `currentLowWatermark` instead 
> of the new ones `currentInputWatermark` and `currentOutputWatermark`
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9623) Move zipping logic out of blobservice

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519505#comment-16519505
 ] 

ASF GitHub Bot commented on FLINK-9623:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6187


> Move zipping logic out of blobservice
> -
>
> Key: FLINK-9623
> URL: https://issues.apache.org/jira/browse/FLINK-9623
> Project: Flink
>  Issue Type: Improvement
>  Components: Job-Submission
>Affects Versions: 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Directories given to the blob-service (primarily a use-case for the 
> distributed cache) are currently silently zipped, and later unzipped by the 
> {{FileCache}}. This tightly couples the zipping logic in the blob-service to 
> the unzipping logic of the {{FileCache}}. The blob-service neither unzipped 
> the directory if the blob was requested, nor did it provide any means of 
> doing so manually, nor did it inform the user as to whether the requested 
> blob is a zip or not.
> My conclusion is that the blob-service should not support directories _for 
> now_, and that instead directories for the {{distributed cache}} should be 
> explicitly zipped beforehand, given that this is the only use-case we have at 
> the moment.
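
As a rough illustration of "zip the directory explicitly beforehand" with plain 
JDK APIs (a sketch only; whatever utility Flink ends up using may differ):

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

// Hedged sketch: zip a distributed-cache directory up front with java.util.zip.
final class DirZipSketch {
	static void zipDirectory(Path sourceDir, Path targetZip) throws IOException {
		try (Stream<Path> walk = Files.walk(sourceDir);
				ZipOutputStream zip = new ZipOutputStream(Files.newOutputStream(targetZip))) {
			List<Path> files = walk.filter(Files::isRegularFile).collect(Collectors.toList());
			for (Path file : files) {
				// entry names are relative to the directory being zipped
				zip.putNextEntry(new ZipEntry(sourceDir.relativize(file).toString()));
				Files.copy(file, zip);
				zip.closeEntry();
			}
		}
	}
}
{code}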



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6187: [FLINK-9623][runtime] Move zipping logic out of blo...

2018-06-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6187


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-21 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197179238
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlValue<T> implements Serializable {
+   private final T userValue;
+   private final long expirationTimestamp;
--- End diff --

This operation is a better fit for full-scan restoration from a checkpoint, with a custom 
transformation of each state entry whose expiration timestamp is optionally 
prolonged to account for downtime. The same applies to cleaning up expired state during a full scan. 
I think it should be a separate issue and PR, because how can the wrappers distinguish 
between old and new state before and after a restart?


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6178


---


[GitHub] flink pull request #6152: [FLINK-9467][metrics][WebUI] Fix watermark display...

2018-06-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6152


---


[jira] [Created] (FLINK-9639) Order "User Configuration" Alphabetically on Flink Dashboard

2018-06-21 Thread Guilherme Nobre (JIRA)
Guilherme Nobre created FLINK-9639:
--

 Summary: Order "User Configuration" Alphabetically on Flink 
Dashboard
 Key: FLINK-9639
 URL: https://issues.apache.org/jira/browse/FLINK-9639
 Project: Flink
  Issue Type: Improvement
  Components: Web Client, Webfrontend
Affects Versions: 1.4.0
Reporter: Guilherme Nobre


The *User Configuration* list is not ordered alphabetically. It would be nice to 
do so, especially for people who, like me, have configurations with prefixes. 
This way the configurations would be nicely ordered :)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9623) Move zipping logic out of blobservice

2018-06-21 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9623.
---
Resolution: Fixed

master: 3f07ecc652c07101ae1b1a8fb17f89a2d1206e1e

> Move zipping logic out of blobservice
> -
>
> Key: FLINK-9623
> URL: https://issues.apache.org/jira/browse/FLINK-9623
> Project: Flink
>  Issue Type: Improvement
>  Components: Job-Submission
>Affects Versions: 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Directories given to the blob-service (primarily a use-case for the 
> distributed cache) are currently silently zipped, and later unzipped by the 
> {{FileCache}}. This tightly couples the zipping logic in the blob-service to 
> the unzipping logic of the {{FileCache}}. The blob-service neither unzipped 
> the directory if the blob was requested, nor did it provide any means of 
> doing so manually, nor did it inform the user as to whether the requested 
> blob is a zip or not.
> My conclusion is that the blob-service should not support directories _for 
> now_, and that instead directories for the {{distributed cache}} should be 
> explicitly zipped beforehand, given that this is the only use-case we have at 
> the moment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9627) Extending 'KafkaJsonTableSource' according to comments will result in NPE

2018-06-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9627:
--
Labels: pull-request-available  (was: )

> Extending 'KafkaJsonTableSource' according to comments will result in NPE
> -
>
> Key: FLINK-9627
> URL: https://issues.apache.org/jira/browse/FLINK-9627
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: Dominik Wosiński
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> According to the comments what is needed to extend the 'KafkaJsonTableSource' 
> looks as follows:
>  
> {code:java}
> A version-agnostic Kafka JSON {@link StreamTableSource}.
> *
> * The version-specific Kafka consumers need to extend this class and
> * override {@link #createKafkaConsumer(String, Properties, 
> DeserializationSchema)}}.
> *
> * The field names are used to parse the JSON file and so are the 
> types.{code}
> This will cause an NPE, since there is no default value for startupMode in 
> the abstract class itself, only in the builder of this class. 
> For the 'getKafkaConsumer' method the switch statement will be executed on 
> the non-initialized 'startupMode' field:
> {code:java}
> switch (startupMode) {
> case EARLIEST:
> kafkaConsumer.setStartFromEarliest();
> break;
> case LATEST:
> kafkaConsumer.setStartFromLatest();
> break;
> case GROUP_OFFSETS:
> kafkaConsumer.setStartFromGroupOffsets();
> break;
> case SPECIFIC_OFFSETS:
> kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
> break;
> }{code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9627) Extending 'KafkaJsonTableSource' according to comments will result in NPE

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519453#comment-16519453
 ] 

ASF GitHub Bot commented on FLINK-9627:
---

GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/6198

[FLINK-9627] Extending KafkaJsonTableSource according to comments will 
result in NPE

## What is the purpose of the change

* This pull request fixes an NPE when customizing KafkaJsonTableSource without the 
Builder*


## Brief change log

  - *Initialize the `StartupMode` field with an enum value*

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-9627

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6198.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6198


commit 1dfe2b01921d437485523418c44739ee91febaa5
Author: yanghua 
Date:   2018-06-21T08:46:53Z

[FLINK-9627] Extending KafkaJsonTableSource according to comments will 
result in NPE




> Extending 'KafkaJsonTableSource' according to comments will result in NPE
> -
>
> Key: FLINK-9627
> URL: https://issues.apache.org/jira/browse/FLINK-9627
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: Dominik Wosiński
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> According to the comments what is needed to extend the 'KafkaJsonTableSource' 
> looks as follows:
>  
> {code:java}
> A version-agnostic Kafka JSON {@link StreamTableSource}.
> *
> * The version-specific Kafka consumers need to extend this class and
> * override {@link #createKafkaConsumer(String, Properties, 
> DeserializationSchema)}}.
> *
> * The field names are used to parse the JSON file and so are the 
> types.{code}
> This will cause an NPE, since there is no default value for startupMode in 
> the abstract class itself, only in the builder of this class. 
> For the 'getKafkaConsumer' method the switch statement will be executed on 
> the non-initialized 'startupMode' field:
> {code:java}
> switch (startupMode) {
> case EARLIEST:
> kafkaConsumer.setStartFromEarliest();
> break;
> case LATEST:
> kafkaConsumer.setStartFromLatest();
> break;
> case GROUP_OFFSETS:
> kafkaConsumer.setStartFromGroupOffsets();
> break;
> case SPECIFIC_OFFSETS:
> kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
> break;
> }{code}
>  
>  
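
For illustration, a minimal sketch of the fix described in the change log above, 
assuming the field lives in the abstract table source base class and its default 
mirrors the Builder's (names and the chosen default are assumptions, not the 
exact patch):

{code:java}
// Hedged sketch: giving the field a default removes the NPE for subclasses
// that are instantiated directly instead of through the Builder.
public abstract class KafkaTableSourceSketch {

	// Assumption: GROUP_OFFSETS mirrors what the Builder would otherwise set.
	private StartupMode startupMode = StartupMode.GROUP_OFFSETS;

	protected StartupMode getStartupMode() {
		return startupMode; // never null, so the switch in getKafkaConsumer is safe
	}

	// Minimal stand-in for the connector's StartupMode enum.
	public enum StartupMode { EARLIEST, LATEST, GROUP_OFFSETS, SPECIFIC_OFFSETS }
}
{code}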



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6198: [FLINK-9627] Extending KafkaJsonTableSource accord...

2018-06-21 Thread yanghua
GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/6198

[FLINK-9627] Extending KafkaJsonTableSource according to comments will 
result in NPE

## What is the purpose of the change

* This pull request fixes an NPE when customizing KafkaJsonTableSource without the 
Builder*


## Brief change log

  - *Initialize the `StartupMode` field with an enum value*

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-9627

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6198.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6198


commit 1dfe2b01921d437485523418c44739ee91febaa5
Author: yanghua 
Date:   2018-06-21T08:46:53Z

[FLINK-9627] Extending KafkaJsonTableSource according to comments will 
result in NPE




---


[jira] [Comment Edited] (FLINK-9411) Support parquet rolling sink writer

2018-06-21 Thread Triones Deng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519405#comment-16519405
 ] 

Triones Deng edited comment on FLINK-9411 at 6/21/18 2:29 PM:
--

[~StephanEwen] sure, a design is necessary. I think there may be two ways to do it.
In production, Orc and Parquet files are popular because of columnar storage.
There are two methods to support a parquet writer:
1. Just write a ParquetStreamWriter which is a subclass of StreamWriterBase, 
analogous to FLINK-9407.
2. Use a HdfsWriterWrapper which owns one delegate writer; when the end user wants 
to use one format, they simply specify the format, like orc or parquet, and let 
the wrapper create a suitable writer such as OrcStreamWriter or 
ParquetStreamWriter.
Sample code for the HdfsWriterWrapper:

{code:java}
public class HdfsWriterWrapper<T> implements Writer<T> {

	private Writer<T> delegate;
	private String format;
	private Configuration configuration;
	private TableSchema tableSchema;

	public HdfsWriterWrapper(Configuration configuration, String format, Class<?> tableClass, String[] columnFields) {
	}
}
{code}
which one is better?


was (Author: triones):
[~StephanEwen] sure, a design is necessary. I think there may be two ways to do it.
In production, Orc and Parquet files are popular because of columnar storage.
There are two methods to support a parquet writer:
1. Just write a ParquetStreamWriter which is a subclass of StreamWriterBase, 
analogous to FLINK-9407.
2. Use a HdfsWriterWrapper which owns one delegate writer; when the end user wants 
to use one format, they simply specify the format, like orc or parquet, and let 
the wrapper create a suitable writer such as OrcStreamWriter or 
ParquetStreamWriter.
Sample code for the HdfsWriterWrapper:

{code:java}
public class HdfsWriterWrapper<T> implements Writer<T> {

	private Writer<T> delegate;
	private String format;
	private Configuration configuration;
	private TableSchema tableSchema;

	public HdfsWriterWrapper(Configuration configuration, String format, Class<?> tableClass, String[] columnFields) {
	}
}
{code}
what do you think will be better?

> Support parquet rolling sink writer
> ---
>
> Key: FLINK-9411
> URL: https://issues.apache.org/jira/browse/FLINK-9411
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: mingleizhang
>Assignee: Triones Deng
>Priority: Major
>
> Like support orc rolling sink writer in FLINK-9407 , we should also support 
> parquet rolling sink writer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9374) Flink Kinesis Producer does not backpressure

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519412#comment-16519412
 ] 

ASF GitHub Bot commented on FLINK-9374:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6021
  
Thanks @fmthoma, will proceed to merge this ..


> Flink Kinesis Producer does not backpressure
> 
>
> Key: FLINK-9374
> URL: https://issues.apache.org/jira/browse/FLINK-9374
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Franz Thoma
>Priority: Critical
>  Labels: pull-request-available
> Attachments: after.png, before.png
>
>
> The {{FlinkKinesisProducer}} just accepts records and forwards them to a 
> {{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL 
> internally holds an unbounded queue of records that have not yet been sent.
> Since Kinesis is rate-limited to 1MB per second per shard, this queue may 
> grow indefinitely if Flink sends records faster than the KPL can forward them 
> to Kinesis.
> One way to circumvent this problem is to set a record TTL, so that queued 
> records are dropped after a certain amount of time, but this will lead to 
> data loss under high loads.
> Currently the only time the queue is flushed is during checkpointing: 
> {{FlinkKinesisProducer}} consumes records at arbitrary rate, either until a 
> checkpoint is reached (and will wait until the queue is flushed), or until 
> out-of-memory, whichever is reached first. (This gets worse due to the fact 
> that the Java KPL is only a thin wrapper around a C++ process, so it is not 
> even the Java process that runs out of memory, but the C++ process.) The 
> implicit rate-limit due to checkpointing leads to a ragged throughput graph 
> like this (the periods with zero throughput are the wait times before a 
> checkpoint):
> !file:///home/fthoma/projects/flink/before.png!!before.png! Throughput 
> limited by checkpointing only
> My proposed solution is to add a config option {{queueLimit}} to set a 
> maximum number of records that may be waiting in the KPL queue. If this limit 
> is reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and 
> wait (blocking) until the queue length is below the limit again. This 
> automatically leads to backpressuring, since the {{FlinkKinesisProducer}} 
> cannot accept records while waiting. For compatibility, {{queueLimit}} is set 
> to {{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a 
> client explicitly sets the value. Setting a »sane« default value is not 
> possible unfortunately, since sensible values for the limit depend on the 
> record size (the limit should be chosen so that about 10–100MB of records per 
> shard are accumulated before flushing, otherwise the maximum Kinesis 
> throughput may not be reached).
> !after.png! Throughput with a queue limit of 10 records (the spikes are 
> checkpoints, where the queue is still flushed completely)
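
For context, a minimal usage sketch of the proposed option (setQueueLimit is the 
setter added by the pull request; the schema, region property, and limit value 
are illustrative):

{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

// Hedged sketch: opting in to the proposed queue limit so the sink backpressures
// instead of letting the KPL's internal queue grow without bound.
class QueueLimitExample {
	static FlinkKinesisProducer<String> buildSink() {
		Properties producerConfig = new Properties();
		producerConfig.setProperty("aws.region", "eu-central-1"); // illustrative region

		FlinkKinesisProducer<String> kinesis =
				new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
		// pick the limit so that roughly 10-100MB of records per shard queue up before a flush
		kinesis.setQueueLimit(100_000);
		return kinesis;
	}
}
{code}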



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpr...

2018-06-21 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6021
  
Thanks @fmthoma, will proceed to merge this ..


---


[jira] [Commented] (FLINK-9411) Support parquet rolling sink writer

2018-06-21 Thread Triones Deng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519405#comment-16519405
 ] 

Triones Deng commented on FLINK-9411:
-

[~StephanEwen] sure, a design is necessary. I think there may be two ways to do it.
In production, Orc and Parquet files are popular because of columnar storage.
There are two methods to support a parquet writer:
1. Just write a ParquetStreamWriter which is a subclass of StreamWriterBase, 
analogous to FLINK-9407.
2. Use a HdfsWriterWrapper which owns one delegate writer; when the end user wants 
to use one format, they simply specify the format, like orc or parquet, and let 
the wrapper create a suitable writer such as OrcStreamWriter or 
ParquetStreamWriter.
Sample code for the HdfsWriterWrapper:

{code:java}
public class HdfsWriterWrapper<T> implements Writer<T> {

	private Writer<T> delegate;
	private String format;
	private Configuration configuration;
	private TableSchema tableSchema;

	public HdfsWriterWrapper(Configuration configuration, String format, Class<?> tableClass, String[] columnFields) {
	}
}
{code}
what do you think will be better?

> Support parquet rolling sink writer
> ---
>
> Key: FLINK-9411
> URL: https://issues.apache.org/jira/browse/FLINK-9411
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: mingleizhang
>Assignee: Triones Deng
>Priority: Major
>
> Like support orc rolling sink writer in FLINK-9407 , we should also support 
> parquet rolling sink writer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6197: [FLINK-9638][E2E Tests] Add helper script to run single t...

2018-06-21 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6197
  
Tried this locally, and it works.
+1, will merge this ..


---


[jira] [Commented] (FLINK-9374) Flink Kinesis Producer does not backpressure

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519402#comment-16519402
 ] 

ASF GitHub Bot commented on FLINK-9374:
---

Github user fmthoma commented on the issue:

https://github.com/apache/flink/pull/6021
  
@tzulitai Thanks for your last review comments! I addressed them, and 
rebased the branch against master.


> Flink Kinesis Producer does not backpressure
> 
>
> Key: FLINK-9374
> URL: https://issues.apache.org/jira/browse/FLINK-9374
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Franz Thoma
>Priority: Critical
>  Labels: pull-request-available
> Attachments: after.png, before.png
>
>
> The {{FlinkKinesisProducer}} just accepts records and forwards them to a 
> {{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL 
> internally holds an unbounded queue of records that have not yet been sent.
> Since Kinesis is rate-limited to 1MB per second per shard, this queue may 
> grow indefinitely if Flink sends records faster than the KPL can forward them 
> to Kinesis.
> One way to circumvent this problem is to set a record TTL, so that queued 
> records are dropped after a certain amount of time, but this will lead to 
> data loss under high loads.
> Currently the only time the queue is flushed is during checkpointing: 
> {{FlinkKinesisProducer}} consumes records at arbitrary rate, either until a 
> checkpoint is reached (and will wait until the queue is flushed), or until 
> out-of-memory, whichever is reached first. (This gets worse due to the fact 
> that the Java KPL is only a thin wrapper around a C++ process, so it is not 
> even the Java process that runs out of memory, but the C++ process.) The 
> implicit rate-limit due to checkpointing leads to a ragged throughput graph 
> like this (the periods with zero throughput are the wait times before a 
> checkpoint):
> !file:///home/fthoma/projects/flink/before.png!!before.png! Throughput 
> limited by checkpointing only
> My proposed solution is to add a config option {{queueLimit}} to set a 
> maximum number of records that may be waiting in the KPL queue. If this limit 
> is reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and 
> wait (blocking) until the queue length is below the limit again. This 
> automatically leads to backpressuring, since the {{FlinkKinesisProducer}} 
> cannot accept records while waiting. For compatibility, {{queueLimit}} is set 
> to {{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a 
> client explicitly sets the value. Setting a »sane« default value is not 
> possible unfortunately, since sensible values for the limit depend on the 
> record size (the limit should be chosen so that about 10–100MB of records per 
> shard are accumulated before flushing, otherwise the maximum Kinesis 
> throughput may not be reached).
> !after.png! Throughput with a queue limit of 10 records (the spikes are 
> checkpoints, where the queue is still flushed completely)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpr...

2018-06-21 Thread fmthoma
Github user fmthoma commented on the issue:

https://github.com/apache/flink/pull/6021
  
@tzulitai Thanks for your last review comments! I addressed them, and 
rebased the branch against master.


---


[jira] [Commented] (FLINK-9638) Add helper script to run single e2e test

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519399#comment-16519399
 ] 

ASF GitHub Bot commented on FLINK-9638:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6197
  
Tried this locally, and it works.
+1, will merge this ..


> Add helper script to run single e2e test
> 
>
> Key: FLINK-9638
> URL: https://issues.apache.org/jira/browse/FLINK-9638
> Project: Flink
>  Issue Type: Improvement
>Reporter: Florian Schmidt
>Assignee: Florian Schmidt
>Priority: Minor
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9374) Flink Kinesis Producer does not backpressure

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519396#comment-16519396
 ] 

ASF GitHub Bot commented on FLINK-9374:
---

Github user fmthoma commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r197143312
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+public class TimeoutLatch {
--- End diff --

✔


> Flink Kinesis Producer does not backpressure
> 
>
> Key: FLINK-9374
> URL: https://issues.apache.org/jira/browse/FLINK-9374
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Franz Thoma
>Priority: Critical
>  Labels: pull-request-available
> Attachments: after.png, before.png
>
>
> The {{FlinkKinesisProducer}} just accepts records and forwards them to a 
> {{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL 
> internally holds an unbounded queue of records that have not yet been sent.
> Since Kinesis is rate-limited to 1MB per second per shard, this queue may 
> grow indefinitely if Flink sends records faster than the KPL can forward them 
> to Kinesis.
> One way to circumvent this problem is to set a record TTL, so that queued 
> records are dropped after a certain amount of time, but this will lead to 
> data loss under high loads.
> Currently the only time the queue is flushed is during checkpointing: 
> {{FlinkKinesisProducer}} consumes records at arbitrary rate, either until a 
> checkpoint is reached (and will wait until the queue is flushed), or until 
> out-of-memory, whichever is reached first. (This gets worse due to the fact 
> that the Java KPL is only a thin wrapper around a C++ process, so it is not 
> even the Java process that runs out of memory, but the C++ process.) The 
> implicit rate-limit due to checkpointing leads to a ragged throughput graph 
> like this (the periods with zero throughput are the wait times before a 
> checkpoint):
> !file:///home/fthoma/projects/flink/before.png!!before.png! Throughput 
> limited by checkpointing only
> My proposed solution is to add a config option {{queueLimit}} to set a 
> maximum number of records that may be waiting in the KPL queue. If this limit 
> is reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and 
> wait (blocking) until the queue length is below the limit again. This 
> automatically leads to backpressuring, since the {{FlinkKinesisProducer}} 
> cannot accept records while waiting. For compatibility, {{queueLimit}} is set 
> to {{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a 
> client explicitly sets the value. Setting a »sane« default value is not 
> possible unfortunately, since sensible values for the limit depend on the 
> record size (the limit should be chosen so that about 10–100MB of records per 
> shard are accumulated before flushing, otherwise the maximum Kinesis 
> throughput may not be reached).
> !after.png! Throughput with a queue limit of 10 records (the spikes are 
> checkpoints, where the queue is still flushed completely)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9374) Flink Kinesis Producer does not backpressure

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519395#comment-16519395
 ] 

ASF GitHub Bot commented on FLINK-9374:
---

Github user fmthoma commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r197143254
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws 
Exception {
}
}
 
+   /**
+* If the internal queue of the {@link KinesisProducer} gets too long,
+* flush some of the records until we are below the limit again.
+* We don't want to flush _all_ records at this point since that would
+* break record aggregation.
+*/
+   private void enforceQueueLimit() {
+   int attempt = 0;
+   while (producer.getOutstandingRecordsCount() >= queueLimit) {
+   backpressureCycles.inc();
+   if (attempt >= 10) {
+   LOG.warn("Waiting for the queue length to drop 
below the limit takes unusually long, still not done after {} attempts.", 
attempt);
+   }
+   attempt++;
+   try {
+   backpressureLatch.await(100);
--- End diff --

It does, but if we make it configurable, I'd rather keep the warning 
threshold at one second, i.e. `if (attempt >= 1000 / waitTime) { … }`.


> Flink Kinesis Producer does not backpressure
> 
>
> Key: FLINK-9374
> URL: https://issues.apache.org/jira/browse/FLINK-9374
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Franz Thoma
>Priority: Critical
>  Labels: pull-request-available
> Attachments: after.png, before.png
>
>
> The {{FlinkKinesisProducer}} just accepts records and forwards them to a 
> {{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL 
> internally holds an unbounded queue of records that have not yet been sent.
> Since Kinesis is rate-limited to 1MB per second per shard, this queue may 
> grow indefinitely if Flink sends records faster than the KPL can forward them 
> to Kinesis.
> One way to circumvent this problem is to set a record TTL, so that queued 
> records are dropped after a certain amount of time, but this will lead to 
> data loss under high loads.
> Currently the only time the queue is flushed is during checkpointing: 
> {{FlinkKinesisProducer}} consumes records at arbitrary rate, either until a 
> checkpoint is reached (and will wait until the queue is flushed), or until 
> out-of-memory, whichever is reached first. (This gets worse due to the fact 
> that the Java KPL is only a thin wrapper around a C++ process, so it is not 
> even the Java process that runs out of memory, but the C++ process.) The 
> implicit rate-limit due to checkpointing leads to a ragged throughput graph 
> like this (the periods with zero throughput are the wait times before a 
> checkpoint):
> !file:///home/fthoma/projects/flink/before.png!!before.png! Throughput 
> limited by checkpointing only
> My proposed solution is to add a config option {{queueLimit}} to set a 
> maximum number of records that may be waiting in the KPL queue. If this limit 
> is reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and 
> wait (blocking) until the queue length is below the limit again. This 
> automatically leads to backpressuring, since the {{FlinkKinesisProducer}} 
> cannot accept records while waiting. For compatibility, {{queueLimit}} is set 
> to {{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a 
> client explicitly sets the value. Setting a »sane« default value is not 
> possible unfortunately, since sensible values for the limit depend on the 
> record size (the limit should be chosen so that about 10–100MB of records per 
> shard are accumulated before flushing, otherwise the maximum Kinesis 
> throughput may not be reached).
> !after.png! Throughput with a queue limit of 10 records (the spikes are 
> checkpoints, where the queue is still flushed completely)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9374) Flink Kinesis Producer does not backpressure

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519397#comment-16519397
 ] 

ASF GitHub Bot commented on FLINK-9374:
---

Github user fmthoma commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r197143428
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws 
Exception {
}
}
 
+   /**
+* If the internal queue of the {@link KinesisProducer} gets too long,
+* flush some of the records until we are below the limit again.
+* We don't want to flush _all_ records at this point since that would
+* break record aggregation.
+*/
+   private void enforceQueueLimit() {
--- End diff --

✔


> Flink Kinesis Producer does not backpressure
> 
>
> Key: FLINK-9374
> URL: https://issues.apache.org/jira/browse/FLINK-9374
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Franz Thoma
>Priority: Critical
>  Labels: pull-request-available
> Attachments: after.png, before.png
>
>
> The {{FlinkKinesisProducer}} just accepts records and forwards them to a 
> {{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL 
> internally holds an unbounded queue of records that have not yet been sent.
> Since Kinesis is rate-limited to 1MB per second per shard, this queue may 
> grow indefinitely if Flink sends records faster than the KPL can forward them 
> to Kinesis.
> One way to circumvent this problem is to set a record TTL, so that queued 
> records are dropped after a certain amount of time, but this will lead to 
> data loss under high loads.
> Currently the only time the queue is flushed is during checkpointing: 
> {{FlinkKinesisProducer}} consumes records at arbitrary rate, either until a 
> checkpoint is reached (and will wait until the queue is flushed), or until 
> out-of-memory, whichever is reached first. (This gets worse due to the fact 
> that the Java KPL is only a thin wrapper around a C++ process, so it is not 
> even the Java process that runs out of memory, but the C++ process.) The 
> implicit rate-limit due to checkpointing leads to a ragged throughput graph 
> like this (the periods with zero throughput are the wait times before a 
> checkpoint):
> !file:///home/fthoma/projects/flink/before.png!!before.png! Throughput 
> limited by checkpointing only
> My proposed solution is to add a config option {{queueLimit}} to set a 
> maximum number of records that may be waiting in the KPL queue. If this limit 
> is reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and 
> wait (blocking) until the queue length is below the limit again. This 
> automatically leads to backpressuring, since the {{FlinkKinesisProducer}} 
> cannot accept records while waiting. For compatibility, {{queueLimit}} is set 
> to {{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a 
> client explicitly sets the value. Setting a »sane« default value is not 
> possible unfortunately, since sensible values for the limit depend on the 
> record size (the limit should be chosen so that about 10–100MB of records per 
> shard are accumulated before flushing, otherwise the maximum Kinesis 
> throughput may not be reached).
> !after.png! Throughput with a queue limit of 10 records (the spikes are 
> checkpoints, where the queue is still flushed completely)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-06-21 Thread fmthoma
Github user fmthoma commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r197143428
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws 
Exception {
}
}
 
+   /**
+* If the internal queue of the {@link KinesisProducer} gets too long,
+* flush some of the records until we are below the limit again.
+* We don't want to flush _all_ records at this point since that would
+* break record aggregation.
+*/
+   private void enforceQueueLimit() {
--- End diff --

✔


---


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-06-21 Thread fmthoma
Github user fmthoma commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r197143312
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+public class TimeoutLatch {
--- End diff --

✔


---


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-06-21 Thread fmthoma
Github user fmthoma commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r197143254
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws 
Exception {
}
}
 
+   /**
+* If the internal queue of the {@link KinesisProducer} gets too long,
+* flush some of the records until we are below the limit again.
+* We don't want to flush _all_ records at this point since that would
+* break record aggregation.
+*/
+   private void enforceQueueLimit() {
+   int attempt = 0;
+   while (producer.getOutstandingRecordsCount() >= queueLimit) {
+   backpressureCycles.inc();
+   if (attempt >= 10) {
+   LOG.warn("Waiting for the queue length to drop 
below the limit takes unusually long, still not done after {} attempts.", 
attempt);
+   }
+   attempt++;
+   try {
+   backpressureLatch.await(100);
--- End diff --

It does, but if we make it configurable, I'd rather keep the warning 
threshold at one second, i.e. `if (attempt >= 1000 / waitTime) { … }`.


---


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519393#comment-16519393
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197142993
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -106,7 +106,7 @@ protected void channelRead0(final ChannelHandlerContext 
ctx, final HttpObject ms
final HttpContent httpContent = (HttpContent) 
msg;

currentHttpPostRequestDecoder.offer(httpContent);
 
-   while (currentHttpPostRequestDecoder.hasNext()) 
{
+   while (httpContent != 
LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
--- End diff --

As far as I can tell we are using the encoder correctly, but the decoder 
usage wasn't written against the encoder, but (I guess) only against `curl` or 
the web UI, neither of which sends an empty LAST_HTTP_CONTENT; they send a 
`DefaultLastHttpContent` instead.

Interestingly enough, if you use anything but the netty encoder, without an 
`instanceof LastHttpContent` check it isn't possible to know whether the 
decoder is done or not.
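
To make the point concrete, a small sketch against the Netty 4 multipart API (the 
shape of the check discussed here, not the actual Flink handler):

{code:java}
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;

// Hedged sketch: the decoder alone cannot signal "request finished", so the
// handler has to look at the chunk type itself, as the guard in the diff does.
class MultipartEndOfRequestSketch {
	void onContent(HttpPostRequestDecoder decoder, HttpContent chunk) {
		decoder.offer(chunk);
		try {
			while (decoder.hasNext()) {
				InterfaceHttpData data = decoder.next();
				// process the attribute or file upload here ...
			}
		} catch (HttpPostRequestDecoder.EndOfDataDecoderException e) {
			// thrown once all data of the request has been consumed
		}
		if (chunk instanceof LastHttpContent) {
			decoder.destroy(); // only the chunk type tells us the request is complete
		}
	}
}
{code}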


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6189: [FLINK-9599][rest] RestClient supports FileUploads...

2018-06-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197142993
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -106,7 +106,7 @@ protected void channelRead0(final ChannelHandlerContext 
ctx, final HttpObject ms
final HttpContent httpContent = (HttpContent) 
msg;

currentHttpPostRequestDecoder.offer(httpContent);
 
-   while (currentHttpPostRequestDecoder.hasNext()) 
{
+   while (httpContent != 
LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
--- End diff --

As far as I can tell we are using the encoder correctly, but the decoder 
usage wasn't written against the encoder, but (I guess) only against `curl` or 
the web UI, neither of which sends an empty LAST_HTTP_CONTENT; they send a 
`DefaultLastHttpContent` instead.

Interestingly enough, if you use anything but the netty encoder, without an 
`instanceof LastHttpContent` check it isn't possible to know whether the 
decoder is done or not.


---


[jira] [Commented] (FLINK-9374) Flink Kinesis Producer does not backpressure

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519390#comment-16519390
 ] 

ASF GitHub Bot commented on FLINK-9374:
---

Github user fmthoma commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r197142648
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -144,6 +163,17 @@ public void setFailOnError(boolean failOnError) {
this.failOnError = failOnError;
}
 
+   /**
+* The {@link KinesisProducer} holds an unbounded queue internally. To 
avoid memory
+* problems under high loads, a limit can be employed above which the 
internal queue
+* will be flushed, thereby applying backpressure.
+*
+* @param queueLimit The maximum length of the internal queue before 
backpressuring
+*/
+   public void setQueueLimit(int queueLimit) {
+   this.queueLimit = queueLimit;
--- End diff --

✔ (`queueLimit > 0`)


> Flink Kinesis Producer does not backpressure
> 
>
> Key: FLINK-9374
> URL: https://issues.apache.org/jira/browse/FLINK-9374
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Franz Thoma
>Priority: Critical
>  Labels: pull-request-available
> Attachments: after.png, before.png
>
>
> The {{FlinkKinesisProducer}} just accepts records and forwards them to a 
> {{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL 
> internally holds an unbounded queue of records that have not yet been sent.
> Since Kinesis is rate-limited to 1MB per second per shard, this queue may 
> grow indefinitely if Flink sends records faster than the KPL can forward them 
> to Kinesis.
> One way to circumvent this problem is to set a record TTL, so that queued 
> records are dropped after a certain amount of time, but this will lead to 
> data loss under high loads.
> Currently the only time the queue is flushed is during checkpointing: 
> {{FlinkKinesisProducer}} consumes records at arbitrary rate, either until a 
> checkpoint is reached (and will wait until the queue is flushed), or until 
> out-of-memory, whichever is reached first. (This gets worse due to the fact 
> that the Java KPL is only a thin wrapper around a C++ process, so it is not 
> even the Java process that runs out of memory, but the C++ process.) The 
> implicit rate-limit due to checkpointing leads to a ragged throughput graph 
> like this (the periods with zero throughput are the wait times before a 
> checkpoint):
> !file:///home/fthoma/projects/flink/before.png!!before.png! Throughput 
> limited by checkpointing only
> My proposed solution is to add a config option {{queueLimit}} to set a 
> maximum number of records that may be waiting in the KPL queue. If this limit 
> is reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and 
> wait (blocking) until the queue length is below the limit again. This 
> automatically leads to backpressuring, since the {{FlinkKinesisProducer}} 
> cannot accept records while waiting. For compatibility, {{queueLimit}} is set 
> to {{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a 
> client explicitly sets the value. Setting a »sane« default value is not 
> possible unfortunately, since sensible values for the limit depend on the 
> record size (the limit should be chosen so that about 10–100MB of records per 
> shard are accumulated before flushing, otherwise the maximum Kinesis 
> throughput may not be reached).
> !after.png! Throughput with a queue limit of 10 records (the spikes are 
> checkpoints, where the queue is still flushed completely)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-06-21 Thread fmthoma
Github user fmthoma commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r197142648
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -144,6 +163,17 @@ public void setFailOnError(boolean failOnError) {
this.failOnError = failOnError;
}
 
+	/**
+	 * The {@link KinesisProducer} holds an unbounded queue internally. To avoid memory
+	 * problems under high loads, a limit can be employed above which the internal queue
+	 * will be flushed, thereby applying backpressure.
+	 *
+	 * @param queueLimit The maximum length of the internal queue before backpressuring
+	 */
+	public void setQueueLimit(int queueLimit) {
+		this.queueLimit = queueLimit;
--- End diff --

✔ (`queueLimit > 0`)


---


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-06-21 Thread fmthoma
Github user fmthoma commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r197141591
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -55,6 +58,13 @@
 @PublicEvolving
 public class FlinkKinesisProducer extends RichSinkFunction implements CheckpointedFunction {
 
+	public static final String KINESIS_PRODUCER_METRIC_GROUP = "kinesisProducer";
+
+	public static final String METRIC_BACKPRESSURE_CYCLES = "backpressureCycles";
+
+	public static final String METRIC_OUTSTANDING_RECORDS_COUNT = "outstandingRecordsCount";
+
+
--- End diff --

✔
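
(For context, one plausible way these constants end up wired in the sink's {{open()}}; the actual registration code in the PR is not shown here, so treat this fragment as a sketch:)

{code:java}
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

// Sketch: inside FlinkKinesisProducer#open(), after the KPL producer has been created.
MetricGroup kinesisMetrics = getRuntimeContext()
	.getMetricGroup()
	.addGroup(KINESIS_PRODUCER_METRIC_GROUP);   // "kinesisProducer"

Counter backpressureCycles = kinesisMetrics.counter(METRIC_BACKPRESSURE_CYCLES);
kinesisMetrics.gauge(METRIC_OUTSTANDING_RECORDS_COUNT, () -> producer.getOutstandingRecordsCount());
{code}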


---


[jira] [Commented] (FLINK-9374) Flink Kinesis Producer does not backpressure

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519386#comment-16519386
 ] 

ASF GitHub Bot commented on FLINK-9374:
---

Github user fmthoma commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r197141591
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -55,6 +58,13 @@
 @PublicEvolving
 public class FlinkKinesisProducer extends RichSinkFunction implements CheckpointedFunction {
 
+	public static final String KINESIS_PRODUCER_METRIC_GROUP = "kinesisProducer";
+
+	public static final String METRIC_BACKPRESSURE_CYCLES = "backpressureCycles";
+
+	public static final String METRIC_OUTSTANDING_RECORDS_COUNT = "outstandingRecordsCount";
+
+
--- End diff --

✔


> Flink Kinesis Producer does not backpressure
> 
>
> Key: FLINK-9374
> URL: https://issues.apache.org/jira/browse/FLINK-9374
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Franz Thoma
>Priority: Critical
>  Labels: pull-request-available
> Attachments: after.png, before.png
>
>
> The {{FlinkKinesisProducer}} just accepts records and forwards them to a 
> {{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL 
> internally holds an unbounded queue of records that have not yet been sent.
> Since Kinesis is rate-limited to 1MB per second per shard, this queue may 
> grow indefinitely if Flink sends records faster than the KPL can forward them 
> to Kinesis.
> One way to circumvent this problem is to set a record TTL, so that queued 
> records are dropped after a certain amount of time, but this will lead to 
> data loss under high loads.
> Currently the only time the queue is flushed is during checkpointing: 
> {{FlinkKinesisProducer}} consumes records at arbitrary rate, either until a 
> checkpoint is reached (and will wait until the queue is flushed), or until 
> out-of-memory, whichever is reached first. (This gets worse due to the fact 
> that the Java KPL is only a thin wrapper around a C++ process, so it is not 
> even the Java process that runs out of memory, but the C++ process.) The 
> implicit rate-limit due to checkpointing leads to a ragged throughput graph 
> like this (the periods with zero throughput are the wait times before a 
> checkpoint):
> !before.png! Throughput limited by checkpointing only
> My proposed solution is to add a config option {{queueLimit}} to set a 
> maximum number of records that may be waiting in the KPL queue. If this limit 
> is reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and 
> wait (blocking) until the queue length is below the limit again. This 
> automatically leads to backpressuring, since the {{FlinkKinesisProducer}} 
> cannot accept records while waiting. For compatibility, {{queueLimit}} is set 
> to {{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a 
> client explicitly sets the value. Setting a »sane« default value is not 
> possible unfortunately, since sensible values for the limit depend on the 
> record size (the limit should be chosen so that about 10–100MB of records per 
> shard are accumulated before flushing, otherwise the maximum Kinesis 
> throughput may not be reached).
> !after.png! Throughput with a queue limit of 10 records (the spikes are 
> checkpoints, where the queue is still flushed completely)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9638) Add helper script to run single e2e test

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519381#comment-16519381
 ] 

ASF GitHub Bot commented on FLINK-9638:
---

GitHub user florianschmidt1994 opened a pull request:

https://github.com/apache/flink/pull/6197

[FLINK-9638][E2E Tests] Add helper script to run single test

## What is the purpose of the change
This PR adds a helper script `run-single-test.sh` that allows you to run
a single test in the context of the test runner.

This provides you with
* Setup of ENV variables
* Cleanup after test
* Nicer output

Usage: ./run-single-test.sh  [  ...]
## Verifying this change

Ran it by hand with different test scripts

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency):no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? Not yet; it will be added to the documentation PR that is currently open.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/florianschmidt1994/flink 
FLINK-9638-add-helper-script-for-single-e2e-test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6197.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6197


commit f1e35a094b2e709557efec2c113711497dd45323
Author: Florian Schmidt 
Date:   2018-06-21T13:42:08Z

[FLINK-9638][E2E Tests] Add helper script to run single test

This commit adds a helper script `run-single-test.sh` that allows you to run
a single test in the context of the test runner.

This provides you with
* Setup of ENV variables
* cleanup after test
* Nicer output

Usage: ./run-single-test.sh  [  ...]




> Add helper script to run single e2e test
> 
>
> Key: FLINK-9638
> URL: https://issues.apache.org/jira/browse/FLINK-9638
> Project: Flink
>  Issue Type: Improvement
>Reporter: Florian Schmidt
>Assignee: Florian Schmidt
>Priority: Minor
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9638) Add helper script to run single e2e test

2018-06-21 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9638:
--
Labels: pull-request-available  (was: )

> Add helper script to run single e2e test
> 
>
> Key: FLINK-9638
> URL: https://issues.apache.org/jira/browse/FLINK-9638
> Project: Flink
>  Issue Type: Improvement
>Reporter: Florian Schmidt
>Assignee: Florian Schmidt
>Priority: Minor
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519380#comment-16519380
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197138450
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -106,7 +106,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg
		final HttpContent httpContent = (HttpContent) msg;

		currentHttpPostRequestDecoder.offer(httpContent);
 
-		while (currentHttpPostRequestDecoder.hasNext()) {
+		while (httpContent != LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
--- End diff --

But we are using the `HttpPostRequestEncoder`, right? So are we perhaps using it 
incorrectly?
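
(For reference, the typical offer/drain pattern around Netty's {{HttpPostRequestDecoder}} looks roughly like this; a simplified sketch with assumed local variables, not the handler's actual code. The guard added in the diff above simply skips the drain loop for the empty trailing {{LastHttpContent}}:)

{code:java}
// Sketch only: the usual offer/drain pattern for a Netty multipart decoder.
// Assumes local variables `decoder` (HttpPostRequestDecoder) and `httpContent` (HttpContent).
decoder.offer(httpContent);
while (decoder.hasNext()) {                  // hasNext() can throw EndOfDataDecoderException
	InterfaceHttpData data = decoder.next(); // once all multipart data has been consumed
	if (data instanceof FileUpload) {
		// persist the upload somewhere durable before the decoder is destroyed
	}
}
if (httpContent instanceof LastHttpContent) {
	decoder.destroy();                       // release decoder resources at end of request
}
{code}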


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming HTTP request with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.
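
(One conceivable shape for the "forward as an attribute" idea, purely as a hedged sketch; the key name and types are placeholders, not the eventual Flink implementation:)

{code:java}
import java.io.File;
import java.util.Collection;

import io.netty.channel.ChannelHandlerContext;
import io.netty.util.AttributeKey;

final class UploadedFilesAttribute {
	// Placeholder key; the real implementation may use different names and types.
	static final AttributeKey<Collection<File>> UPLOADED_FILES = AttributeKey.valueOf("uploadedFiles");

	static void store(ChannelHandlerContext ctx, Collection<File> files) {
		ctx.channel().attr(UPLOADED_FILES).set(files);
	}

	static Collection<File> retrieve(ChannelHandlerContext ctx) {
		return ctx.channel().attr(UPLOADED_FILES).get();
	}
}
{code}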



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

