[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518905#comment-16518905
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197008798
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java 
---
@@ -103,77 +102,68 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
 		return;
 	}
 
-	ByteBuf msgContent = ((FullHttpRequest) httpRequest).content();
-
-	R request;
-	if (isFileUpload()) {
-		final Path path = ctx.channel().attr(FileUploadHandler.UPLOADED_FILE).get();
-		if (path == null) {
-			HandlerUtils.sendErrorResponse(
-				ctx,
-				httpRequest,
-				new ErrorResponseBody("Client did not upload a file."),
-				HttpResponseStatus.BAD_REQUEST,
-				responseHeaders);
-			return;
-		}
-		//noinspection unchecked
-		request = (R) new FileUpload(path);
-	} else if (msgContent.capacity() == 0) {
-		try {
-			request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
-		} catch (JsonParseException | JsonMappingException je) {
-			log.error("Request did not conform to expected format.", je);
-			HandlerUtils.sendErrorResponse(
-				ctx,
-				httpRequest,
-				new ErrorResponseBody("Bad request received."),
-				HttpResponseStatus.BAD_REQUEST,
-				responseHeaders);
-			return;
+	final ByteBuf msgContent = ((FullHttpRequest) httpRequest).content();
+
+	try (FileUploads uploadedFiles = FileUploadHandler.getMultipartFileUploads(ctx)) {
+
+		R request;
+		if (msgContent.capacity() == 0) {
+			try {
+				request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
+			} catch (JsonParseException | JsonMappingException je) {
+				log.error("Request did not conform to expected format.", je);
+				HandlerUtils.sendErrorResponse(
+					ctx,
+					httpRequest,
+					new ErrorResponseBody("Bad request received."),
+					HttpResponseStatus.BAD_REQUEST,
+					responseHeaders);
+				return;
+			}
+		} else {
+			try {
+				ByteBufInputStream in = new ByteBufInputStream(msgContent);
--- End diff --

I would suggest using try-with-resources to make sure `in` gets closed.
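
A minimal sketch of the suggested form (reusing `msgContent`, `MAPPER`, and `untypedResponseMessageHeaders` from the diff above; an illustration, not the final implementation):

{code:java}
try (ByteBufInputStream in = new ByteBufInputStream(msgContent)) {
    // the stream is closed automatically, even if deserialization throws
    request = MAPPER.readValue(in, untypedResponseMessageHeaders.getRequestClass());
}
{code}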


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280, we should
>  * extend the RestClient to allow the upload of Files
>  * extend the FileUploadHandler to accept mixed multi-part requests (JSON + files)
>  * generalize the mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the 

[GitHub] flink pull request #6189: [FLINK-9599][rest] RestClient supports FileUploads...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197008798
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java 
---
@@ -103,77 +102,68 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
 		return;
 	}
 
-	ByteBuf msgContent = ((FullHttpRequest) httpRequest).content();
-
-	R request;
-	if (isFileUpload()) {
-		final Path path = ctx.channel().attr(FileUploadHandler.UPLOADED_FILE).get();
-		if (path == null) {
-			HandlerUtils.sendErrorResponse(
-				ctx,
-				httpRequest,
-				new ErrorResponseBody("Client did not upload a file."),
-				HttpResponseStatus.BAD_REQUEST,
-				responseHeaders);
-			return;
-		}
-		//noinspection unchecked
-		request = (R) new FileUpload(path);
-	} else if (msgContent.capacity() == 0) {
-		try {
-			request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
-		} catch (JsonParseException | JsonMappingException je) {
-			log.error("Request did not conform to expected format.", je);
-			HandlerUtils.sendErrorResponse(
-				ctx,
-				httpRequest,
-				new ErrorResponseBody("Bad request received."),
-				HttpResponseStatus.BAD_REQUEST,
-				responseHeaders);
-			return;
+	final ByteBuf msgContent = ((FullHttpRequest) httpRequest).content();
+
+	try (FileUploads uploadedFiles = FileUploadHandler.getMultipartFileUploads(ctx)) {
+
+		R request;
+		if (msgContent.capacity() == 0) {
+			try {
+				request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
+			} catch (JsonParseException | JsonMappingException je) {
+				log.error("Request did not conform to expected format.", je);
+				HandlerUtils.sendErrorResponse(
+					ctx,
+					httpRequest,
+					new ErrorResponseBody("Bad request received."),
+					HttpResponseStatus.BAD_REQUEST,
+					responseHeaders);
+				return;
+			}
+		} else {
+			try {
+				ByteBufInputStream in = new ByteBufInputStream(msgContent);
--- End diff --

I would suggest using try-with-resources to make sure `in` gets closed.


---


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518892#comment-16518892
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196996839
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+    AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+    implements InternalListState<K, N, T> {
+    TtlListState(
+            InternalListState<K, N, TtlValue<T>> originalState,
+            TtlConfig config,
+            TtlTimeProvider timeProvider,
+            TypeSerializer<List<TtlValue<T>>> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public void update(List<T> values) throws Exception {
+        updateInternal(values);
+    }
+
+    @Override
+    public void addAll(List<T> values) throws Exception {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        original.addAll(withTs(values));
+    }
+
+    @Override
+    public Iterable<T> get() throws Exception {
+        Iterable<TtlValue<T>> ttlValue = original.get();
+        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+        if (updateTsOnRead) {
+            List<TtlValue<T>> collected = collect(ttlValue);
--- End diff --

In this block, we need to iterate `ttlValue` twice: once for `collect()` and 
once for `updateTs()`. If we made `updateTs()` accept an `Iterable` as its 
argument, we could avoid the `collect()` here and iterate `ttlValue` only once.
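
For illustration, a sketch of an `updateTs` variant that takes an `Iterable` and filters plus re-wraps in a single pass (assuming `expired`, `rewrapWithNewTs`, and `original` as in the diff, plus a `java.util.ArrayList` import):

{code:java}
private void updateTs(Iterable<TtlValue<T>> ttlValues) throws Exception {
    List<TtlValue<T>> unexpiredWithUpdatedTs = new ArrayList<>();
    // single pass over the iterable: drop expired entries, refresh the rest
    for (TtlValue<T> v : ttlValues) {
        if (!expired(v)) {
            unexpiredWithUpdatedTs.add(rewrapWithNewTs(v));
        }
    }
    if (!unexpiredWithUpdatedTs.isEmpty()) {
        original.update(unexpiredWithUpdatedTs);
    }
}
{code}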


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses the original state with packed TTL and adds TTL 
> logic using a time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
>     TtlValue<V> valueWithTtl = underlyingState.get();
>     // ttl logic here (e.g. update timestamp)
>     return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are applied to the state produced by the normal state binder 
> in its TTL wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518896#comment-16518896
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196998472
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+    AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+    implements InternalListState<K, N, T> {
+    TtlListState(
+            InternalListState<K, N, TtlValue<T>> originalState,
+            TtlConfig config,
+            TtlTimeProvider timeProvider,
+            TypeSerializer<List<TtlValue<T>>> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public void update(List<T> values) throws Exception {
+        updateInternal(values);
+    }
+
+    @Override
+    public void addAll(List<T> values) throws Exception {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        original.addAll(withTs(values));
+    }
+
+    @Override
+    public Iterable<T> get() throws Exception {
+        Iterable<TtlValue<T>> ttlValue = original.get();
+        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+        if (updateTsOnRead) {
+            List<TtlValue<T>> collected = collect(ttlValue);
+            ttlValue = collected;
+            updateTs(collected);
+        }
+        final Iterable<TtlValue<T>> finalResult = ttlValue;
+        return () -> new IteratorWithCleanup(finalResult.iterator());
+    }
+
+    private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+        List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+            .filter(v -> !expired(v))
+            .map(this::rewrapWithNewTs)
+            .collect(Collectors.toList());
+        if (!unexpiredWithUpdatedTs.isEmpty()) {
+            original.update(unexpiredWithUpdatedTs);
+        }
+    }
+
+    @Override
+    public void add(T value) throws Exception {
+        Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+        original.add(wrapWithTs(value));
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
+    }
+
+    @Override
+    public List<T> getInternal() throws Exception {
+        return collect(get());
+    }
+
+    private <E> List<E> collect(Iterable<E> iterable) {
--- End diff --

If we called `getInternal()` in `get()` and made `updateTs()` accept an 
`Iterable`, then this method could seemingly be removed. (Or, at least, we 
should check whether `iterable` is an instance of `List`; if it is, we could 
cast it to `List` and return immediately.)
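
A sketch of that fast-path check (the unchecked cast is only safe as long as the returned list is not modified):

{code:java}
@SuppressWarnings("unchecked")
private <E> List<E> collect(Iterable<E> iterable) {
    if (iterable instanceof List) {
        // already materialized: return it directly instead of copying
        return (List<E>) iterable;
    }
    return StreamSupport.stream(iterable.spliterator(), false)
        .collect(Collectors.toList());
}
{code}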


> Create wrapper with TTL logic for value state
> 

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518893#comment-16518893
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197001110
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
+
+import java.util.Collection;
+
+/**
+ * This class wraps reducing state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlReducingState<K, N, T>
+    extends AbstractTtlState<K, N, T, TtlValue<T>, InternalReducingState<K, N, TtlValue<T>>>
+    implements InternalReducingState<K, N, T> {
+    TtlReducingState(
+            InternalReducingState<K, N, TtlValue<T>> originalState,
+            TtlConfig config,
+            TtlTimeProvider timeProvider,
+            TypeSerializer<TtlValue<T>> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public T get() throws Exception {
+        return getInternal();
+    }
+
+    @Override
+    public void add(T value) throws Exception {
+        original.add(wrapWithTs(value, Long.MAX_VALUE));
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
--- End diff --

Again, should we also do the TTL check for `original.mergeNamespaces()`, 
since we need to query the state when merging namespaces?
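
One hypothetical shape such a check could take (purely illustrative; `getWithTtlCheckAndUpdate` is the helper used elsewhere in this PR):

{code:java}
@Override
public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
    original.mergeNamespaces(target, sources);
    // hypothetical follow-up: reading the merged value through the TTL check
    // would clear it again if every merged contribution had already expired
    getWithTtlCheckAndUpdate(original::getInternal, original::updateInternal);
}
{code}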


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses the original state with packed TTL and adds TTL 
> logic using a time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
>     TtlValue<V> valueWithTtl = underlyingState.get();
>     // ttl logic here (e.g. update timestamp)
>     return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are applied to the state produced by the normal state binder 
> in its TTL wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518898#comment-16518898
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197001339
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+
+import java.util.Collection;
+
+/**
+ * This class wraps aggregating state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <IN> Type of the value added to the state
+ * @param <ACC> The type of the accumulator (intermediate aggregate state).
+ * @param <OUT> Type of the value extracted from the state
+ *
+ */
+class TtlAggregatingState<K, N, IN, ACC, OUT>
+    extends AbstractTtlState<K, N, ACC, TtlValue<ACC>, InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT>>
+    implements InternalAggregatingState<K, N, IN, ACC, OUT> {
+
+    TtlAggregatingState(
+            InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT> originalState,
+            TtlConfig config,
+            TtlTimeProvider timeProvider,
+            TypeSerializer<TtlValue<ACC>> valueSerializer,
+            TtlAggregateFunction<IN, ACC, OUT> aggregateFunction) {
+        super(originalState, config, timeProvider, valueSerializer);
+        aggregateFunction.stateClear = originalState::clear;
+        aggregateFunction.updater = originalState::updateInternal;
+    }
+
+    @Override
+    public OUT get() throws Exception {
+        return original.get();
+    }
+
+    @Override
+    public void add(IN value) throws Exception {
+        original.add(value);
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public ACC getInternal() throws Exception {
+        return getWithTtlCheckAndUpdate(original::getInternal, original::updateInternal);
+    }
+
+    @Override
+    public void updateInternal(ACC valueToStore) throws Exception {
+        original.updateInternal(wrapWithTs(valueToStore));
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
--- End diff --

Should we also do the TTL check for `original.mergeNamespaces()`, since we 
need to query the state when merging namespaces?


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses the original state with packed TTL and adds TTL 
> logic using a time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
>     TtlValue<V> valueWithTtl = underlyingState.get();
>     // ttl logic here (e.g. update timestamp)
>     return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are applied to the state produced by the normal state binder 
> in its TTL wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518895#comment-16518895
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196996094
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+    AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+    implements InternalListState<K, N, T> {
+    TtlListState(
+            InternalListState<K, N, TtlValue<T>> originalState,
+            TtlConfig config,
+            TtlTimeProvider timeProvider,
+            TypeSerializer<List<TtlValue<T>>> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public void update(List<T> values) throws Exception {
+        updateInternal(values);
+    }
+
+    @Override
+    public void addAll(List<T> values) throws Exception {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        original.addAll(withTs(values));
+    }
+
+    @Override
+    public Iterable<T> get() throws Exception {
+        Iterable<TtlValue<T>> ttlValue = original.get();
+        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+        if (updateTsOnRead) {
+            List<TtlValue<T>> collected = collect(ttlValue);
+            ttlValue = collected;
+            updateTs(collected);
+        }
+        final Iterable<TtlValue<T>> finalResult = ttlValue;
+        return () -> new IteratorWithCleanup(finalResult.iterator());
+    }
+
+    private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+        List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+            .filter(v -> !expired(v))
+            .map(this::rewrapWithNewTs)
+            .collect(Collectors.toList());
+        if (!unexpiredWithUpdatedTs.isEmpty()) {
+            original.update(unexpiredWithUpdatedTs);
+        }
+    }
+
+    @Override
+    public void add(T value) throws Exception {
+        Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+        original.add(wrapWithTs(value));
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
+    }
+
+    @Override
+    public List<T> getInternal() throws Exception {
+        return collect(get());
--- End diff --

This looks a bit weird; my gut feeling is that we should call `getInternal()` 
in `get()` (as we call `updateInternal()` in `update()` in this class), but 
here it is the reverse.
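
A sketch of the inverted delegation this suggests (mirroring how `update()` delegates to `updateInternal()`):

{code:java}
@Override
public Iterable<T> get() throws Exception {
    // public accessor delegates to the internal variant,
    // which would then carry the TTL filtering logic
    return getInternal();
}
{code}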


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518894#comment-16518894
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196995820
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+    AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+    implements InternalListState<K, N, T> {
+    TtlListState(
+            InternalListState<K, N, TtlValue<T>> originalState,
+            TtlConfig config,
+            TtlTimeProvider timeProvider,
+            TypeSerializer<List<TtlValue<T>>> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public void update(List<T> values) throws Exception {
+        updateInternal(values);
+    }
+
+    @Override
+    public void addAll(List<T> values) throws Exception {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        original.addAll(withTs(values));
+    }
+
+    @Override
+    public Iterable<T> get() throws Exception {
+        Iterable<TtlValue<T>> ttlValue = original.get();
+        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+        if (updateTsOnRead) {
+            List<TtlValue<T>> collected = collect(ttlValue);
+            ttlValue = collected;
+            updateTs(collected);
+        }
+        final Iterable<TtlValue<T>> finalResult = ttlValue;
+        return () -> new IteratorWithCleanup(finalResult.iterator());
+    }
+
+    private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+        List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+            .filter(v -> !expired(v))
+            .map(this::rewrapWithNewTs)
+            .collect(Collectors.toList());
+        if (!unexpiredWithUpdatedTs.isEmpty()) {
+            original.update(unexpiredWithUpdatedTs);
+        }
+    }
+
+    @Override
+    public void add(T value) throws Exception {
+        Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+        original.add(wrapWithTs(value));
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
--- End diff --

Again, should we also do the TTL check for `original.mergeNamespaces()`, 
since we need to query the state when merging namespaces?


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518897#comment-16518897
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197004891
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlValue<T> implements Serializable {
+    private final T userValue;
+    private final long expirationTimestamp;
--- End diff --

The `expirationTimestamp` is an absolute timestamp; should we shift the 
timestamps in `TtlValue` on checkpoint & recovery? For example, a user runs 
with `ProcessingTime` as the time characteristic and sets `TTL = 10min`. For 
some reason they trigger a savepoint, and after 11 min they recover the job 
from that savepoint; if we don't shift the timestamps, then all the state 
will be expired.
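
For illustration, a hypothetical helper for such a shift (the downtime between savepoint and restore would have to be recorded somewhere, e.g. in the snapshot metadata; not part of this PR):

{code:java}
// Hypothetical: rebase an absolute expiration timestamp across a savepoint gap.
static long shiftExpirationTimestamp(long expirationTimestamp, long downtimeMillis) {
    long shifted = expirationTimestamp + downtimeMillis;
    // saturate on overflow so "never expires" sentinels (Long.MAX_VALUE) stay intact
    return shifted < expirationTimestamp ? Long.MAX_VALUE : shifted;
}
{code}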


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses the original state with packed TTL and adds TTL 
> logic using a time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
>     TtlValue<V> valueWithTtl = underlyingState.get();
>     // ttl logic here (e.g. update timestamp)
>     return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are applied to the state produced by the normal state binder 
> in its TTL wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197001110
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalReducingState;
+
+import java.util.Collection;
+
+/**
+ * This class wraps reducing state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlReducingState<K, N, T>
+    extends AbstractTtlState<K, N, T, TtlValue<T>, InternalReducingState<K, N, TtlValue<T>>>
+    implements InternalReducingState<K, N, T> {
+    TtlReducingState(
+            InternalReducingState<K, N, TtlValue<T>> originalState,
+            TtlConfig config,
+            TtlTimeProvider timeProvider,
+            TypeSerializer<TtlValue<T>> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public T get() throws Exception {
+        return getInternal();
+    }
+
+    @Override
+    public void add(T value) throws Exception {
+        original.add(wrapWithTs(value, Long.MAX_VALUE));
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
--- End diff --

Again, should we also do the TTL check for `original.mergeNamespaces()`, 
since we need to query the state when merging namespaces?


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196995820
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+    AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+    implements InternalListState<K, N, T> {
+    TtlListState(
+            InternalListState<K, N, TtlValue<T>> originalState,
+            TtlConfig config,
+            TtlTimeProvider timeProvider,
+            TypeSerializer<List<TtlValue<T>>> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public void update(List<T> values) throws Exception {
+        updateInternal(values);
+    }
+
+    @Override
+    public void addAll(List<T> values) throws Exception {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        original.addAll(withTs(values));
+    }
+
+    @Override
+    public Iterable<T> get() throws Exception {
+        Iterable<TtlValue<T>> ttlValue = original.get();
+        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+        if (updateTsOnRead) {
+            List<TtlValue<T>> collected = collect(ttlValue);
+            ttlValue = collected;
+            updateTs(collected);
+        }
+        final Iterable<TtlValue<T>> finalResult = ttlValue;
+        return () -> new IteratorWithCleanup(finalResult.iterator());
+    }
+
+    private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+        List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+            .filter(v -> !expired(v))
+            .map(this::rewrapWithNewTs)
+            .collect(Collectors.toList());
+        if (!unexpiredWithUpdatedTs.isEmpty()) {
+            original.update(unexpiredWithUpdatedTs);
+        }
+    }
+
+    @Override
+    public void add(T value) throws Exception {
+        Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+        original.add(wrapWithTs(value));
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
--- End diff --

Again, should we also do the TTL check for `original.mergeNamespaces()`, 
since we need to query the state when merging namespaces?


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196998472
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+    AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+    implements InternalListState<K, N, T> {
+    TtlListState(
+            InternalListState<K, N, TtlValue<T>> originalState,
+            TtlConfig config,
+            TtlTimeProvider timeProvider,
+            TypeSerializer<List<TtlValue<T>>> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public void update(List<T> values) throws Exception {
+        updateInternal(values);
+    }
+
+    @Override
+    public void addAll(List<T> values) throws Exception {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        original.addAll(withTs(values));
+    }
+
+    @Override
+    public Iterable<T> get() throws Exception {
+        Iterable<TtlValue<T>> ttlValue = original.get();
+        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+        if (updateTsOnRead) {
+            List<TtlValue<T>> collected = collect(ttlValue);
+            ttlValue = collected;
+            updateTs(collected);
+        }
+        final Iterable<TtlValue<T>> finalResult = ttlValue;
+        return () -> new IteratorWithCleanup(finalResult.iterator());
+    }
+
+    private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+        List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+            .filter(v -> !expired(v))
+            .map(this::rewrapWithNewTs)
+            .collect(Collectors.toList());
+        if (!unexpiredWithUpdatedTs.isEmpty()) {
+            original.update(unexpiredWithUpdatedTs);
+        }
+    }
+
+    @Override
+    public void add(T value) throws Exception {
+        Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+        original.add(wrapWithTs(value));
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
+    }
+
+    @Override
+    public List<T> getInternal() throws Exception {
+        return collect(get());
+    }
+
+    private <E> List<E> collect(Iterable<E> iterable) {
--- End diff --

If we called `getInternal()` in `get()` and made `updateTs()` accept an 
`Iterable`, then this method could seemingly be removed. (Or, at least, we 
should check whether `iterable` is an instance of `List`; if it is, we could 
cast it to `List` and return immediately.)


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197004891
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * This class wraps user value of state with TTL.
+ *
+ * @param <T> Type of the user value of state with TTL
+ */
+class TtlValue<T> implements Serializable {
+    private final T userValue;
+    private final long expirationTimestamp;
--- End diff --

The `expirationTimestamp` is an absolute timestamp; should we shift the 
timestamps in `TtlValue` on checkpoint & recovery? For example, a user runs 
with `ProcessingTime` as the time characteristic and sets `TTL = 10min`. For 
some reason they trigger a savepoint, and after 11 min they recover the job 
from that savepoint; if we don't shift the timestamps, then all the state 
will be expired.


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r197001339
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlAggregatingState.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalAggregatingState;
+
+import java.util.Collection;
+
+/**
+ * This class wraps aggregating state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <IN> Type of the value added to the state
+ * @param <ACC> The type of the accumulator (intermediate aggregate state).
+ * @param <OUT> Type of the value extracted from the state
+ *
+ */
+class TtlAggregatingState<K, N, IN, ACC, OUT>
+    extends AbstractTtlState<K, N, ACC, TtlValue<ACC>, InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT>>
+    implements InternalAggregatingState<K, N, IN, ACC, OUT> {
+
+    TtlAggregatingState(
+            InternalAggregatingState<K, N, IN, TtlValue<ACC>, OUT> originalState,
+            TtlConfig config,
+            TtlTimeProvider timeProvider,
+            TypeSerializer<TtlValue<ACC>> valueSerializer,
+            TtlAggregateFunction<IN, ACC, OUT> aggregateFunction) {
+        super(originalState, config, timeProvider, valueSerializer);
+        aggregateFunction.stateClear = originalState::clear;
+        aggregateFunction.updater = originalState::updateInternal;
+    }
+
+    @Override
+    public OUT get() throws Exception {
+        return original.get();
+    }
+
+    @Override
+    public void add(IN value) throws Exception {
+        original.add(value);
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public ACC getInternal() throws Exception {
+        return getWithTtlCheckAndUpdate(original::getInternal, original::updateInternal);
+    }
+
+    @Override
+    public void updateInternal(ACC valueToStore) throws Exception {
+        original.updateInternal(wrapWithTs(valueToStore));
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
--- End diff --

Should we also do the TTL check for `original.mergeNamespaces()`, since we 
need to query the state when merging namespaces?


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196996094
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+    AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+    implements InternalListState<K, N, T> {
+    TtlListState(
+            InternalListState<K, N, TtlValue<T>> originalState,
+            TtlConfig config,
+            TtlTimeProvider timeProvider,
+            TypeSerializer<List<TtlValue<T>>> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public void update(List<T> values) throws Exception {
+        updateInternal(values);
+    }
+
+    @Override
+    public void addAll(List<T> values) throws Exception {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        original.addAll(withTs(values));
+    }
+
+    @Override
+    public Iterable<T> get() throws Exception {
+        Iterable<TtlValue<T>> ttlValue = original.get();
+        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+        if (updateTsOnRead) {
+            List<TtlValue<T>> collected = collect(ttlValue);
+            ttlValue = collected;
+            updateTs(collected);
+        }
+        final Iterable<TtlValue<T>> finalResult = ttlValue;
+        return () -> new IteratorWithCleanup(finalResult.iterator());
+    }
+
+    private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+        List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+            .filter(v -> !expired(v))
+            .map(this::rewrapWithNewTs)
+            .collect(Collectors.toList());
+        if (!unexpiredWithUpdatedTs.isEmpty()) {
+            original.update(unexpiredWithUpdatedTs);
+        }
+    }
+
+    @Override
+    public void add(T value) throws Exception {
+        Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+        original.add(wrapWithTs(value));
+    }
+
+    @Override
+    public void clear() {
+        original.clear();
+    }
+
+    @Override
+    public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+        original.mergeNamespaces(target, sources);
+    }
+
+    @Override
+    public List<T> getInternal() throws Exception {
+        return collect(get());
--- End diff --

This looks a bit weird; my gut feeling is that we should call `getInternal()` 
in `get()` (as we call `updateInternal()` in `update()` in this class), but 
here it is the reverse.


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196996839
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+    AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+    implements InternalListState<K, N, T> {
+    TtlListState(
+            InternalListState<K, N, TtlValue<T>> originalState,
+            TtlConfig config,
+            TtlTimeProvider timeProvider,
+            TypeSerializer<List<TtlValue<T>>> valueSerializer) {
+        super(originalState, config, timeProvider, valueSerializer);
+    }
+
+    @Override
+    public void update(List<T> values) throws Exception {
+        updateInternal(values);
+    }
+
+    @Override
+    public void addAll(List<T> values) throws Exception {
+        Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+        original.addAll(withTs(values));
+    }
+
+    @Override
+    public Iterable<T> get() throws Exception {
+        Iterable<TtlValue<T>> ttlValue = original.get();
+        ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+        if (updateTsOnRead) {
+            List<TtlValue<T>> collected = collect(ttlValue);
--- End diff --

In this block, we need to iterate `ttlValue` twice: once for `collect()` and 
once for `updateTs()`. If we made `updateTs()` accept an `Iterable` as its 
argument, we could avoid the `collect()` here and iterate `ttlValue` only once.


---


[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16518889#comment-16518889
 ] 

ASF GitHub Bot commented on FLINK-8863:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/6090#discussion_r197005582
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeDescriptor.scala
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.typeutils.TypeStringUtils
+
+/**
+  * Descriptor for a primitive type.
+  */
+class PrimitiveTypeDescriptor[T] extends HierarchyDescriptor {
+
+  // TODO not sure if we should use the BasicTypeInfo here
+  var typeInformation: BasicTypeInfo[T] = _
+  var value: T = _
+
+  def setType(basicType: BasicTypeInfo[T]): PrimitiveTypeDescriptor[T] = {
--- End diff --

Yes, `TypeStringUtils` can parse every valid type, and the user can specify the 
type via the API (e.g., `setType(Types.SHORT)`). I just wonder how the basic 
types supported by Java can be properly inferred from the config file (e.g., how 
to decide whether a parameter `1` is a byte, a short, or an int). Although a 
short or byte value can be represented with an int, that will affect the 
constructor search via Java reflection. Do you have any ideas for that?
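
To illustrate why the inferred type matters (a generic Java example, not SQL 
Client code): reflective constructor lookup matches parameter types exactly, so 
a value parsed as an int will not find a constructor declared with a short 
parameter, even though `1` fits into a short.

{code:java}
import java.lang.reflect.Constructor;

public class CtorLookupDemo {
	public CtorLookupDemo(short s) {}

	public static void main(String[] args) throws Exception {
		// Succeeds: the parameter type matches the declaration exactly.
		Constructor<?> found = CtorLookupDemo.class.getConstructor(short.class);
		System.out.println(found);
		// Would throw NoSuchMethodException: there is no (int) constructor,
		// even though the value 1 would fit into a short.
		// CtorLookupDemo.class.getConstructor(int.class);
	}
}
{code}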


> Add user-defined function support in SQL Client
> ---
>
> Key: FLINK-8863
> URL: https://issues.apache.org/jira/browse/FLINK-8863
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>  Labels: pull-request-available
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
>  
> It should be possible to declare user-defined functions in the SQL client. 
> For now, we limit the registration to classes that implement 
> {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that 
> are implemented in SQL are not part of this issue. 
> I would suggest to introduce a {{functions}} top-level property. The 
> declaration could look similar to: 
> {code} 
> functions: 
>   - name: testFunction 
> from: class <-- optional, default: class 
> class: org.my.MyScalarFunction 
> constructor: <-- optional, needed for certain types of functions 
>   - 42.0 
>   - class: org.my.Class <-- possibility to create objects via properties 
> constructor: 
>   - 1 
>   - true 
>   - false 
>   - "whatever" 
>   - type: INT 
> value: 1 
> {code} 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9632) SlotPool should notify the call when allocateSlot meet an exception

2018-06-20 Thread shuai.xu (JIRA)
shuai.xu created FLINK-9632:
---

 Summary: SlotPool should notify the call when allocateSlot meet an 
exception
 Key: FLINK-9632
 URL: https://issues.apache.org/jira/browse/FLINK-9632
 Project: Flink
  Issue Type: Bug
  Components: Cluster Management
Reporter: shuai.xu
Assignee: shuai.xu


In SlotPool, allocateSlot() returns a CompletableFuture<LogicalSlot>, but this 
future is only completed when slotAndLocalityFuture returns a LogicalSlot. If 
slotAndLocalityFuture completes exceptionally, the returned future is never 
completed, so the caller will never know about the failure.
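
A minimal sketch of the expected behavior (hypothetical names, not the actual 
SlotPool code): both the success and the failure of the internal future should 
be forwarded to the future handed to the caller.

{code:java}
import java.util.concurrent.CompletableFuture;

public class ForwardCompletion {
	// Forward completion and failure alike, so an exceptionally completed
	// internal future no longer leaves the caller waiting forever.
	static <T> CompletableFuture<T> forward(CompletableFuture<T> internal) {
		CompletableFuture<T> result = new CompletableFuture<>();
		internal.whenComplete((value, failure) -> {
			if (failure != null) {
				result.completeExceptionally(failure); // caller now sees the error
			} else {
				result.complete(value);
			}
		});
		return result;
	}
}
{code}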



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9632) SlotPool should notify the caller when allocateSlot meet an exception

2018-06-20 Thread shuai.xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-9632:

Summary: SlotPool should notify the caller when allocateSlot meet an 
exception  (was: SlotPool should notify the call when allocateSlot meet an 
exception)

> SlotPool should notify the caller when allocateSlot meet an exception
> -
>
> Key: FLINK-9632
> URL: https://issues.apache.org/jira/browse/FLINK-9632
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
>
> In SlotPool, allocateSlot() returns a CompletableFuture<LogicalSlot>, but this 
> future is only completed when slotAndLocalityFuture returns a LogicalSlot. If 
> slotAndLocalityFuture completes exceptionally, the returned future is never 
> completed, so the caller will never know about the failure.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9631) use Files.createDirectories instead of directory.mkdirs

2018-06-20 Thread makeyang (JIRA)
makeyang created FLINK-9631:
---

 Summary: use Files.createDirectories instead of directory.mkdirs
 Key: FLINK-9631
 URL: https://issues.apache.org/jira/browse/FLINK-9631
 Project: Flink
  Issue Type: Improvement
  Components: Java API
Affects Versions: 1.4.2, 1.5.0
 Environment: flink1.4

jdk1.8 latest

linux 2.6
Reporter: makeyang
Assignee: makeyang


A job can't run due to the exception below:

Could not create RocksDB data directory

but with only this exception, I can't tell exactly why.

So I suggest using Files.createDirectories, which throws an exception describing 
the failure, rather than File.mkdirs (see the sketch below).

I have some more suggestions:
 # Should we use Files.createDirectories to replace File.mkdirs?
 # Each time a TaskManager reports an exception to the JobManager, should 
IP+nodeId be contained in the exception? That would mean defining more Flink 
exceptions that wrap other exceptions, such as JDK exceptions.
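
A small illustration of the difference (hypothetical path; not the actual Flink 
code):

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class CreateDirExample {
	public static void main(String[] args) throws IOException {
		Path dir = Paths.get("/tmp/rocksdb-data"); // hypothetical directory
		// mkdirs() only reports 'false' on failure, with no cause attached.
		boolean ok = dir.toFile().mkdirs();
		System.out.println("mkdirs succeeded: " + ok);
		// createDirectories() throws an IOException that names the problem.
		Files.createDirectories(dir);
	}
}
{code}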



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518854#comment-16518854
 ] 

ASF GitHub Bot commented on FLINK-8863:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/6090#discussion_r196998269
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/PrimitiveTypeValidator.scala
 ---
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.descriptors.PrimitiveTypeValidator.PRIMITIVE_VALUE
+import scala.collection.JavaConversions._
+
+/**
+  * Validator for [[PrimitiveTypeDescriptor]].
+  */
+class PrimitiveTypeValidator extends HierarchyDescriptorValidator {
+  override def validateWithPrefix(keyPrefix: String, properties: DescriptorProperties): Unit = {
+    properties
+      .validateType(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_TYPE}", isOptional = false)
+    properties
+      .validateString(s"$keyPrefix${PrimitiveTypeValidator.PRIMITIVE_VALUE}", isOptional = false, 1)
+  }
+}
+
+object PrimitiveTypeValidator {
+  val PRIMITIVE_TYPE = "type"
+  val PRIMITIVE_VALUE = "value"
+
+  def derivePrimitiveValue(keyPrefix: String, properties: DescriptorProperties): Any = {
+    val typeInfo =
+      properties.getType(s"$keyPrefix$PRIMITIVE_TYPE")
+    val valueKey = s"$keyPrefix$PRIMITIVE_VALUE"
+    val value = typeInfo match {
+      case basicType: BasicTypeInfo[_] =>
+        basicType match {
+          case BasicTypeInfo.INT_TYPE_INFO =>
+            properties.getInt(valueKey)
+          case BasicTypeInfo.LONG_TYPE_INFO =>
+            properties.getLong(valueKey)
+          case BasicTypeInfo.DOUBLE_TYPE_INFO =>
+            properties.getDouble(valueKey)
+          case BasicTypeInfo.STRING_TYPE_INFO =>
+            properties.getString(valueKey)
+          case BasicTypeInfo.BOOLEAN_TYPE_INFO =>
+            properties.getBoolean(valueKey)
+          //TODO add more types
--- End diff --

Ah, yes. The array type was not considered. I'll think about that and add the 
support if it's not hard to implement. Otherwise, we could defer it to a 
follow-up issue.


> Add user-defined function support in SQL Client
> ---
>
> Key: FLINK-8863
> URL: https://issues.apache.org/jira/browse/FLINK-8863
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>  Labels: pull-request-available
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
>  
> It should be possible to declare user-defined functions in the SQL client. 
> For now, we limit the registration to classes that implement 
> {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that 
> are implemented in SQL are not part of this issue. 
> I would suggest to introduce a {{functions}} top-level property. The 
> declaration could look similar to: 
> {code} 
> functions: 
>   - name: testFunction 
> from: class <-- optional, default: class 
> class: org.my.MyScalarFunction 
> constructor: <-- optional, needed for certain types of functions 
>   - 42.0 
>   - class: org.my.Class <-- possibility to create objects via properties 
> constructor: 
>   - 1 
>   - true 
>   - false 
>   - "whatever" 
>   - type: INT 
> value: 1 
> {code} 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518839#comment-16518839
 ] 

ASF GitHub Bot commented on FLINK-8863:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/6090#discussion_r196996876
  
--- Diff: 
flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.Types;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+
+/**
+ * A bunch of UDFs for SQL-Client test.
+ */
+public class UserDefinedFunctions {
--- End diff --

The testing functions provided in `flink-table` are mainly intended for 
functional tests, while the functions added here are mainly intended for testing 
construction, i.e., I made the UDF constructors complex or even somewhat 
unreasonable...


> Add user-defined function support in SQL Client
> ---
>
> Key: FLINK-8863
> URL: https://issues.apache.org/jira/browse/FLINK-8863
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>  Labels: pull-request-available
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
>  
> It should be possible to declare user-defined functions in the SQL client. 
> For now, we limit the registration to classes that implement 
> {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that 
> are implemented in SQL are not part of this issue. 
> I would suggest to introduce a {{functions}} top-level property. The 
> declaration could look similar to: 
> {code} 
> functions: 
>   - name: testFunction 
> from: class <-- optional, default: class 
> class: org.my.MyScalarFunction 
> constructor: <-- optional, needed for certain types of functions 
>   - 42.0 
>   - class: org.my.Class <-- possibility to create objects via properties 
> constructor: 
>   - 1 
>   - true 
>   - false 
>   - "whatever" 
>   - type: INT 
> value: 1 
> {code} 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518822#comment-16518822
 ] 

ASF GitHub Bot commented on FLINK-8863:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/6090#discussion_r196996002
  
--- Diff: 
flink-libraries/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
 ---
@@ -145,6 +146,68 @@ public void testTableSchema() throws Exception {
assertEquals(expectedTableSchema, actualTableSchema);
}
 
+   @Test(timeout = 30_000L)
+   public void testScalarUDF() throws Exception {
--- End diff --

I'll add more test cases for that.


> Add user-defined function support in SQL Client
> ---
>
> Key: FLINK-8863
> URL: https://issues.apache.org/jira/browse/FLINK-8863
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>  Labels: pull-request-available
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
>  
> It should be possible to declare user-defined functions in the SQL client. 
> For now, we limit the registration to classes that implement 
> {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that 
> are implemented in SQL are not part of this issue. 
> I would suggest to introduce a {{functions}} top-level property. The 
> declaration could look similar to: 
> {code} 
> functions: 
>   - name: testFunction 
> from: class <-- optional, default: class 
> class: org.my.MyScalarFunction 
> constructor: <-- optional, needed for certain types of functions 
>   - 42.0 
>   - class: org.my.Class <-- possibility to create objects via properties 
> constructor: 
>   - 1 
>   - true 
>   - false 
>   - "whatever" 
>   - type: INT 
> value: 1 
> {code} 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518820#comment-16518820
 ] 

ASF GitHub Bot commented on FLINK-8863:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/6090#discussion_r196995942
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 ---
@@ -187,6 +206,7 @@ private static ClusterSpecification createClusterSpecification(CustomCommandLine
private final StreamExecutionEnvironment streamExecEnv;
private final TableEnvironment tableEnv;
 
+   @SuppressWarnings("unchecked")
--- End diff --

The `AggregateFunction` and `TableFunction` take generic type parameters. Thus, 
when doing conversions (e.g., `streamTableEnvironment.registerFunction(k, 
(AggregateFunction) udf);`), the Java compiler complains about the unchecked cast.
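
A generic Java illustration of the warning being suppressed (not SQL Client 
code):

{code:java}
import java.util.ArrayList;
import java.util.List;

public class UncheckedCastDemo {
	@SuppressWarnings("unchecked")
	public static void main(String[] args) {
		Object udf = new ArrayList<String>(); // concrete type known only at runtime
		// Casting to a generic type cannot be verified at runtime, so the
		// compiler emits an "unchecked" warning without the suppression.
		List<String> list = (List<String>) udf;
		System.out.println(list.size());
	}
}
{code}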


> Add user-defined function support in SQL Client
> ---
>
> Key: FLINK-8863
> URL: https://issues.apache.org/jira/browse/FLINK-8863
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>  Labels: pull-request-available
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
>  
> It should be possible to declare user-defined functions in the SQL client. 
> For now, we limit the registration to classes that implement 
> {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that 
> are implemented in SQL are not part of this issue. 
> I would suggest to introduce a {{functions}} top-level property. The 
> declaration could look similar to: 
> {code} 
> functions: 
>   - name: testFunction 
> from: class <-- optional, default: class 
> class: org.my.MyScalarFunction 
> constructor: <-- optional, needed for certain types of functions 
>   - 42.0 
>   - class: org.my.Class <-- possibility to create objects via properties 
> constructor: 
>   - 1 
>   - true 
>   - false 
>   - "whatever" 
>   - type: INT 
> value: 1 
> {code} 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518813#comment-16518813
 ] 

ASF GitHub Bot commented on FLINK-8863:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/6090#discussion_r196995241
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 ---
@@ -101,6 +110,16 @@ public ExecutionContext(Environment defaultEnvironment, SessionContext sessionCo
			}
		});
 
+		// generate user-defined functions
+		functions = new HashMap<>();
+		mergedEnv.getFunctions().forEach((name, descriptor) -> {
+			DescriptorProperties properties = new DescriptorProperties(true);
+			descriptor.addProperties(properties);
+			functions.put(
+				name,
+				FunctionValidator.generateUserDefinedFunction(properties, classLoader));
--- End diff --

That's a good idea.


> Add user-defined function support in SQL Client
> ---
>
> Key: FLINK-8863
> URL: https://issues.apache.org/jira/browse/FLINK-8863
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>  Labels: pull-request-available
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
>  
> It should be possible to declare user-defined functions in the SQL client. 
> For now, we limit the registration to classes that implement 
> {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that 
> are implemented in SQL are not part of this issue. 
> I would suggest to introduce a {{functions}} top-level property. The 
> declaration could look similar to: 
> {code} 
> functions: 
>   - name: testFunction 
> from: class <-- optional, default: class 
> class: org.my.MyScalarFunction 
> constructor: <-- optional, needed for certain types of functions 
>   - 42.0 
>   - class: org.my.Class <-- possibility to create objects via properties 
> constructor: 
>   - 1 
>   - true 
>   - false 
>   - "whatever" 
>   - type: INT 
> value: 1 
> {code} 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518812#comment-16518812
 ] 

ASF GitHub Bot commented on FLINK-8863:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/6090#discussion_r196995191
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UDFDescriptor.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.descriptors.ClassTypeDescriptor;
+import org.apache.flink.table.descriptors.ClassTypeValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FunctionDescriptor;
+import org.apache.flink.table.descriptors.FunctionValidator;
+import org.apache.flink.table.descriptors.PrimitiveTypeDescriptor;
+import org.apache.flink.table.descriptors.PrimitiveTypeValidator;
+import org.apache.flink.table.typeutils.TypeStringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.client.config.UDFDescriptor.From.CLASS;
+
+/**
+ * Descriptor for user-defined functions.
+ */
+public class UDFDescriptor extends FunctionDescriptor {
--- End diff --

I'm not sure whether more descriptors (for other purposes) will extend 
`FunctionDescriptor` in the future. How about renaming it to 
`UserDefinedFunction`?


> Add user-defined function support in SQL Client
> ---
>
> Key: FLINK-8863
> URL: https://issues.apache.org/jira/browse/FLINK-8863
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>  Labels: pull-request-available
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
>  
> It should be possible to declare user-defined functions in the SQL client. 
> For now, we limit the registration to classes that implement 
> {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that 
> are implemented in SQL are not part of this issue. 
> I would suggest to introduce a {{functions}} top-level property. The 
> declaration could look similar to: 
> {code} 
> functions: 
>   - name: testFunction 
> from: class <-- optional, default: class 
> class: org.my.MyScalarFunction 
> constructor: <-- optional, needed for certain types of functions 
>   - 42.0 
>   - class: org.my.Class <-- possibility to create objects via properties 
> constructor: 
>   - 1 
>   - true 
>   - false 
>   - "whatever" 
>   - type: INT 
> value: 1 
> {code} 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518810#comment-16518810
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196995095
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java 
---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Configuration of state TTL logic.
+ * TODO: builder
+ */
+public class TtlConfig {
+   private final TtlUpdateType ttlUpdateType;
+   private final TtlStateVisibility stateVisibility;
+   private final TtlTimeCharacteristic timeCharacteristic;
+   private final Time ttl;
+
+   public TtlConfig(
+   TtlUpdateType ttlUpdateType,
+   TtlStateVisibility stateVisibility,
+   TtlTimeCharacteristic timeCharacteristic,
+   Time ttl) {
+   Preconditions.checkNotNull(ttlUpdateType);
+   Preconditions.checkNotNull(stateVisibility);
+   Preconditions.checkNotNull(timeCharacteristic);
--- End diff --

Maybe we should also check that the `ttl` is greater than 0?
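
For example, a sketch of the additional check (reusing the `Preconditions` 
already imported in the diff above; `Time#toMilliseconds` assumed):

{code:java}
Preconditions.checkNotNull(ttl);
Preconditions.checkArgument(ttl.toMilliseconds() > 0, "TTL is expected to be positive.");
{code}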


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses the original state with a packed TTL and adds TTL logic 
> using a time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
>     TtlValue<V> valueWithTtl = underlyingState.get();
>     // ttl logic here (e.g. update timestamp)
>     return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are applied to state produced by the normal state binder in its 
> TTL wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9514:
--
Labels: pull-request-available  (was: )

> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> TTL state decorator uses the original state with a packed TTL and adds TTL logic 
> using a time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
>     TtlValue<V> valueWithTtl = underlyingState.get();
>     // ttl logic here (e.g. update timestamp)
>     return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are applied to state produced by the normal state binder in its 
> TTL wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518808#comment-16518808
 ] 

ASF GitHub Bot commented on FLINK-8863:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/6090#discussion_r196994767
  
--- Diff: 
flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/config/UDFDescriptor.java
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.descriptors.ClassTypeDescriptor;
+import org.apache.flink.table.descriptors.ClassTypeValidator;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.FunctionDescriptor;
+import org.apache.flink.table.descriptors.FunctionValidator;
+import org.apache.flink.table.descriptors.PrimitiveTypeDescriptor;
+import org.apache.flink.table.descriptors.PrimitiveTypeValidator;
+import org.apache.flink.table.typeutils.TypeStringUtils;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.client.config.UDFDescriptor.From.CLASS;
+
+/**
+ * Descriptor for user-defined functions.
+ */
+public class UDFDescriptor extends FunctionDescriptor {
+
+   private static final String FROM = "from";
+
+   private From from;
+
+   private UDFDescriptor(String name, From from) {
+   super(name);
+   this.from = from;
+   }
+
+   public From getFrom() {
+   return from;
+   }
+
+	/**
+	 * Create a UDF descriptor with the given config.
+	 */
+	public static UDFDescriptor create(Map<String, Object> config) {
+		if (!config.containsKey(FunctionValidator.FUNCTION_NAME())) {
+			throw new SqlClientException("The 'name' attribute of a function is missing.");
+		}
+
+		final Object name = config.get(FunctionValidator.FUNCTION_NAME());
+		if (!(name instanceof String) || ((String) name).length() <= 0) {
+			throw new SqlClientException("Invalid function name '" + name + "'.");
+		}
--- End diff --

At first, I did use `ConfigUtil.normalizeYaml` for that, but I found that the 
type information for some parameters was lost in the process, e.g., we could 
not tell whether a `false` is a boolean or a string.
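
A small illustration of the type loss with SnakeYAML (illustrative; not 
`ConfigUtil` itself): once scalars are normalized to strings, the two values 
below become indistinguishable.

{code:java}
import java.util.Map;
import org.yaml.snakeyaml.Yaml;

public class YamlTypeDemo {
	public static void main(String[] args) {
		// 'a' is a YAML boolean, 'b' is a quoted string.
		Map<?, ?> parsed = (Map<?, ?>) new Yaml().load("a: false\nb: \"false\"");
		System.out.println(parsed.get("a").getClass()); // class java.lang.Boolean
		System.out.println(parsed.get("b").getClass()); // class java.lang.String
	}
}
{code}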


> Add user-defined function support in SQL Client
> ---
>
> Key: FLINK-8863
> URL: https://issues.apache.org/jira/browse/FLINK-8863
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>  Labels: pull-request-available
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
>  
> It should be possible to declare user-defined functions in the SQL client. 
> For now, we limit the registration to classes that implement 
> {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that 
> are implemented in SQL are not part of this issue. 
> I would suggest to introduce a {{functions}} top-level property. The 
> declaration could look similar to: 
> {code} 
> functions: 
>   - name: testFunction 
> from: class <-- optional, default: class 
> class: org.my.MyScalarFunction 
> constructor: <-- optional, needed for certain types of functions 
>   - 42.0 
>   - class: org.my.Class <-- possibility to create objects via properties 
> constructor: 
>   - 1 
>   - true 
>   - false 
>   - "whatever" 
>   - type: INT 
> value: 1 
> {code} 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Updated] (FLINK-8863) Add user-defined function support in SQL Client

2018-06-20 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-8863:
--
Labels: pull-request-available  (was: )

> Add user-defined function support in SQL Client
> ---
>
> Key: FLINK-8863
> URL: https://issues.apache.org/jira/browse/FLINK-8863
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>  Labels: pull-request-available
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
>  
> It should be possible to declare user-defined functions in the SQL client. 
> For now, we limit the registration to classes that implement 
> {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that 
> are implemented in SQL are not part of this issue. 
> I would suggest to introduce a {{functions}} top-level property. The 
> declaration could look similar to: 
> {code} 
> functions: 
>   - name: testFunction 
> from: class <-- optional, default: class 
> class: org.my.MyScalarFunction 
> constructor: <-- optional, needed for certain types of functions 
>   - 42.0 
>   - class: org.my.Class <-- possibility to create objects via properties 
> constructor: 
>   - 1 
>   - true 
>   - false 
>   - "whatever" 
>   - type: INT 
> value: 1 
> {code} 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException

2018-06-20 Thread Youjun Yuan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Youjun Yuan updated FLINK-9630:
---
Description: 
When the Kafka topic gets deleted during the task start-up process, 
Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in 
getAllPartitionsForTopics(), and it gets no chance to close the kafkaConsumer, 
resulting in a TCP connection leak (to the Kafka broker).

*This issue can bring down the whole Flink cluster*, because, in a default 
setup (fixedDelay with INT.MAX restart attempts), the job manager will randomly 
schedule the job to any TaskManager that has a free slot, and each attempt will 
cause the TaskManager to leak a TCP connection. Eventually almost every 
TaskManager will run out of file handles, so no TaskManager can make a 
snapshot or accept a new job, effectively stopping the whole cluster.

The leak happens when StreamTask.invoke() calls openAllOperators(), then 
FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(); 
when kafkaConsumer.partitionsFor(topic) in 
KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a 
*TopicAuthorizationException*, no one catches it.

Though StreamTask.open catches the exception and invokes the dispose() method of 
each operator, which eventually invokes FlinkKafkaConsumerBase.cancel(), that 
method does not close the kafkaConsumer in partitionDiscoverer, and does not 
even invoke partitionDiscoverer.wakeup(), because the discoveryLoopThread was null.

Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience:

public void cancel() {
    // set ourselves as not running;
    // this would let the main discovery loop escape as soon as possible
    running = false;

    if (discoveryLoopThread != null) {

        if (partitionDiscoverer != null) {
            // we cannot close the discoverer here, as it is error-prone to 
            // concurrent access; only wakeup the discoverer, the discovery 
            // loop will clean itself up after it escapes
            partitionDiscoverer.wakeup();
        }

        // the discovery loop may currently be sleeping in-between
        // consecutive discoveries; interrupt to shutdown faster
        discoveryLoopThread.interrupt();
    }

    // abort the fetcher, if there is one
    if (kafkaFetcher != null) {
         kafkaFetcher.cancel();
    }
}

 

 

  was:
When the Kafka topic gets deleted during the task start-up process, 
Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in 
getAllPartitionsForTopics(), and it gets no chance to close the kafkaConsumer, 
resulting in a TCP connection leak (to the Kafka broker).

*This issue can bring down the whole Flink cluster*, because, in a default 
setup (fixedDelay with INT.MAX restart attempts), the job manager will randomly 
schedule the job to any TaskManager that has a free slot, and each attempt will 
cause the TaskManager to leak a TCP connection. Eventually almost every 
TaskManager will run out of file handles, so no TaskManager can make a 
snapshot or accept a new job, effectively stopping the whole cluster.

The leak happens when StreamTask.invoke() calls openAllOperators(), then 
FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(); 
when kafkaConsumer.partitionsFor(topic) in 
KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a 
*TopicAuthorizationException*, no one catches it.

Though StreamTask.open catches the exception and invokes the dispose() method of 
each operator, which eventually invokes FlinkKafkaConsumerBase.cancel(), that 
method does not close the kafkaConsumer in partitionDiscoverer, and does not 
even invoke partitionDiscoverer.wakeup(), because the discoveryLoopThread was null.

Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience:

public void cancel() {
    // set ourselves as not running;
    // this would let the main discovery loop escape as soon as possible
    running = false;

    if (discoveryLoopThread != null) {

        if (partitionDiscoverer != null) {
            // we cannot close the discoverer here, as it is error-prone to 
concurrent access;
            // only wakeup the discoverer, the discovery loop will clean itself 
up after it escapes
            partitionDiscoverer.wakeup();
        }

    // the discovery loop may currently be sleeping in-between
    // consecutive discoveries; interrupt to shutdown faster
    discoveryLoopThread.interrupt();
    }

    // abort the fetcher, if there is one
    if (kafkaFetcher != null) {
         kafkaFetcher.cancel();
    }
}


I tried to fix it by catching *TopicAuthorizationException* in 
Kafka09PartitionDiscoverer.getAllPartitionsForTopics() and closing the 
kafkaConsumer, which has been verified to work. A sketch of this fix follows 
below.

So I'd like to take this issue.
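
A minimal sketch of that fix, with the method shape inferred from the 
description above (the exact signature and surrounding code in 
Kafka09PartitionDiscoverer may differ):

{code:java}
// Sketch only: catch the authorization failure where it is thrown, close the
// consumer so its TCP connection to the broker is released, then rethrow so
// the task still fails as before.
@Override
protected List<KafkaTopicPartition> getAllPartitionsForTopics(List<String> topics) {
    List<KafkaTopicPartition> partitions = new LinkedList<>();
    try {
        for (String topic : topics) {
            for (PartitionInfo partitionInfo : kafkaConsumer.partitionsFor(topic)) {
                partitions.add(new KafkaTopicPartition(partitionInfo.topic(), partitionInfo.partition()));
            }
        }
    } catch (TopicAuthorizationException e) {
        // without closing here, the KafkaConsumer (and its broker connection)
        // leaks, because cancel() never reaches the partitionDiscoverer
        kafkaConsumer.close();
        throw e; // the task still fails, but no longer leaks the connection
    }
    return partitions;
}
{code}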

> Kafka09PartitionDiscoverer cause connection leak on 
> TopicAuthorizationException
> ---

[jira] [Created] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException

2018-06-20 Thread Youjun Yuan (JIRA)
Youjun Yuan created FLINK-9630:
--

 Summary: Kafka09PartitionDiscoverer cause connection leak on 
TopicAuthorizationException
 Key: FLINK-9630
 URL: https://issues.apache.org/jira/browse/FLINK-9630
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.4.2, 1.5.0
 Environment: Linux 2.6, java 8, Kafka broker 0.10.x
Reporter: Youjun Yuan
 Fix For: 1.5.1


When the Kafka topic gets deleted during the task-starting process, 
Kafka09PartitionDiscoverer will get a *TopicAuthorizationException* in 
getAllPartitionsForTopics() and has no chance to close the kafkaConsumer, 
resulting in a TCP connection leak (to the Kafka broker).

 

*This issue can bring down the whole Flink cluster*, because in a default 
setup (fixedDelay with INT.MAX restart attempts) the job manager will randomly 
schedule the job to any TaskManager that has a free slot, and each attempt will 
cause that TaskManager to leak a TCP connection. Eventually almost every 
TaskManager runs out of file handles, so no TaskManager can take a snapshot or 
accept a new job, effectively stopping the whole cluster.

 

The leak happens when StreamTask.invoke() calls openAllOperators(), then 
FlinkKafkaConsumerBase.open() calls partitionDiscoverer.discoverPartitions(), 
and kafkaConsumer.partitionsFor(topic) in 
KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a 
*TopicAuthorizationException* that no one catches.

Although StreamTask.open catches the Exception and invokes the dispose() method 
of each operator, which eventually invokes FlinkKafkaConsumerBase.cancel(), this 
does not close the kafkaConsumer in partitionDiscoverer, nor even invoke 
partitionDiscoverer.wakeup(), because the discoveryLoopThread is null.

 

Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience:

public void cancel() {
    // set ourselves as not running;
    // this would let the main discovery loop escape as soon as possible
    running = false;

    if (discoveryLoopThread != null) {

        if (partitionDiscoverer != null) {
            // we cannot close the discoverer here, as it is error-prone to 
concurrent access;
            // only wakeup the discoverer, the discovery loop will clean itself 
up after it escapes
            partitionDiscoverer.wakeup();
        }

        // the discovery loop may currently be sleeping in-between
        // consecutive discoveries; interrupt to shutdown faster
        discoveryLoopThread.interrupt();
    }

    // abort the fetcher, if there is one
    if (kafkaFetcher != null) {
         kafkaFetcher.cancel();
    }
}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9629) Datadog metrics reporter does not have shaded dependencies

2018-06-20 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9629:
--
Labels: pull-request-available  (was: )

> Datadog metrics reporter does not have shaded dependencies
> --
>
> Key: FLINK-9629
> URL: https://issues.apache.org/jira/browse/FLINK-9629
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0, 1.6.0, 1.5.1
>Reporter: Georgii Gobozov
>Priority: Major
>  Labels: pull-request-available
>
> flink-metrics-datadog-1.5.0.jar does not contain shaded dependencies for 
> okhttp3 and okio



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9629) Datadog metrics reporter does not have shaded dependencies

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518747#comment-16518747
 ] 

ASF GitHub Bot commented on FLINK-9629:
---

GitHub user gobozov opened a pull request:

https://github.com/apache/flink/pull/6191

[FLINK-9629][metrics] Included datadog shaded dependencies for okhttp3 and 
okio



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gobozov/flink release-1.5

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6191.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6191


commit b0904ffc31d4a9272bf852766f15210387504420
Author: Georgii.Gobozov 
Date:   2018-06-21T00:17:52Z

FLINK-9629: included shaded dependencies for okhttp3 and okio




> Datadog metrics reporter does not have shaded dependencies
> --
>
> Key: FLINK-9629
> URL: https://issues.apache.org/jira/browse/FLINK-9629
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0, 1.6.0, 1.5.1
>Reporter: Georgii Gobozov
>Priority: Major
>  Labels: pull-request-available
>
> flink-metrics-datadog-1.5.0.jar does not contain shaded dependencies for 
> okhttp3 and okio



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6191: [FLINK-9629][metrics] Included datadog shaded depe...

2018-06-20 Thread gobozov
GitHub user gobozov opened a pull request:

https://github.com/apache/flink/pull/6191

[FLINK-9629][metrics] Included datadog shaded dependencies for okhttp3 and 
okio



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gobozov/flink release-1.5

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6191.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6191


commit b0904ffc31d4a9272bf852766f15210387504420
Author: Georgii.Gobozov 
Date:   2018-06-21T00:17:52Z

FLINK-9629: included shaded dependencies for okhttp3 and okio




---


[jira] [Created] (FLINK-9629) Datadog metrics reporter does not have shaded dependencies

2018-06-20 Thread Georgii Gobozov (JIRA)
Georgii Gobozov created FLINK-9629:
--

 Summary: Datadog metrics reporter does not have shaded dependencies
 Key: FLINK-9629
 URL: https://issues.apache.org/jira/browse/FLINK-9629
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.5.0, 1.6.0, 1.5.1
Reporter: Georgii Gobozov


flink-metrics-datadog-1.5.0.jar does not contain shaded dependencies for 
okhttp3 and okio



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9627) Extending 'KafkaJsonTableSource' according to comments will result in NPE

2018-06-20 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9627:
---

Assignee: vinoyang

> Extending 'KafkaJsonTableSource' according to comments will result in NPE
> -
>
> Key: FLINK-9627
> URL: https://issues.apache.org/jira/browse/FLINK-9627
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: Dominik Wosiński
>Assignee: vinoyang
>Priority: Major
>
> According to the comments, what is needed to extend the 'KafkaJsonTableSource' 
> looks as follows:
>  
> {code:java}
> A version-agnostic Kafka JSON {@link StreamTableSource}.
> *
> * The version-specific Kafka consumers need to extend this class and
> * override {@link #createKafkaConsumer(String, Properties, 
> DeserializationSchema)}}.
> *
> * The field names are used to parse the JSON file and so are the 
> types.{code}
> This will cause an NPE, since there is no default value for startupMode in 
> the abstract class itself, only in the builder of this class. 
> In the 'getKafkaConsumer' method, the switch statement will be executed on 
> the non-initialized 'startupMode' field:
> {code:java}
> switch (startupMode) {
>     case EARLIEST:
>         kafkaConsumer.setStartFromEarliest();
>         break;
>     case LATEST:
>         kafkaConsumer.setStartFromLatest();
>         break;
>     case GROUP_OFFSETS:
>         kafkaConsumer.setStartFromGroupOffsets();
>         break;
>     case SPECIFIC_OFFSETS:
>         kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
>         break;
> }
> {code}
>  
>  
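
One possible way to avoid the NPE, sketched here for illustration only (the 
actual fix may instead validate the field or document the builder requirement), 
is to give the field a default that matches the builder's behavior:

{code:java}
// Illustrative sketch, not the actual patch: default startupMode in the
// abstract class so the switch in getKafkaConsumer() never runs on null.
private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
{code}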



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6190: Add a simple pulsar source connector.

2018-06-20 Thread cckellogg
Github user cckellogg closed the pull request at:

https://github.com/apache/flink/pull/6190


---


[GitHub] flink pull request #6190: Add a simple pulsar source connector.

2018-06-20 Thread cckellogg
GitHub user cckellogg opened a pull request:

https://github.com/apache/flink/pull/6190

Add a simple pulsar source connector.

*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-] [component] Title of 
the pull request", where *FLINK-* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  - The S3 file system connector: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cckellogg/flink flink-pulsar-source-connector

Alternatively you can review and apply these changes as the patch at:


[jira] [Commented] (FLINK-9374) Flink Kinesis Producer does not backpressure

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518631#comment-16518631
 ] 

ASF GitHub Bot commented on FLINK-9374:
---

Github user gliu6 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r196952063
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws 
Exception {
}
}
 
+   /**
+* If the internal queue of the {@link KinesisProducer} gets too long,
+* flush some of the records until we are below the limit again.
+* We don't want to flush _all_ records at this point since that would
+* break record aggregation.
+*/
+   private void enforceQueueLimit() {
--- End diff --

Btw, I am not requesting any changes; the PR looks good to me for its defined 
purpose.

Just wondering how to configure this easily. Now that I think about it a bit 
more: would it be better to expose a `queue size` (in bytes) to the user instead 
of a `queue limit (number of records)`? Inside the FKP class, define an integer 
recordSize, and inside the invoke function, do a moving-average calculation of 
the recordSize with `serialized.remaining()` dynamically. A sketch of this idea 
follows below.
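
A sketch of that idea, with made-up field names and an exponential moving 
average standing in for whatever averaging the implementation would choose:

{code:java}
// Illustrative only: derive the record-count limit from a user-facing byte
// budget, using a moving average of observed serialized record sizes.
private long queueSizeLimitBytes = 100L * 1024 * 1024; // e.g. ~100 MB budget
private double avgRecordSizeBytes = 1024.0;            // seeded initial guess

private int deriveQueueLimit(int serializedSize) {
    avgRecordSizeBytes = 0.95 * avgRecordSizeBytes + 0.05 * serializedSize;
    return (int) Math.max(1, queueSizeLimitBytes / avgRecordSizeBytes);
}
{code}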


> Flink Kinesis Producer does not backpressure
> 
>
> Key: FLINK-9374
> URL: https://issues.apache.org/jira/browse/FLINK-9374
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Franz Thoma
>Priority: Critical
>  Labels: pull-request-available
> Attachments: after.png, before.png
>
>
> The {{FlinkKinesisProducer}} just accepts records and forwards them to a 
> {{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL 
> internally holds an unbounded queue of records that have not yet been sent.
> Since Kinesis is rate-limited to 1MB per second per shard, this queue may 
> grow indefinitely if Flink sends records faster than the KPL can forward them 
> to Kinesis.
> One way to circumvent this problem is to set a record TTL, so that queued 
> records are dropped after a certain amount of time, but this will lead to 
> data loss under high loads.
> Currently the only time the queue is flushed is during checkpointing: 
> {{FlinkKinesisProducer}} consumes records at arbitrary rate, either until a 
> checkpoint is reached (and will wait until the queue is flushed), or until 
> out-of-memory, whichever is reached first. (This gets worse due to the fact 
> that the Java KPL is only a thin wrapper around a C++ process, so it is not 
> even the Java process that runs out of memory, but the C++ process.) The 
> implicit rate-limit due to checkpointing leads to a ragged throughput graph 
> like this (the periods with zero throughput are the wait times before a 
> checkpoint):
> !file:///home/fthoma/projects/flink/before.png!!before.png! Throughput 
> limited by checkpointing only
> My proposed solution is to add a config option {{queueLimit}} to set a 
> maximum number of records that may be waiting in the KPL queue. If this limit 
> is reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and 
> wait (blocking) until the queue length is below the limit again. This 
> automatically leads to backpressuring, since the {{FlinkKinesisProducer}} 
> cannot accept records while waiting. For compatibility, {{queueLimit}} is set 
> to {{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a 
> client explicitly sets the value. Setting a »sane« default value is not 
> possible unfortunately, since sensible values for the limit depend on the 
> record size (the limit should be chosen so that about 10–100MB of records per 
> shard are accumulated before flushing, otherwise the maximum Kinesis 
> throughput may not be reached).
> !after.png! Throughput with a queue limit of 10 records (the spikes are 
> checkpoints, where the queue is still flushed completely)
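
For readers skimming the thread, a minimal sketch of the proposed blocking 
flush loop, assuming the KPL's KinesisProducer API (getOutstandingRecordsCount() 
and flush()); the waiting strategy shown is illustrative, not the exact patch:

{code:java}
// Sketch: block until the KPL's internal queue drops below the limit,
// flushing so the queue makes progress and surfacing async errors early.
private void enforceQueueLimit() throws Exception {
    while (producer.getOutstandingRecordsCount() >= queueLimit) {
        producer.flush();              // push queued records toward Kinesis
        Thread.sleep(500);             // give the queue time to drain
        checkAndPropagateAsyncError(); // fail fast on async send failures
    }
}
{code}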



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-06-20 Thread gliu6
Github user gliu6 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r196952063
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws 
Exception {
}
}
 
+   /**
+* If the internal queue of the {@link KinesisProducer} gets too long,
+* flush some of the records until we are below the limit again.
+* We don't want to flush _all_ records at this point since that would
+* break record aggregation.
+*/
+   private void enforceQueueLimit() {
--- End diff --

Btw, I am not requesting any changes; the PR looks good to me for its defined 
purpose.

Just wondering how to configure this easily. Now that I think about it a bit 
more: would it be better to expose a `queue size` (in bytes) to the user instead 
of a `queue limit (number of records)`? Inside the FKP class, define an integer 
recordSize, and inside the invoke function, do a moving-average calculation of 
the recordSize with `serialized.remaining()` dynamically.


---


[jira] [Commented] (FLINK-9374) Flink Kinesis Producer does not backpressure

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518590#comment-16518590
 ] 

ASF GitHub Bot commented on FLINK-9374:
---

Github user gliu6 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r196940135
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws 
Exception {
}
}
 
+   /**
+* If the internal queue of the {@link KinesisProducer} gets too long,
+* flush some of the records until we are below the limit again.
+* We don't want to flush _all_ records at this point since that would
+* break record aggregation.
+*/
+   private void enforceQueueLimit() {
--- End diff --

I wonder whether we could adjust the queue limit dynamically. 
You mentioned that `queue limit = (number of shards * queue size per shard) 
/ record size`.
Except for the record size, all the others are relatively easy to set. For me, 
I don't really know the record size until the application starts. Also, what if 
the record size varies over time?
So how about adding a queueLimit supplier function here to allow the user to 
supply how the queueLimit is calculated dynamically? A sketch of this follows 
below.
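
A sketch of that supplier idea, illustrative only (note that a field of a Flink 
sink would need a serializable supplier; a plain java.util.function.Supplier is 
shown for brevity):

{code:java}
// Illustrative: recompute the limit on every enforcement pass instead of
// fixing it at construction time.
private Supplier<Integer> queueLimitSupplier = () -> Integer.MAX_VALUE;

private void enforceQueueLimit() throws Exception {
    int limit = queueLimitSupplier.get(); // e.g. derived from record sizes
    while (producer.getOutstandingRecordsCount() >= limit) {
        producer.flush();
        Thread.sleep(500);
    }
}
{code}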



> Flink Kinesis Producer does not backpressure
> 
>
> Key: FLINK-9374
> URL: https://issues.apache.org/jira/browse/FLINK-9374
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Franz Thoma
>Priority: Critical
>  Labels: pull-request-available
> Attachments: after.png, before.png
>
>
> The {{FlinkKinesisProducer}} just accepts records and forwards them to a 
> {{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL 
> internally holds an unbounded queue of records that have not yet been sent.
> Since Kinesis is rate-limited to 1MB per second per shard, this queue may 
> grow indefinitely if Flink sends records faster than the KPL can forward them 
> to Kinesis.
> One way to circumvent this problem is to set a record TTL, so that queued 
> records are dropped after a certain amount of time, but this will lead to 
> data loss under high loads.
> Currently the only time the queue is flushed is during checkpointing: 
> {{FlinkKinesisProducer}} consumes records at arbitrary rate, either until a 
> checkpoint is reached (and will wait until the queue is flushed), or until 
> out-of-memory, whichever is reached first. (This gets worse due to the fact 
> that the Java KPL is only a thin wrapper around a C++ process, so it is not 
> even the Java process that runs out of memory, but the C++ process.) The 
> implicit rate-limit due to checkpointing leads to a ragged throughput graph 
> like this (the periods with zero throughput are the wait times before a 
> checkpoint):
> !file:///home/fthoma/projects/flink/before.png!!before.png! Throughput 
> limited by checkpointing only
> My proposed solution is to add a config option {{queueLimit}} to set a 
> maximum number of records that may be waiting in the KPL queue. If this limit 
> is reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and 
> wait (blocking) until the queue length is below the limit again. This 
> automatically leads to backpressuring, since the {{FlinkKinesisProducer}} 
> cannot accept records while waiting. For compatibility, {{queueLimit}} is set 
> to {{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a 
> client explicitly sets the value. Setting a »sane« default value is not 
> possible unfortunately, since sensible values for the limit depend on the 
> record size (the limit should be chosen so that about 10–100MB of records per 
> shard are accumulated before flushing, otherwise the maximum Kinesis 
> throughput may not be reached).
> !after.png! Throughput with a queue limit of 10 records (the spikes are 
> checkpoints, where the queue is still flushed completely)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9374) Flink Kinesis Producer does not backpressure

2018-06-20 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9374:
--
Labels: pull-request-available  (was: )

> Flink Kinesis Producer does not backpressure
> 
>
> Key: FLINK-9374
> URL: https://issues.apache.org/jira/browse/FLINK-9374
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Franz Thoma
>Priority: Critical
>  Labels: pull-request-available
> Attachments: after.png, before.png
>
>
> The {{FlinkKinesisProducer}} just accepts records and forwards them to a 
> {{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL 
> internally holds an unbounded queue of records that have not yet been sent.
> Since Kinesis is rate-limited to 1MB per second per shard, this queue may 
> grow indefinitely if Flink sends records faster than the KPL can forward them 
> to Kinesis.
> One way to circumvent this problem is to set a record TTL, so that queued 
> records are dropped after a certain amount of time, but this will lead to 
> data loss under high loads.
> Currently the only time the queue is flushed is during checkpointing: 
> {{FlinkKinesisProducer}} consumes records at arbitrary rate, either until a 
> checkpoint is reached (and will wait until the queue is flushed), or until 
> out-of-memory, whichever is reached first. (This gets worse due to the fact 
> that the Java KPL is only a thin wrapper around a C++ process, so it is not 
> even the Java process that runs out of memory, but the C++ process.) The 
> implicit rate-limit due to checkpointing leads to a ragged throughput graph 
> like this (the periods with zero throughput are the wait times before a 
> checkpoint):
> !file:///home/fthoma/projects/flink/before.png!!before.png! Throughput 
> limited by checkpointing only
> My proposed solution is to add a config option {{queueLimit}} to set a 
> maximum number of records that may be waiting in the KPL queue. If this limit 
> is reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and 
> wait (blocking) until the queue length is below the limit again. This 
> automatically leads to backpressuring, since the {{FlinkKinesisProducer}} 
> cannot accept records while waiting. For compatibility, {{queueLimit}} is set 
> to {{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a 
> client explicitly sets the value. Setting a »sane« default value is not 
> possible unfortunately, since sensible values for the limit depend on the 
> record size (the limit should be chosen so that about 10–100MB of records per 
> shard are accumulated before flushing, otherwise the maximum Kinesis 
> throughput may not be reached).
> !after.png! Throughput with a queue limit of 10 records (the spikes are 
> checkpoints, where the queue is still flushed completely)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-06-20 Thread gliu6
Github user gliu6 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6021#discussion_r196940135
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
 ---
@@ -326,6 +366,29 @@ private void checkAndPropagateAsyncError() throws 
Exception {
}
}
 
+   /**
+* If the internal queue of the {@link KinesisProducer} gets too long,
+* flush some of the records until we are below the limit again.
+* We don't want to flush _all_ records at this point since that would
+* break record aggregation.
+*/
+   private void enforceQueueLimit() {
--- End diff --

I wonder whether we could adjust the queue limit dynamically. 
You mentioned that `queue limit = (number of shards * queue size per shard) 
/ record size`.
Except for the record size, all the others are relatively easy to set. For me, 
I don't really know the record size until the application starts. Also, what if 
the record size varies over time?
So how about adding a queueLimit supplier function here to allow the user to 
supply how the queueLimit is calculated dynamically? 



---


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518360#comment-16518360
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196868266
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest extends TestLogger {
+
+   private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static RestServerEndpoint serverEndpoint;
+   private static String serverAddress;
+
+   private static MultipartMixedHandler mixedHandler;
+   private static MultipartJsonHandler jsonHandler;
+   private static MultipartFileHandler fileHandler;
+   private static File file1;
+   private static File file2;
+
+   

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196868266
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest extends TestLogger {
+
+   private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static RestServerEndpoint serverEndpoint;
+   private static String serverAddress;
+
+   private static MultipartMixedHandler mixedHandler;
+   private static MultipartJsonHandler jsonHandler;
+   private static MultipartFileHandler fileHandler;
+   private static File file1;
+   private static File file2;
+
+   @BeforeClass
+   public static void setup() throws Exception {
+   Configuration config = new Configuration();
+   config.setInteger(RestOptions.PORT, 0);
+   config.setString(RestOptions.ADDRESS, "localhost");

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518333#comment-16518333
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196849558
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest extends TestLogger {
+
+   private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static RestServerEndpoint serverEndpoint;
+   private static String serverAddress;
+
+   private static MultipartMixedHandler mixedHandler;
+   private static MultipartJsonHandler jsonHandler;
+   private static MultipartFileHandler fileHandler;
+   private static File file1;
+   private static File file2;
+
+   

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196849558
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest extends TestLogger {
+
+   private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static RestServerEndpoint serverEndpoint;
+   private static String serverAddress;
+
+   private static MultipartMixedHandler mixedHandler;
+   private static MultipartJsonHandler jsonHandler;
+   private static MultipartFileHandler fileHandler;
+   private static File file1;
+   private static File file2;
+
+   @BeforeClass
+   public static void setup() throws Exception {
+   Configuration config = new Configuration();
+   config.setInteger(RestOptions.PORT, 0);
+   config.setString(RestOptions.ADDRESS, "localhost");

[jira] [Commented] (FLINK-8795) Scala shell broken for Flip6

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518316#comment-16518316
 ] 

ASF GitHub Bot commented on FLINK-8795:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6182
  
Thanks for the fix @dawidwys!
The changes LGTM on my side, +1.


> Scala shell broken for Flip6
> 
>
> Key: FLINK-8795
> URL: https://issues.apache.org/jira/browse/FLINK-8795
> Project: Flink
>  Issue Type: Bug
>Reporter: kant kodali
>Assignee: Dawid Wysakowicz
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> I am trying to run the simple code below after building everything from 
> Flink's github master branch for various reasons. I get the exception below, 
> and I wonder what runs on port 9065 and how to fix this exception?
> I followed the instructions from the Flink master branch so I did the 
> following.
> {code:java}
> git clone https://github.com/apache/flink.git 
> cd flink
> mvn clean package -DskipTests 
> cd build-target
>  ./bin/start-scala-shell.sh local{code}
> {{And Here is the code I ran}}
> {code:java}
> val dataStream = senv.fromElements(1, 2, 3, 4)
> dataStream.countWindowAll(2).sum(0).print()
> senv.execute("My streaming program"){code}
> {{And I finally get this exception}}
> {code:java}
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
> submit JobGraph. at 
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$18(RestClusterClient.java:306)
>  at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>  at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$222(RestClient.java:196)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
>  at java.lang.Thread.run(Thread.java:745) Caused by: 
> java.util.concurrent.CompletionException: java.net.ConnectException: 
> Connection refused: localhost/127.0.0.1:9065 at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>  at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>  at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) 
> at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>  ... 16 more Caused by: java.net.ConnectException: Connection refused: 
> localhost/127.0.0.1:9065 at sun.nio.ch.SocketChannelImpl.checkConnect(Native 
> Method) at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) at 
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281){code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8795) Scala shell broken for Flip6

2018-06-20 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-8795:
--
Labels: pull-request-available  (was: )

> Scala shell broken for Flip6
> 
>
> Key: FLINK-8795
> URL: https://issues.apache.org/jira/browse/FLINK-8795
> Project: Flink
>  Issue Type: Bug
>Reporter: kant kodali
>Assignee: Dawid Wysakowicz
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> I am trying to run the simple code below after building everything from 
> Flink's github master branch for various reasons. I get the exception below, 
> and I wonder what runs on port 9065 and how to fix this exception?
> I followed the instructions from the Flink master branch so I did the 
> following.
> {code:java}
> git clone https://github.com/apache/flink.git 
> cd flink
> mvn clean package -DskipTests 
> cd build-target
>  ./bin/start-scala-shell.sh local{code}
> {{And Here is the code I ran}}
> {code:java}
> val dataStream = senv.fromElements(1, 2, 3, 4)
> dataStream.countWindowAll(2).sum(0).print()
> senv.execute("My streaming program"){code}
> {{And I finally get this exception}}
> {code:java}
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
> submit JobGraph. at 
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$18(RestClusterClient.java:306)
>  at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>  at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$222(RestClient.java:196)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
>  at java.lang.Thread.run(Thread.java:745) Caused by: 
> java.util.concurrent.CompletionException: java.net.ConnectException: 
> Connection refused: localhost/127.0.0.1:9065 at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>  at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>  at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) 
> at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>  ... 16 more Caused by: java.net.ConnectException: Connection refused: 
> localhost/127.0.0.1:9065 at sun.nio.ch.SocketChannelImpl.checkConnect(Native 
> Method) at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) at 
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281){code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6182: [FLINK-8795] Fixed local scala shell for Flip6

2018-06-20 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6182
  
Thanks for the fix @dawidwys!
The changes LGTM on my side, +1.


---


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518311#comment-16518311
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196836698
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * <p>Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+	private final Collection<Path> directoriesToClean;
+	private final Collection<Path> uploadedFiles;
+
+	@SuppressWarnings("resource")
+	public static final FileUploads EMPTY = new FileUploads();
+
+	private FileUploads() {
+		this.directoriesToClean = Collections.emptyList();
+		this.uploadedFiles = Collections.emptyList();
+	}
+
+	public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws IOException {
+		final Collection<Path> files = new ArrayList<>(4);
+		final Collection<Path> directories = new ArrayList<>(1);
+		for (Path fileOrDirectory : uploadedFilesOrDirectory) {
+			Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute.");
+			if (Files.isDirectory(fileOrDirectory)) {
+				directories.add(fileOrDirectory);
+				FileAdderVisitor visitor = new FileAdderVisitor();
+				Files.walkFileTree(fileOrDirectory, visitor);
+				files.addAll(visitor.getContainedFiles());
+			} else {
+				files.add(fileOrDirectory);
+			}
--- End diff --

We don't have to; it's for testing convenience, as noted in the class javadocs.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.
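
Editorial aside: a rough, hypothetical sketch of the forwarding idea described above. The shaded Netty types are the ones used in the diffs in this thread, but the class and method names here are made up for illustration, not the PR's actual code:

{code:java}
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;

final class MultipartForwardingSketch {
	// Wraps the JSON payload extracted from a multipart request into a fresh
	// DefaultFullHttpRequest, so downstream handlers see an ordinary full request.
	static FullHttpRequest toSimpleRequest(HttpRequest original, byte[] jsonPayload) {
		return new DefaultFullHttpRequest(
			original.getProtocolVersion(),
			original.getMethod(),
			original.getUri(),
			Unpooled.wrappedBuffer(jsonPayload));
	}
}
{code}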



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196836698
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * <p>Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+	private final Collection<Path> directoriesToClean;
+	private final Collection<Path> uploadedFiles;
+
+	@SuppressWarnings("resource")
+	public static final FileUploads EMPTY = new FileUploads();
+
+	private FileUploads() {
+		this.directoriesToClean = Collections.emptyList();
+		this.uploadedFiles = Collections.emptyList();
+	}
+
+	public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws IOException {
+		final Collection<Path> files = new ArrayList<>(4);
+		final Collection<Path> directories = new ArrayList<>(1);
+		for (Path fileOrDirectory : uploadedFilesOrDirectory) {
+			Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute.");
+			if (Files.isDirectory(fileOrDirectory)) {
+				directories.add(fileOrDirectory);
+				FileAdderVisitor visitor = new FileAdderVisitor();
+				Files.walkFileTree(fileOrDirectory, visitor);
+				files.addAll(visitor.getContainedFiles());
+			} else {
+				files.add(fileOrDirectory);
+			}
--- End diff --

We don't have to; it's for testing convenience, as noted in the class javadocs.


---


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518310#comment-16518310
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196836575
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -70,51 +84,103 @@ public FileUploadHandler(final Path uploadDir) {
 
	@Override
	protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception {
-		if (msg instanceof HttpRequest) {
-			final HttpRequest httpRequest = (HttpRequest) msg;
-			if (httpRequest.getMethod().equals(HttpMethod.POST)) {
-				if (HttpPostRequestDecoder.isMultipart(httpRequest)) {
-					currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
-					currentHttpRequest = httpRequest;
+		try {
+			if (msg instanceof HttpRequest) {
+				final HttpRequest httpRequest = (HttpRequest) msg;
+				LOG.trace("Received request. URL:{} Method:{}", httpRequest.getUri(), httpRequest.getMethod());
+				if (httpRequest.getMethod().equals(HttpMethod.POST)) {
+					if (HttpPostRequestDecoder.isMultipart(httpRequest)) {
+						currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
+						currentHttpRequest = httpRequest;
+						currentUploadDir = Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString()));
+					} else {
+						ctx.fireChannelRead(msg);
+					}
 				} else {
 					ctx.fireChannelRead(msg);
 				}
+			} else if (msg instanceof HttpContent && currentHttpPostRequestDecoder != null) {
+				// make sure that we still have a upload dir in case that it got deleted in the meanwhile
+				RestServerEndpoint.createUploadDir(uploadDir, LOG);
+
+				final HttpContent httpContent = (HttpContent) msg;
+				currentHttpPostRequestDecoder.offer(httpContent);
+
+				while (currentHttpPostRequestDecoder.hasNext()) {
+					final InterfaceHttpData data = currentHttpPostRequestDecoder.next();
+					if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) {
+						final DiskFileUpload fileUpload = (DiskFileUpload) data;
+						checkState(fileUpload.isCompleted());
+
+						final Path dest = currentUploadDir.resolve(fileUpload.getFilename());
+						fileUpload.renameTo(dest.toFile());
+					} else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
+						final Attribute request = (Attribute) data;
+						// this could also be implemented by using the first found Attribute as the payload
+						if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+							currentJsonPayload = request.get();
+						} else {
+							LOG.warn("Received unknown attribute {}.", data.getName());
+							HandlerUtils.sendErrorResponse(
+								ctx,
+								currentHttpRequest,
+								new ErrorResponseBody("Received unknown attribute " + data.getName() + '.'),
+								HttpResponseStatus.BAD_REQUEST,
+								Collections.emptyMap()
+							);
+							deleteUploadedFiles();
+

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196836575
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -70,51 +84,103 @@ public FileUploadHandler(final Path uploadDir) {
 
	@Override
	protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception {
-		if (msg instanceof HttpRequest) {
-			final HttpRequest httpRequest = (HttpRequest) msg;
-			if (httpRequest.getMethod().equals(HttpMethod.POST)) {
-				if (HttpPostRequestDecoder.isMultipart(httpRequest)) {
-					currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
-					currentHttpRequest = httpRequest;
+		try {
+			if (msg instanceof HttpRequest) {
+				final HttpRequest httpRequest = (HttpRequest) msg;
+				LOG.trace("Received request. URL:{} Method:{}", httpRequest.getUri(), httpRequest.getMethod());
+				if (httpRequest.getMethod().equals(HttpMethod.POST)) {
+					if (HttpPostRequestDecoder.isMultipart(httpRequest)) {
+						currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
+						currentHttpRequest = httpRequest;
+						currentUploadDir = Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString()));
+					} else {
+						ctx.fireChannelRead(msg);
+					}
 				} else {
 					ctx.fireChannelRead(msg);
 				}
+			} else if (msg instanceof HttpContent && currentHttpPostRequestDecoder != null) {
+				// make sure that we still have a upload dir in case that it got deleted in the meanwhile
+				RestServerEndpoint.createUploadDir(uploadDir, LOG);
+
+				final HttpContent httpContent = (HttpContent) msg;
+				currentHttpPostRequestDecoder.offer(httpContent);
+
+				while (currentHttpPostRequestDecoder.hasNext()) {
+					final InterfaceHttpData data = currentHttpPostRequestDecoder.next();
+					if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) {
+						final DiskFileUpload fileUpload = (DiskFileUpload) data;
+						checkState(fileUpload.isCompleted());
+
+						final Path dest = currentUploadDir.resolve(fileUpload.getFilename());
+						fileUpload.renameTo(dest.toFile());
+					} else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
+						final Attribute request = (Attribute) data;
+						// this could also be implemented by using the first found Attribute as the payload
+						if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+							currentJsonPayload = request.get();
+						} else {
+							LOG.warn("Received unknown attribute {}.", data.getName());
+							HandlerUtils.sendErrorResponse(
+								ctx,
+								currentHttpRequest,
+								new ErrorResponseBody("Received unknown attribute " + data.getName() + '.'),
+								HttpResponseStatus.BAD_REQUEST,
+								Collections.emptyMap()
+							);
+							deleteUploadedFiles();
+							reset();
+							return;
+						}
+					}
+				}
+
+

[jira] [Updated] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9599:
--
Labels: pull-request-available  (was: )

> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518274#comment-16518274
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196827863
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+	implements InternalListState<K, N, T> {
+	TtlListState(
+		InternalListState<K, N, TtlValue<T>> originalState,
+		TtlConfig config,
+		TtlTimeProvider timeProvider,
+		TypeSerializer<List<T>> valueSerializer) {
+		super(originalState, config, timeProvider, valueSerializer);
+	}
+
+	@Override
+	public void update(List<T> values) throws Exception {
+		updateInternal(values);
+	}
+
+	@Override
+	public void addAll(List<T> values) throws Exception {
+		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+		original.addAll(withTs(values));
+	}
+
+	@Override
+	public Iterable<T> get() throws Exception {
+		Iterable<TtlValue<T>> ttlValue = original.get();
+		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+		if (updateTsOnRead) {
+			List<TtlValue<T>> collected = collect(ttlValue);
+			ttlValue = collected;
+			updateTs(collected);
+		}
+		final Iterable<TtlValue<T>> finalResult = ttlValue;
--- End diff --

Oh sorry, my bad, I misunderstood...


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
> TTL state decorator uses the original state with packed TTL and adds TTL logic
> using a time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
>     TtlValue<V> valueWithTtl = underlyingState.get();
>     // ttl logic here (e.g. update timestamp)
>     return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are applied to state produced by the normal state binder in its
> TTL wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196827863
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+	implements InternalListState<K, N, T> {
+	TtlListState(
+		InternalListState<K, N, TtlValue<T>> originalState,
+		TtlConfig config,
+		TtlTimeProvider timeProvider,
+		TypeSerializer<List<T>> valueSerializer) {
+		super(originalState, config, timeProvider, valueSerializer);
+	}
+
+	@Override
+	public void update(List<T> values) throws Exception {
+		updateInternal(values);
+	}
+
+	@Override
+	public void addAll(List<T> values) throws Exception {
+		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+		original.addAll(withTs(values));
+	}
+
+	@Override
+	public Iterable<T> get() throws Exception {
+		Iterable<TtlValue<T>> ttlValue = original.get();
+		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+		if (updateTsOnRead) {
+			List<TtlValue<T>> collected = collect(ttlValue);
+			ttlValue = collected;
+			updateTs(collected);
+		}
+		final Iterable<TtlValue<T>> finalResult = ttlValue;
--- End diff --

Oh sorry, my bad, I misunderstood...


---


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518267#comment-16518267
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196826339
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+	implements InternalListState<K, N, T> {
+	TtlListState(
+		InternalListState<K, N, TtlValue<T>> originalState,
+		TtlConfig config,
+		TtlTimeProvider timeProvider,
+		TypeSerializer<List<T>> valueSerializer) {
+		super(originalState, config, timeProvider, valueSerializer);
+	}
+
+	@Override
+	public void update(List<T> values) throws Exception {
+		updateInternal(values);
+	}
+
+	@Override
+	public void addAll(List<T> values) throws Exception {
+		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+		original.addAll(withTs(values));
+	}
+
+	@Override
+	public Iterable<T> get() throws Exception {
+		Iterable<TtlValue<T>> ttlValue = original.get();
+		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+		if (updateTsOnRead) {
+			List<TtlValue<T>> collected = collect(ttlValue);
+			ttlValue = collected;
+			updateTs(collected);
+		}
+		final Iterable<TtlValue<T>> finalResult = ttlValue;
--- End diff --

The `ttlValue` variable is reassigned before the lambda in the return statement, so it is
no longer effectively final and cannot be used inside the lambda; that is why
`finalResult` is formally needed to avoid a compilation error.
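
Editorial aside: a minimal, self-contained sketch of the effectively-final rule being described here; the class, method, and variable names are made up, only the pattern mirrors the PR:

{code:java}
import java.util.Collections;
import java.util.List;

final class EffectivelyFinalSketch {
	static Iterable<String> get(List<String> values) {
		values = values == null ? Collections.emptyList() : values; // reassignment: `values` is no longer effectively final
		// return () -> values.iterator();          // would not compile: a lambda may only capture (effectively) final locals
		final List<String> finalResult = values;    // copy into a variable that is never reassigned
		return () -> finalResult.iterator();        // OK: the lambda captures `finalResult`
	}
}
{code}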


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
> TTL state decorator uses the original state with packed TTL and adds TTL logic
> using a time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
>     TtlValue<V> valueWithTtl = underlyingState.get();
>     // ttl logic here (e.g. update timestamp)
>     return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are applied to state produced by the normal state binder in its
> TTL wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196826339
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+	implements InternalListState<K, N, T> {
+	TtlListState(
+		InternalListState<K, N, TtlValue<T>> originalState,
+		TtlConfig config,
+		TtlTimeProvider timeProvider,
+		TypeSerializer<List<T>> valueSerializer) {
+		super(originalState, config, timeProvider, valueSerializer);
+	}
+
+	@Override
+	public void update(List<T> values) throws Exception {
+		updateInternal(values);
+	}
+
+	@Override
+	public void addAll(List<T> values) throws Exception {
+		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+		original.addAll(withTs(values));
+	}
+
+	@Override
+	public Iterable<T> get() throws Exception {
+		Iterable<TtlValue<T>> ttlValue = original.get();
+		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+		if (updateTsOnRead) {
+			List<TtlValue<T>> collected = collect(ttlValue);
+			ttlValue = collected;
+			updateTs(collected);
+		}
+		final Iterable<TtlValue<T>> finalResult = ttlValue;
--- End diff --

The `ttlValue` variable is reassigned before the lambda in the return statement, so it is
no longer effectively final and cannot be used inside the lambda; that is why
`finalResult` is formally needed to avoid a compilation error.


---


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518244#comment-16518244
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196820474
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
+	final T original;
+	final TtlConfig config;
+	final TtlTimeProvider timeProvider;
+	final boolean updateTsOnRead;
+	final boolean returnExpired;
+
+	AbstractTtlDecorator(
+		T original,
+		TtlConfig config,
+		TtlTimeProvider timeProvider) {
+		Preconditions.checkNotNull(original);
+		Preconditions.checkNotNull(config);
+		Preconditions.checkNotNull(timeProvider);
+		Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
+			"State does not need to be wrapped with TTL if it is configured as disabled.");
+		this.original = original;
+		this.config = config;
+		this.timeProvider = timeProvider;
+		this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
+		this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
+	}
+
+	<V> V getUnexpried(TtlValue<V> ttlValue) {
+		return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
+	}
+
+	<V> boolean expired(TtlValue<V> ttlValue) {
+		return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
+	}
+
+	<V> TtlValue<V> wrapWithTs(V value) {
+		return wrapWithTs(value, newExpirationTimestamp());
+	}
+
+	static <V> TtlValue<V> wrapWithTs(V value, long ts) {
+		return value == null ? null : new TtlValue<>(value, ts);
+	}
+
+	<V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
+		return wrapWithTs(ttlValue.getUserValue());
+	}
+
+	private long newExpirationTimestamp() {
+		long currentTs = timeProvider.currentTimestamp();
+		long ttl = config.getTtl().toMilliseconds();
--- End diff --

This will be called quite often, so does it make sense to introduce a field
to remember `config.getTtl().toMilliseconds()`?
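
Editorial aside: a minimal sketch of the caching being suggested; the interface and class names here are stand-ins for the types in the diff, not the PR's actual code:

{code:java}
interface TimeProvider {
	long currentTimestamp();
}

abstract class CachingTtlDecoratorSketch {
	private final TimeProvider timeProvider;
	private final long ttlMillis; // computed once in the constructor instead of on every call

	CachingTtlDecoratorSketch(TimeProvider timeProvider, long ttlMillis) {
		this.timeProvider = timeProvider;
		this.ttlMillis = ttlMillis; // e.g. the result of config.getTtl().toMilliseconds()
	}

	long newExpirationTimestamp() {
		return timeProvider.currentTimestamp() + ttlMillis;
	}
}
{code}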


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
> TTL state decorator uses the original state with packed TTL and adds TTL logic
> using a time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
>     TtlValue<V> valueWithTtl = underlyingState.get();
>     // ttl logic here (e.g. update timestamp)
>     return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are applied to state produced by the normal state

[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196820474
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
+	final T original;
+	final TtlConfig config;
+	final TtlTimeProvider timeProvider;
+	final boolean updateTsOnRead;
+	final boolean returnExpired;
+
+	AbstractTtlDecorator(
+		T original,
+		TtlConfig config,
+		TtlTimeProvider timeProvider) {
+		Preconditions.checkNotNull(original);
+		Preconditions.checkNotNull(config);
+		Preconditions.checkNotNull(timeProvider);
+		Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
+			"State does not need to be wrapped with TTL if it is configured as disabled.");
+		this.original = original;
+		this.config = config;
+		this.timeProvider = timeProvider;
+		this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
+		this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
+	}
+
+	<V> V getUnexpried(TtlValue<V> ttlValue) {
+		return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
+	}
+
+	<V> boolean expired(TtlValue<V> ttlValue) {
+		return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
+	}
+
+	<V> TtlValue<V> wrapWithTs(V value) {
+		return wrapWithTs(value, newExpirationTimestamp());
+	}
+
+	static <V> TtlValue<V> wrapWithTs(V value, long ts) {
+		return value == null ? null : new TtlValue<>(value, ts);
+	}
+
+	<V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
+		return wrapWithTs(ttlValue.getUserValue());
+	}
+
+	private long newExpirationTimestamp() {
+		long currentTs = timeProvider.currentTimestamp();
+		long ttl = config.getTtl().toMilliseconds();
--- End diff --

This will be called quite often, so does it make sense to introduce a field
to remember `config.getTtl().toMilliseconds()`?


---


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518243#comment-16518243
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196809755
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
+	final T original;
+	final TtlConfig config;
+	final TtlTimeProvider timeProvider;
+	final boolean updateTsOnRead;
+	final boolean returnExpired;
+
+	AbstractTtlDecorator(
+		T original,
+		TtlConfig config,
+		TtlTimeProvider timeProvider) {
+		Preconditions.checkNotNull(original);
+		Preconditions.checkNotNull(config);
+		Preconditions.checkNotNull(timeProvider);
+		Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
+			"State does not need to be wrapped with TTL if it is configured as disabled.");
+		this.original = original;
+		this.config = config;
+		this.timeProvider = timeProvider;
+		this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
+		this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
+	}
+
+	<V> V getUnexpried(TtlValue<V> ttlValue) {
+		return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
+	}
+
+	<V> boolean expired(TtlValue<V> ttlValue) {
+		return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
+	}
+
+	<V> TtlValue<V> wrapWithTs(V value) {
+		return wrapWithTs(value, newExpirationTimestamp());
+	}
+
+	static <V> TtlValue<V> wrapWithTs(V value, long ts) {
+		return value == null ? null : new TtlValue<>(value, ts);
+	}
+
+	<V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
+		return wrapWithTs(ttlValue.getUserValue());
+	}
+
+	private long newExpirationTimestamp() {
+		return timeProvider.currentTimestamp() + config.getTtl().toMilliseconds();
--- End diff --

This will be called quite often, so does it make sense to introduce a field
to remember `config.getTtl().toMilliseconds()`?


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
> TTL state decorator uses the original state with packed TTL and adds TTL logic
> using a time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
>     TtlValue<V> valueWithTtl = underlyingState.get();
>     // ttl logic here (e.g. update timestamp)
>     return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are applied to state produced by the normal state binder in its
> TTL wrapper from 

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518242#comment-16518242
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196816259
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+	implements InternalListState<K, N, T> {
+	TtlListState(
+		InternalListState<K, N, TtlValue<T>> originalState,
+		TtlConfig config,
+		TtlTimeProvider timeProvider,
+		TypeSerializer<List<T>> valueSerializer) {
+		super(originalState, config, timeProvider, valueSerializer);
+	}
+
+	@Override
+	public void update(List<T> values) throws Exception {
+		updateInternal(values);
+	}
+
+	@Override
+	public void addAll(List<T> values) throws Exception {
+		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+		original.addAll(withTs(values));
+	}
+
+	@Override
+	public Iterable<T> get() throws Exception {
+		Iterable<TtlValue<T>> ttlValue = original.get();
+		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+		if (updateTsOnRead) {
+			List<TtlValue<T>> collected = collect(ttlValue);
+			ttlValue = collected;
+			updateTs(collected);
+		}
+		final Iterable<TtlValue<T>> finalResult = ttlValue;
+		return () -> new IteratorWithCleanup(finalResult.iterator());
+	}
+
+	private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+		List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+			.filter(v -> !expired(v))
+			.map(this::rewrapWithNewTs)
+			.collect(Collectors.toList());
+		if (!unexpiredWithUpdatedTs.isEmpty()) {
+			original.update(unexpiredWithUpdatedTs);
+		}
+	}
+
+	@Override
+	public void add(T value) throws Exception {
+		Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+		original.add(wrapWithTs(value));
+	}
+
+	@Override
+	public void clear() {
+		original.clear();
+	}
+
+	@Override
+	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+		original.mergeNamespaces(target, sources);
+	}
+
+	@Override
+	public List<T> getInternal() throws Exception {
+		return collect(get());
+	}
+
+	private <E> List<E> collect(Iterable<E> iterable) {
+		return StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
+	}
+
+	@Override
+	public void updateInternal(List<T> valueToStore) throws Exception {
+		Preconditions.checkNotNull(valueToStore, "List of values to update cannot be null.");
+		original.addAll(withTs(valueToStore));

[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518241#comment-16518241
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196817846
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+	implements InternalListState<K, N, T> {
+	TtlListState(
+		InternalListState<K, N, TtlValue<T>> originalState,
+		TtlConfig config,
+		TtlTimeProvider timeProvider,
+		TypeSerializer<List<T>> valueSerializer) {
+		super(originalState, config, timeProvider, valueSerializer);
+	}
+
+	@Override
+	public void update(List<T> values) throws Exception {
+		updateInternal(values);
+	}
+
+	@Override
+	public void addAll(List<T> values) throws Exception {
+		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+		original.addAll(withTs(values));
+	}
+
+	@Override
+	public Iterable<T> get() throws Exception {
+		Iterable<TtlValue<T>> ttlValue = original.get();
+		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+		if (updateTsOnRead) {
+			List<TtlValue<T>> collected = collect(ttlValue);
+			ttlValue = collected;
+			updateTs(collected);
+		}
+		final Iterable<TtlValue<T>> finalResult = ttlValue;
--- End diff --

The var `finalResult` looks redundant, or am I misunderstanding something?


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
> TTL state decorator uses the original state with packed TTL and adds TTL logic
> using a time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
>     TtlValue<V> valueWithTtl = underlyingState.get();
>     // ttl logic here (e.g. update timestamp)
>     return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are applied to state produced by the normal state binder in its
> TTL wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196809755
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
+	final T original;
+	final TtlConfig config;
+	final TtlTimeProvider timeProvider;
+	final boolean updateTsOnRead;
+	final boolean returnExpired;
+
+	AbstractTtlDecorator(
+		T original,
+		TtlConfig config,
+		TtlTimeProvider timeProvider) {
+		Preconditions.checkNotNull(original);
+		Preconditions.checkNotNull(config);
+		Preconditions.checkNotNull(timeProvider);
+		Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
+			"State does not need to be wrapped with TTL if it is configured as disabled.");
+		this.original = original;
+		this.config = config;
+		this.timeProvider = timeProvider;
+		this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
+		this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
+	}
+
+	<V> V getUnexpried(TtlValue<V> ttlValue) {
+		return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
+	}
+
+	<V> boolean expired(TtlValue<V> ttlValue) {
+		return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
+	}
+
+	<V> TtlValue<V> wrapWithTs(V value) {
+		return wrapWithTs(value, newExpirationTimestamp());
+	}
+
+	static <V> TtlValue<V> wrapWithTs(V value, long ts) {
+		return value == null ? null : new TtlValue<>(value, ts);
+	}
+
+	<V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
+		return wrapWithTs(ttlValue.getUserValue());
+	}
+
+	private long newExpirationTimestamp() {
+		return timeProvider.currentTimestamp() + config.getTtl().toMilliseconds();
--- End diff --

This will be called quite often, so does it make sense to introduce a field
to remember `config.getTtl().toMilliseconds()`?


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196817846
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+	implements InternalListState<K, N, T> {
+	TtlListState(
+		InternalListState<K, N, TtlValue<T>> originalState,
+		TtlConfig config,
+		TtlTimeProvider timeProvider,
+		TypeSerializer<List<T>> valueSerializer) {
+		super(originalState, config, timeProvider, valueSerializer);
+	}
+
+	@Override
+	public void update(List<T> values) throws Exception {
+		updateInternal(values);
+	}
+
+	@Override
+	public void addAll(List<T> values) throws Exception {
+		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+		original.addAll(withTs(values));
+	}
+
+	@Override
+	public Iterable<T> get() throws Exception {
+		Iterable<TtlValue<T>> ttlValue = original.get();
+		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+		if (updateTsOnRead) {
+			List<TtlValue<T>> collected = collect(ttlValue);
+			ttlValue = collected;
+			updateTs(collected);
+		}
+		final Iterable<TtlValue<T>> finalResult = ttlValue;
--- End diff --

The var `finalResult` looks redundant, or am I misunderstanding something?


---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196816259
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
 ---
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * This class wraps list state with TTL logic.
+ *
+ * @param <K> The type of key the state is associated to
+ * @param <N> The type of the namespace
+ * @param <T> Type of the user entry value of state with TTL
+ */
+class TtlListState<K, N, T> extends
+	AbstractTtlState<K, N, List<T>, List<TtlValue<T>>, InternalListState<K, N, TtlValue<T>>>
+	implements InternalListState<K, N, T> {
+	TtlListState(
+		InternalListState<K, N, TtlValue<T>> originalState,
+		TtlConfig config,
+		TtlTimeProvider timeProvider,
+		TypeSerializer<List<T>> valueSerializer) {
+		super(originalState, config, timeProvider, valueSerializer);
+	}
+
+	@Override
+	public void update(List<T> values) throws Exception {
+		updateInternal(values);
+	}
+
+	@Override
+	public void addAll(List<T> values) throws Exception {
+		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
+		original.addAll(withTs(values));
+	}
+
+	@Override
+	public Iterable<T> get() throws Exception {
+		Iterable<TtlValue<T>> ttlValue = original.get();
+		ttlValue = ttlValue == null ? Collections.emptyList() : ttlValue;
+		if (updateTsOnRead) {
+			List<TtlValue<T>> collected = collect(ttlValue);
+			ttlValue = collected;
+			updateTs(collected);
+		}
+		final Iterable<TtlValue<T>> finalResult = ttlValue;
+		return () -> new IteratorWithCleanup(finalResult.iterator());
+	}
+
+	private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
+		List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
+			.filter(v -> !expired(v))
+			.map(this::rewrapWithNewTs)
+			.collect(Collectors.toList());
+		if (!unexpiredWithUpdatedTs.isEmpty()) {
+			original.update(unexpiredWithUpdatedTs);
+		}
+	}
+
+	@Override
+	public void add(T value) throws Exception {
+		Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
+		original.add(wrapWithTs(value));
+	}
+
+	@Override
+	public void clear() {
+		original.clear();
+	}
+
+	@Override
+	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
+		original.mergeNamespaces(target, sources);
+	}
+
+	@Override
+	public List<T> getInternal() throws Exception {
+		return collect(get());
+	}
+
+	private <E> List<E> collect(Iterable<E> iterable) {
+		return StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
+	}
+
+	@Override
+	public void updateInternal(List<T> valueToStore) throws Exception {
+		Preconditions.checkNotNull(valueToStore, "List of values to update cannot be null.");
+		original.addAll(withTs(valueToStore));
--- End diff --

This seems to miss a `clear()`.
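
A minimal sketch of the fix the reviewer is suggesting, assuming `update` semantics should replace rather than append to the stored list (hypothetical, not the merged code):

{code:java}
// Hypothetical fix in TtlListState: make update() replace, not append.
@Override
public void updateInternal(List<T> valueToStore) throws Exception {
    Preconditions.checkNotNull(valueToStore, "List of values to update cannot be null.");
    original.clear();                       // drop the previous list contents first...
    original.addAll(withTs(valueToStore));  // ...then store the new values
}
{code}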


---


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518240#comment-16518240
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196819320
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+
+   AbstractTtlDecorator(
+       T original,
+       TtlConfig config,
+       TtlTimeProvider timeProvider) {
+       Preconditions.checkNotNull(original);
+       Preconditions.checkNotNull(config);
+       Preconditions.checkNotNull(timeProvider);
+       Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
+           "State does not need to be wrapped with TTL if it is configured as disabled.");
+       this.original = original;
+       this.config = config;
+       this.timeProvider = timeProvider;
+       this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
+       this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
+   }
+
+   <V> V getUnexpried(TtlValue<V> ttlValue) {
+       return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+   <V> boolean expired(TtlValue<V> ttlValue) {
+       return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
--- End diff --

In general, Flink allows operating in the negative range for event time, but 
the overflow in case of a very big TTL should be checked. TTL only makes sense 
when it is non-negative. I have added a fix for it.
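
A minimal sketch of such an overflow check, assuming the expiration timestamp is computed as current time plus TTL; the actual fix in the PR may differ:

{code:java}
// Illustrative overflow guard, assuming expiration = currentTimestamp + ttl (both in ms).
static long expirationTimestamp(long currentTs, long ttlMillis) {
    long sum = currentTs + ttlMillis;
    // Both operands non-negative but the sum wrapped negative -> overflow: clamp to "never expires".
    return (currentTs >= 0 && ttlMillis >= 0 && sum < 0) ? Long.MAX_VALUE : sum;
}
{code}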


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue<V> valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are applied to the state produced by the normal state binder 
> in its TTL wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread azagrebin
Github user azagrebin commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196819320
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+
+   AbstractTtlDecorator(
+       T original,
+       TtlConfig config,
+       TtlTimeProvider timeProvider) {
+       Preconditions.checkNotNull(original);
+       Preconditions.checkNotNull(config);
+       Preconditions.checkNotNull(timeProvider);
+       Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
+           "State does not need to be wrapped with TTL if it is configured as disabled.");
+       this.original = original;
+       this.config = config;
+       this.timeProvider = timeProvider;
+       this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
+       this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
+   }
+
+   <V> V getUnexpried(TtlValue<V> ttlValue) {
+       return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+   <V> boolean expired(TtlValue<V> ttlValue) {
+       return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
--- End diff --

In general, Flink allows operating in the negative range for event time, but 
the overflow in case of a very big TTL should be checked. TTL only makes sense 
when it is non-negative. I have added a fix for it.


---


[jira] [Updated] (FLINK-9598) [Checkpoints] The config Minimum Pause Between Checkpoints doesn't work when there's a checkpoint failure

2018-06-20 Thread Prem Santosh (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prem Santosh updated FLINK-9598:

Description: 
We have set the config Minimum Pause Between Checkpoints to be 10 min, but 
noticed that when a checkpoint fails (because it times out before it completes) 
the application immediately starts taking the next checkpoint. This basically 
stalls the application's progress since it's always taking checkpoints.

[Here|^Screen Shot 2018-06-20 at 7.44.10 AM.png] is a screenshot of this issue. 
(Sorry about the imgur link, I was not able to attach a screenshot on Jira 
because of some error)

Details:
 * Running Flink-1.3.2 on EMR
 * checkpoint timeout duration: 40 min
 * minimum pause between checkpoints: 10 min

There is also a [relevant 
thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Having-a-backoff-while-experiencing-checkpointing-failures-td20618.html]
 that I found on the Flink users group.

 

  was:
We have set the config Minimum Pause Between Checkpoints to be 10 min, but 
noticed that when a checkpoint fails (because it times out before it completes) 
the application immediately starts taking the next checkpoint. This basically 
stalls the application's progress since it's always taking checkpoints.

[#Here] is a screenshot of this issue. (Sorry about the imgur link, I was not 
able to attach a screenshot on Jira because of some error)

Details:
 * Running Flink-1.3.2 on EMR
 * checkpoint timeout duration: 40 min
 * minimum pause between checkpoints: 10 min

There is also a [relevant 
thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Having-a-backoff-while-experiencing-checkpointing-failures-td20618.html]
 that I found on the Flink users group.

 


> [Checkpoints] The config Minimum Pause Between Checkpoints doesn't work when 
> there's a checkpoint failure
> -
>
> Key: FLINK-9598
> URL: https://issues.apache.org/jira/browse/FLINK-9598
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.2
>Reporter: Prem Santosh
>Assignee: vinoyang
>Priority: Major
> Attachments: Screen Shot 2018-06-20 at 7.44.10 AM.png
>
>
> We have set the config Minimum Pause Between Checkpoints to be 10 min, but 
> noticed that when a checkpoint fails (because it times out before it 
> completes) the application immediately starts taking the next checkpoint. 
> This basically stalls the application's progress since it's always taking 
> checkpoints.
> [Here|^Screen Shot 2018-06-20 at 7.44.10 AM.png] is a screenshot of this 
> issue. (Sorry about the imgur link, I was not able to attach a screenshot on 
> Jira because of some error)
> Details:
>  * Running Flink-1.3.2 on EMR
>  * checkpoint timeout duration: 40 min
>  * minimum pause between checkpoints: 10 min
> There is also a [relevant 
> thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Having-a-backoff-while-experiencing-checkpointing-failures-td20618.html]
>  that I found on the Flink users group.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
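
For reference, both settings involved here are exposed via `CheckpointConfig`; a minimal sketch of the reporter's setup, with an illustrative checkpoint interval:

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointPauseSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L); // checkpoint interval; illustrative value, not from the ticket
        // checkpoint timeout duration: 40 min, as in the ticket
        env.getCheckpointConfig().setCheckpointTimeout(40 * 60 * 1000L);
        // minimum pause between checkpoints: 10 min, as in the ticket
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10 * 60 * 1000L);
    }
}
{code}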


[jira] [Updated] (FLINK-9598) [Checkpoints] The config Minimum Pause Between Checkpoints doesn't work when there's a checkpoint failure

2018-06-20 Thread Prem Santosh (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prem Santosh updated FLINK-9598:

Description: 
We have set the config Minimum Pause Between Checkpoints to be 10 min, but 
noticed that when a checkpoint fails (because it times out before it completes) 
the application immediately starts taking the next checkpoint. This basically 
stalls the application's progress since it's always taking checkpoints.

[^Screen Shot 2018-06-20 at 7.44.10 AM.png] is a screenshot of this issue.

Details:
 * Running Flink-1.3.2 on EMR
 * checkpoint timeout duration: 40 min
 * minimum pause between checkpoints: 10 min

There is also a [relevant 
thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Having-a-backoff-while-experiencing-checkpointing-failures-td20618.html]
 that I found on the Flink users group.

 

  was:
We have set the config Minimum Pause Between Checkpoints to be 10 min, but 
noticed that when a checkpoint fails (because it times out before it completes) 
the application immediately starts taking the next checkpoint. This basically 
stalls the application's progress since it's always taking checkpoints.

[Here|^Screen Shot 2018-06-20 at 7.44.10 AM.png] is a screenshot of this issue. 
(Sorry about the imgur link, I was not able to attach a screenshot on Jira 
because of some error)

Details:
 * Running Flink-1.3.2 on EMR
 * checkpoint timeout duration: 40 min
 * minimum pause between checkpoints: 10 min

There is also a [relevant 
thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Having-a-backoff-while-experiencing-checkpointing-failures-td20618.html]
 that I found on the Flink users group.

 


> [Checkpoints] The config Minimum Pause Between Checkpoints doesn't work when 
> there's a checkpoint failure
> -
>
> Key: FLINK-9598
> URL: https://issues.apache.org/jira/browse/FLINK-9598
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.2
>Reporter: Prem Santosh
>Assignee: vinoyang
>Priority: Major
> Attachments: Screen Shot 2018-06-20 at 7.44.10 AM.png
>
>
> We have set the config Minimum Pause Between Checkpoints to be 10 min, but 
> noticed that when a checkpoint fails (because it times out before it 
> completes) the application immediately starts taking the next checkpoint. 
> This basically stalls the application's progress since it's always taking 
> checkpoints.
> [^Screen Shot 2018-06-20 at 7.44.10 AM.png] is a screenshot of this issue.
> Details:
>  * Running Flink-1.3.2 on EMR
>  * checkpoint timeout duration: 40 min
>  * minimum pause between checkpoints: 10 min
> There is also a [relevant 
> thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Having-a-backoff-while-experiencing-checkpointing-failures-td20618.html]
>  that I found on the Flink users group.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9598) [Checkpoints] The config Minimum Pause Between Checkpoints doesn't work when there's a checkpoint failure

2018-06-20 Thread Prem Santosh (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prem Santosh updated FLINK-9598:

Description: 
We have set the config Minimum Pause Between Checkpoints to be 10 min, but 
noticed that when a checkpoint fails (because it times out before it completes) 
the application immediately starts taking the next checkpoint. This basically 
stalls the application's progress since it's always taking checkpoints.

[#Here] is a screenshot of this issue. (Sorry about the imgur link, I was not 
able to attach a screenshot on Jira because of some error)

Details:
 * Running Flink-1.3.2 on EMR
 * checkpoint timeout duration: 40 min
 * minimum pause between checkpoints: 10 min

There is also a [relevant 
thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Having-a-backoff-while-experiencing-checkpointing-failures-td20618.html]
 that I found on the Flink users group.

 

  was:
We have set the config Minimum Pause Between Checkpoints to be 10 min, but 
noticed that when a checkpoint fails (because it times out before it completes) 
the application immediately starts taking the next checkpoint. This basically 
stalls the application's progress since it's always taking checkpoints.

Here i[s a screenshot of |https://imgur.com/a/z7NMY4H]this issue. (Sorry about 
the imgur link, I was not able to attach a screenshot on Jira because of some 
error)

Details:
 * Running Flink-1.3.2 on EMR
 * checkpoint timeout duration: 40 min
 * minimum pause between checkpoints: 10 min

There is also a [relevant 
thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Having-a-backoff-while-experiencing-checkpointing-failures-td20618.html]
 that I found on the Flink users group.

 


> [Checkpoints] The config Minimum Pause Between Checkpoints doesn't work when 
> there's a checkpoint failure
> -
>
> Key: FLINK-9598
> URL: https://issues.apache.org/jira/browse/FLINK-9598
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.2
>Reporter: Prem Santosh
>Assignee: vinoyang
>Priority: Major
> Attachments: Screen Shot 2018-06-20 at 7.44.10 AM.png
>
>
> We have set the config Minimum Pause Between Checkpoints to be 10 min, but 
> noticed that when a checkpoint fails (because it times out before it 
> completes) the application immediately starts taking the next checkpoint. 
> This basically stalls the application's progress since it's always taking 
> checkpoints.
> [#Here] is a screenshot of this issue. (Sorry about the imgur link, I was not 
> able to attach a screenshot on Jira because of some error)
> Details:
>  * Running Flink-1.3.2 on EMR
>  * checkpoint timeout duration: 40 min
>  * minimum pause between checkpoints: 10 min
> There is also a [relevant 
> thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Having-a-backoff-while-experiencing-checkpointing-failures-td20618.html]
>  that I found on the Flink users group.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9598) [Checkpoints] The config Minimum Pause Between Checkpoints doesn't work when there's a checkpoint failure

2018-06-20 Thread Prem Santosh (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Prem Santosh updated FLINK-9598:

Attachment: Screen Shot 2018-06-20 at 7.44.10 AM.png

> [Checkpoints] The config Minimum Pause Between Checkpoints doesn't work when 
> there's a checkpoint failure
> -
>
> Key: FLINK-9598
> URL: https://issues.apache.org/jira/browse/FLINK-9598
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.2
>Reporter: Prem Santosh
>Assignee: vinoyang
>Priority: Major
> Attachments: Screen Shot 2018-06-20 at 7.44.10 AM.png
>
>
> We have set the config Minimum Pause Between Checkpoints to be 10 min, but 
> noticed that when a checkpoint fails (because it times out before it 
> completes) the application immediately starts taking the next checkpoint. 
> This basically stalls the application's progress since it's always taking 
> checkpoints.
> Here i[s a screenshot of |https://imgur.com/a/z7NMY4H]this issue. (Sorry 
> about the imgur link, I was not able to attach a screenshot on Jira because 
> of some error)
> Details:
>  * Running Flink-1.3.2 on EMR
>  * checkpoint timeout duration: 40 min
>  * minimum pause between checkpoints: 10 min
> There is also a [relevant 
> thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Having-a-backoff-while-experiencing-checkpointing-failures-td20618.html]
>  that I found on the Flink users group.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518200#comment-16518200
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196791555
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java 
---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Configuration of state TTL logic.
+ * TODO: builder
+ */
+public class TtlConfig {
+   private final TtlUpdateType ttlUpdateType;
+   private final TtlStateVisibility stateVisibility;
+   private final TtlTimeCharacteristic timeCharacteristic;
+   private final Time ttl;
+
+   public TtlConfig(
+   TtlUpdateType ttlUpdateType,
+   TtlStateVisibility stateVisibility,
+   TtlTimeCharacteristic timeCharacteristic,
+   Time ttl) {
+   Preconditions.checkNotNull(ttlUpdateType);
+   Preconditions.checkNotNull(stateVisibility);
+   Preconditions.checkNotNull(timeCharacteristic);
--- End diff --

Why not also check `ttl` for null?
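
A minimal sketch of the checks the reviewer is asking about (hypothetical, not the merged code):

{code:java}
// Hypothetical additional checks in the TtlConfig constructor:
Preconditions.checkNotNull(ttl);
Preconditions.checkArgument(ttl.toMilliseconds() > 0,
    "TTL is expected to be a positive duration.");
{code}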


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue<V> valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are applied to the state produced by the normal state binder 
> in its TTL wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196791555
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlConfig.java 
---
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Configuration of state TTL logic.
+ * TODO: builder
+ */
+public class TtlConfig {
+   private final TtlUpdateType ttlUpdateType;
+   private final TtlStateVisibility stateVisibility;
+   private final TtlTimeCharacteristic timeCharacteristic;
+   private final Time ttl;
+
+   public TtlConfig(
+   TtlUpdateType ttlUpdateType,
+   TtlStateVisibility stateVisibility,
+   TtlTimeCharacteristic timeCharacteristic,
+   Time ttl) {
+   Preconditions.checkNotNull(ttlUpdateType);
+   Preconditions.checkNotNull(stateVisibility);
+   Preconditions.checkNotNull(timeCharacteristic);
--- End diff --

Why not also check `ttl` for null?


---


[jira] [Commented] (FLINK-9604) Support KafkaProtoBufTableSource

2018-06-20 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518197#comment-16518197
 ] 

Timo Walther commented on FLINK-9604:
-

Thanks for working on this issue [~mingleizhang]. In the future we want to 
separate connector and format, such that a Protobuf serialization schema can 
also be used for other connectors, not only Kafka. Feel free to open a PR for 
serialization/deserialization schemas that convert to Table API types, similar 
to what was done in FLINK-8630.

> Support KafkaProtoBufTableSource
> 
>
> Key: FLINK-9604
> URL: https://issues.apache.org/jira/browse/FLINK-9604
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: mingleizhang
>Assignee: mingleizhang
>Priority: Major
>
> Protocol Buffers is a language-neutral, platform-neutral, extensible 
> mechanism for serializing structured data. In actual production 
> applications, Protocol Buffers is often used for serialization and 
> deserialization, so I would suggest adding support for this commonly used 
> format.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9628) Options to tolerate truncate failure in BucketingSink

2018-06-20 Thread Truong Duc Kien (JIRA)
Truong Duc Kien created FLINK-9628:
--

 Summary: Options to tolerate truncate failure in BucketingSink
 Key: FLINK-9628
 URL: https://issues.apache.org/jira/browse/FLINK-9628
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.5.0
Reporter: Truong Duc Kien


When the target filesystem is corrupted, the truncate operation might fail 
permanently, causing the job to restart repeatedly without making progress. 

There should be an option to ignore these kinds of errors and allow the program 
to continue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518183#comment-16518183
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r196793648
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -162,13 +205,25 @@ public void shutdown(Time timeout) {
ByteBuf payload = 
Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
 
// create request and set headers
-   FullHttpRequest httpRequest = new 
DefaultFullHttpRequest(HttpVersion.HTTP_1_1, 
messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
+   final FullHttpRequest httpRequest;
+   if (multipart) {
+   httpRequest = new 
DefaultFullHttpRequest(HttpVersion.HTTP_1_1, 
messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl);
+   } else {
+   httpRequest = new 
DefaultFullHttpRequest(HttpVersion.HTTP_1_1, 
messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
+   }
+
httpRequest.headers()
-   .add(HttpHeaders.Names.CONTENT_LENGTH, 
payload.capacity())
-   .add(HttpHeaders.Names.CONTENT_TYPE, 
RestConstants.REST_CONTENT_TYPE)
.set(HttpHeaders.Names.HOST, targetAddress + ':' + 
targetPort)
.set(HttpHeaders.Names.CONNECTION, 
HttpHeaders.Values.CLOSE);
 
+   if (!multipart) {
+   httpRequest.headers()
--- End diff --

wasn't sure whether this should also be done by the `RequestProcessor`


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518182#comment-16518182
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r196793605
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -162,13 +205,25 @@ public void shutdown(Time timeout) {
ByteBuf payload = 
Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
 
// create request and set headers
-   FullHttpRequest httpRequest = new 
DefaultFullHttpRequest(HttpVersion.HTTP_1_1, 
messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
+   final FullHttpRequest httpRequest;
+   if (multipart) {
--- End diff --

wasn't sure whether this should also be done by the `RequestProcessor`


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6189: [FLINK-9599][rest] RestClient supports FileUploads...

2018-06-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r196793605
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -162,13 +205,25 @@ public void shutdown(Time timeout) {
ByteBuf payload = 
Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
 
// create request and set headers
-   FullHttpRequest httpRequest = new 
DefaultFullHttpRequest(HttpVersion.HTTP_1_1, 
messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
+   final FullHttpRequest httpRequest;
+   if (multipart) {
--- End diff --

wasn't sure whether this should also be done by the `RequestProcessor`


---


[GitHub] flink pull request #6189: [FLINK-9599][rest] RestClient supports FileUploads...

2018-06-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r196793648
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -162,13 +205,25 @@ public void shutdown(Time timeout) {
ByteBuf payload = 
Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
 
// create request and set headers
-   FullHttpRequest httpRequest = new 
DefaultFullHttpRequest(HttpVersion.HTTP_1_1, 
messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
+   final FullHttpRequest httpRequest;
+   if (multipart) {
+   httpRequest = new 
DefaultFullHttpRequest(HttpVersion.HTTP_1_1, 
messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl);
+   } else {
+   httpRequest = new 
DefaultFullHttpRequest(HttpVersion.HTTP_1_1, 
messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
+   }
+
httpRequest.headers()
-   .add(HttpHeaders.Names.CONTENT_LENGTH, 
payload.capacity())
-   .add(HttpHeaders.Names.CONTENT_TYPE, 
RestConstants.REST_CONTENT_TYPE)
.set(HttpHeaders.Names.HOST, targetAddress + ':' + 
targetPort)
.set(HttpHeaders.Names.CONNECTION, 
HttpHeaders.Values.CLOSE);
 
+   if (!multipart) {
+   httpRequest.headers()
--- End diff --

wasn't sure whether this should also be done by the `RequestProcessor`


---


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518180#comment-16518180
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6189

 [FLINK-9599][rest] RestClient supports FileUploads 

This PR is based on a squashed #6178.

## What is the purpose of the change

This PR extends the `RestClient` to allow sending multipart messages 
containing files and an optional json payload.

@tillrohrmann Regarding the previously discussed issue about 
`EMPTY_LAST_HTTP_CONTENT`, you can reproduce the issue by reverting the change 
to the `FileUploadHandler` and running the `RestClientMultipartTest`.

## Brief change log

* rework `FileUploadHandlerTest` into an abstract base class, to re-use 
classes for the `RestClient`
* add `RequestProcess` interface for hiding differences between 
non-/multipart messages
* refactor `RestClient#sendRequest` into `internalSendRequest` that allows 
passing a `RequestProcessor`


## Verifying this change

* see `RestClientMultipartTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9280_gamma

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6189.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6189


commit 97d7eaf2adbc8174025fafd32b3424f05bd27da1
Author: zentol 
Date:   2018-06-18T08:54:42Z

[FLINK-9599][rest] Implement generic mechanism to access uploaded files

commit 0ea85f1658c3ceca09a481e27392ed39b955d8bb
Author: zentol 
Date:   2018-06-19T07:45:09Z

[FLINK-9599][rest] RestClient supports FileUploads




> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6189: [FLINK-9599][rest] RestClient supports FileUploads...

2018-06-20 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6189

 [FLINK-9599][rest] RestClient supports FileUploads 

This PR is based on a squashed #6178.

## What is the purpose of the change

This PR extends the `RestClient` to allow sending multipart messages 
containing files and an optional json payload.

@tillrohrmann Regarding the previously discussed issue about 
`EMPTY_LAST_HTTP_CONTENT`, you can reproduce the issue by reverting the change 
to the `FileUploadHandler` and running the `RestClientMultipartTest`.

## Brief change log

* rework `FileUploadHandlerTest` into an abstract base class, to re-use 
classes for the `RestClient`
* add `RequestProcess` interface for hiding differences between 
non-/multipart messages
* refactor `RestClient#sendRequest` into `internalSendRequest` that allows 
passing a `RequestProcessor`


## Verifying this change

* see `RestClientMultipartTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9280_gamma

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6189.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6189


commit 97d7eaf2adbc8174025fafd32b3424f05bd27da1
Author: zentol 
Date:   2018-06-18T08:54:42Z

[FLINK-9599][rest] Implement generic mechanism to access uploaded files

commit 0ea85f1658c3ceca09a481e27392ed39b955d8bb
Author: zentol 
Date:   2018-06-19T07:45:09Z

[FLINK-9599][rest] RestClient supports FileUploads




---


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196786370
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+
+   AbstractTtlDecorator(
+       T original,
+       TtlConfig config,
+       TtlTimeProvider timeProvider) {
+       Preconditions.checkNotNull(original);
+       Preconditions.checkNotNull(config);
+       Preconditions.checkNotNull(timeProvider);
+       Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
+           "State does not need to be wrapped with TTL if it is configured as disabled.");
+       this.original = original;
+       this.config = config;
+       this.timeProvider = timeProvider;
+       this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
+       this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
+   }
+
+   <V> V getUnexpried(TtlValue<V> ttlValue) {
+       return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+   <V> boolean expired(TtlValue<V> ttlValue) {
+       return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
--- End diff --

Does it make sense to never expire the value when 
`ttlValue.getExpirationTimestamp()` returns a negative value?


---


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518155#comment-16518155
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196786370
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+
+   AbstractTtlDecorator(
+       T original,
+       TtlConfig config,
+       TtlTimeProvider timeProvider) {
+       Preconditions.checkNotNull(original);
+       Preconditions.checkNotNull(config);
+       Preconditions.checkNotNull(timeProvider);
+       Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
+           "State does not need to be wrapped with TTL if it is configured as disabled.");
+       this.original = original;
+       this.config = config;
+       this.timeProvider = timeProvider;
+       this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
+       this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
+   }
+
+   <V> V getUnexpried(TtlValue<V> ttlValue) {
+       return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+   <V> boolean expired(TtlValue<V> ttlValue) {
+       return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
--- End diff --

Does it make sense to never expire the value when 
`ttlValue.getExpirationTimestamp()` returns a negative value?


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue<V> valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are applied to the state produced by the normal state binder 
> in its TTL wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9514) Create wrapper with TTL logic for value state

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518152#comment-16518152
 ] 

ASF GitHub Bot commented on FLINK-9514:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196784275
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+
+   AbstractTtlDecorator(
+       T original,
+       TtlConfig config,
+       TtlTimeProvider timeProvider) {
+       Preconditions.checkNotNull(original);
+       Preconditions.checkNotNull(config);
+       Preconditions.checkNotNull(timeProvider);
+       Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
+           "State does not need to be wrapped with TTL if it is configured as disabled.");
+       this.original = original;
+       this.config = config;
+       this.timeProvider = timeProvider;
+       this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
+       this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
+   }
+
+   <V> V getUnexpried(TtlValue<V> ttlValue) {
+       return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+   <V> boolean expired(TtlValue<V> ttlValue) {
+       return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
--- End diff --

This looks a bit problematic, because `ttlValue.getExpirationTimestamp()` 
might be negative. E.g. when the user provides `Long.MAX_VALUE` as the TTL 
value, the expectation is that the value should never expire, but according to 
the current code, it will expire immediately.
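
A small worked example of the overflow being described, assuming the expiration timestamp is computed as `currentTimestamp + ttl`:

{code:java}
public class TtlOverflowDemo {
    public static void main(String[] args) {
        long now = System.currentTimeMillis();  // some positive wall-clock timestamp
        long ttl = Long.MAX_VALUE;              // user intent: "never expire"
        long expiration = now + ttl;            // long addition overflows to a negative value
        System.out.println(expiration <= now);  // prints "true": the value counts as already expired
    }
}
{code}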


> Create wrapper with TTL logic for value state
> -
>
> Key: FLINK-9514
> URL: https://issues.apache.org/jira/browse/FLINK-9514
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
> TTL state decorator uses original state with packed TTL and add TTL logic 
> using time provider:
> {code:java}
> TtlValueState<V> implements ValueState<V> {
>   ValueState<TtlValue<V>> underlyingState;
>   InternalTimeService timeProvider;
>   V value() {
> TtlValue<V> valueWithTtl = underlyingState.get();
> // ttl logic here (e.g. update timestamp)
> return valueWithTtl.getValue();
>   }
>   void update() { ... underlyingState.update(valueWithTtl) ...  }
> }
> {code}
> TTL decorators are applied to the state produced by the normal state binder 
> in its TTL wrapper from FLINK-9513



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6186: [FLINK-9514,FLINK-9515,FLINK-9516] State ttl wrapp...

2018-06-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6186#discussion_r196784275
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+/**
+ * Base class for TTL logic wrappers.
+ *
+ * @param <T> Type of originally wrapped object
+ */
+abstract class AbstractTtlDecorator<T> {
+   final T original;
+   final TtlConfig config;
+   final TtlTimeProvider timeProvider;
+   final boolean updateTsOnRead;
+   final boolean returnExpired;
+
+   AbstractTtlDecorator(
+       T original,
+       TtlConfig config,
+       TtlTimeProvider timeProvider) {
+       Preconditions.checkNotNull(original);
+       Preconditions.checkNotNull(config);
+       Preconditions.checkNotNull(timeProvider);
+       Preconditions.checkArgument(config.getTtlUpdateType() != TtlUpdateType.Disabled,
+           "State does not need to be wrapped with TTL if it is configured as disabled.");
+       this.original = original;
+       this.config = config;
+       this.timeProvider = timeProvider;
+       this.updateTsOnRead = config.getTtlUpdateType() == TtlUpdateType.OnReadAndWrite;
+       this.returnExpired = config.getStateVisibility() == TtlStateVisibility.Relaxed;
+   }
+
+   <V> V getUnexpried(TtlValue<V> ttlValue) {
+       return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue();
+   }
+
+   <V> boolean expired(TtlValue<V> ttlValue) {
+       return ttlValue != null && ttlValue.getExpirationTimestamp() <= timeProvider.currentTimestamp();
--- End diff --

This looks a bit problematic, because `ttlValue.getExpirationTimestamp()` 
might be negative. E.g. when the user provides `Long.MAX_VALUE` as the TTL 
value, the expectation is that the value should never expire, but according to 
the current code, it will expire immediately.


---


[jira] [Comment Edited] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI

2018-06-20 Thread xueyu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518099#comment-16518099
 ] 

xueyu edited comment on FLINK-6846 at 6/20/18 1:36 PM:
---

I want to submit a PR for this function, would you mind changing the Assignee 
to me? Thanks.

I have submitted a PR for FLINK-6846.


was (Author: xueyu7452):
I want to submit a PR for this function, would you mind changing the Assignee 
to me? Thanks.

> Add TIMESTAMPADD supported in TableAPI
> --
>
> Key: FLINK-6846
> URL: https://issues.apache.org/jira/browse/FLINK-6846
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>  Labels: starter
>
> See FLINK-6811 for detail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6846) Add TIMESTAMPADD supported in TableAPI

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518135#comment-16518135
 ] 

ASF GitHub Bot commented on FLINK-6846:
---

GitHub user xueyumusic opened a pull request:

https://github.com/apache/flink/pull/6188

[FLINK-6846] add timestampAdd tableApi



## What is the purpose of the change
This PR adds the timestampAdd Table API function.

## Brief change log
adds the timestampAdd expression parser and node construction

## Verifying this change
unit tests


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no

## Documentation

  - Does this pull request introduce a new feature? yes 
  - If yes, how is the feature documented? docs 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xueyumusic/flink timeStampAdd-tableApi

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6188.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6188


commit 17511d36c39c86ce342a27583a474991713d0062
Author: xueyu <278006819@...>
Date:   2018-06-20T13:30:02Z

timestampAdd tableApi




> Add TIMESTAMPADD supported in TableAPI
> --
>
> Key: FLINK-6846
> URL: https://issues.apache.org/jira/browse/FLINK-6846
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Major
>  Labels: starter
>
> See FLINK-6811 for detail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6188: [FLINK-6846] add timestampAdd tableApi

2018-06-20 Thread xueyumusic
GitHub user xueyumusic opened a pull request:

https://github.com/apache/flink/pull/6188

[FLINK-6846] add timestampAdd tableApi



## What is the purpose of the change
This PR adds the timestampAdd Table API function.

## Brief change log
adds the timestampAdd expression parser and node construction

## Verifying this change
unit tests


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no

## Documentation

  - Does this pull request introduce a new feature? yes 
  - If yes, how is the feature documented? docs 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xueyumusic/flink timeStampAdd-tableApi

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6188.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6188


commit 17511d36c39c86ce342a27583a474991713d0062
Author: xueyu <278006819@...>
Date:   2018-06-20T13:30:02Z

timestampAdd tableApi




---

