[jira] [Commented] (FLINK-9885) End-to-end test: Elasticsearch 6.x connector

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555251#comment-16555251
 ] 

ASF GitHub Bot commented on FLINK-9885:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204992515
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * IT cases for the {@link ElasticsearchSink}.
+ */
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+
+   /**
+* Tests that the Elasticsearch sink works properly using a {@link 
RestHighLevelClient}.
+*/
+   public void runTransportClientTest() throws Exception {
--- End diff --

There are no `@Test` annotations to run tests. We should also rename the 
method in `testXXX` as we usually do it. The super class method names should be 
updated as `runTransportClientTest` is not correct anymore.


> End-to-end test: Elasticsearch 6.x connector
> 
>
> Key: FLINK-9885
> URL: https://issues.apache.org/jira/browse/FLINK-9885
> Project: Flink
>  Issue Type: Sub-task
>  Components: ElasticSearch Connector, Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> We have decided to try and merge the pending Elasticsearch 6.x PRs. This 
> should also come with an end-to-end test that covers this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


***UNCHECKED*** [jira] [Commented] (FLINK-9885) End-to-end test: Elasticsearch 6.x connector

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555242#comment-16555242
 ] 

ASF GitHub Bot commented on FLINK-9885:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204756208
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
 ---
@@ -39,23 +38,25 @@
  * exactly one instance of the call bridge, and state cleanup is performed 
when the sink is closed.
  */
 @Internal
-public interface ElasticsearchApiCallBridge extends Serializable {
+public abstract class ElasticsearchApiCallBridge implements Serializable {
 
/**
-* Creates an Elasticsearch {@link Client}.
+* Creates an Elasticsearch client implementing {@link AutoCloseable}.
 *
 * @param clientConfig The configuration to use when constructing the 
client.
 * @return The created client.
 */
-   Client createClient(Map clientConfig);
+   public abstract AutoCloseable createClient(Map 
clientConfig);
+
+   public abstract BulkProcessor.Builder 
createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener 
listener);
--- End diff --

No docs here?


> End-to-end test: Elasticsearch 6.x connector
> 
>
> Key: FLINK-9885
> URL: https://issues.apache.org/jira/browse/FLINK-9885
> Project: Flink
>  Issue Type: Sub-task
>  Components: ElasticSearch Connector, Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> We have decided to try and merge the pending Elasticsearch 6.x PRs. This 
> should also come with an end-to-end test that covers this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


***UNCHECKED*** [jira] [Commented] (FLINK-9885) End-to-end test: Elasticsearch 6.x connector

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555246#comment-16555246
 ] 

ASF GitHub Bot commented on FLINK-9885:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204990734
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch 6.x sink that requests multiple {@link ActionRequest 
ActionRequests}
+ * against a cluster for each incoming element.
+ *
+ * The sink internally uses a {@link RestHighLevelClient} to 
communicate with an Elasticsearch cluster.
+ * The sink will fail if no cluster can be connected to using the provided 
transport addresses passed to the constructor.
+ *
+ * The {@link Map} passed to the constructor is used to create the 
{@code TransportClient}. The config keys can be found
+ * in the https://www.elastic.io";>Elasticsearch 
documentation. An important setting is {@code cluster.name},
+ * which should be set to the name of the cluster that the sink should 
emit to.
+ *
+ * Internally, the sink will use a {@link BulkProcessor} to send {@link 
ActionRequest ActionRequests}.
+ * This will buffer elements before sending a request to the cluster. The 
behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * 
+ *{@code bulk.flush.max.actions}: Maximum amount of elements to 
buffer
+ *{@code bulk.flush.max.size.mb}: Maximum amount of data (in 
megabytes) to buffer
+ *{@code bulk.flush.interval.ms}: Interval at which to flush data 
regardless of the other two
+ *   settings in milliseconds
+ * 
+ *
+ * You also have to provide an {@link ElasticsearchSinkFunction}. This 
is used to create multiple
+ * {@link ActionRequest ActionRequests} for each incoming element. See the 
class level documentation of
+ * {@link ElasticsearchSinkFunction} for an example.
+ *
+ * @param  Type of the elements handled by this sink
+ */
+public class ElasticsearchSink extends ElasticsearchSinkBase {
--- End diff --

Add `@PublicEvolving`


> End-to-end test: Elasticsearch 6.x connector
> 
>
> Key: FLINK-9885
> URL: https://issues.apache.org/jira/browse/FLINK-9885
> Project: Flink
>  Issue Type: Sub-task
>  Components: ElasticSearch Connector, Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> We have decided to try and merge the pending Elasticsearch 6.x PRs. This 
> should also come with an end-to-end test that covers this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9885) End-to-end test: Elasticsearch 6.x connector

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555252#comment-16555252
 ] 

ASF GitHub Bot commented on FLINK-9885:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204991076
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
--- End diff --

Remove unused import.


> End-to-end test: Elasticsearch 6.x connector
> 
>
> Key: FLINK-9885
> URL: https://issues.apache.org/jira/browse/FLINK-9885
> Project: Flink
>  Issue Type: Sub-task
>  Components: ElasticSearch Connector, Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> We have decided to try and merge the pending Elasticsearch 6.x PRs. This 
> should also come with an end-to-end test that covers this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


***UNCHECKED*** [jira] [Commented] (FLINK-9885) End-to-end test: Elasticsearch 6.x connector

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555243#comment-16555243
 ] 

ASF GitHub Bot commented on FLINK-9885:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204775927
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch 6.x sink that requests multiple {@link ActionRequest 
ActionRequests}
+ * against a cluster for each incoming element.
+ *
+ * The sink internally uses a {@link RestHighLevelClient} to 
communicate with an Elasticsearch cluster.
+ * The sink will fail if no cluster can be connected to using the provided 
transport addresses passed to the constructor.
+ *
+ * The {@link Map} passed to the constructor is used to create the 
{@code TransportClient}. The config keys can be found
--- End diff --

Update docs here.


> End-to-end test: Elasticsearch 6.x connector
> 
>
> Key: FLINK-9885
> URL: https://issues.apache.org/jira/browse/FLINK-9885
> Project: Flink
>  Issue Type: Sub-task
>  Components: ElasticSearch Connector, Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> We have decided to try and merge the pending Elasticsearch 6.x PRs. This 
> should also come with an end-to-end test that covers this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9885) End-to-end test: Elasticsearch 6.x connector

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555244#comment-16555244
 ] 

ASF GitHub Bot commented on FLINK-9885:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204758828
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -176,7 +175,7 @@ public void setDelayMillis(long delayMillis) {
private AtomicLong numPendingRequests = new AtomicLong(0);
 
/** Elasticsearch client created using the call bridge. */
-   private transient Client client;
+   private transient AutoCloseable client;
--- End diff --

Same here. Why not parameterize the class and be type save?


> End-to-end test: Elasticsearch 6.x connector
> 
>
> Key: FLINK-9885
> URL: https://issues.apache.org/jira/browse/FLINK-9885
> Project: Flink
>  Issue Type: Sub-task
>  Components: ElasticSearch Connector, Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> We have decided to try and merge the pending Elasticsearch 6.x PRs. This 
> should also come with an end-to-end test that covers this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


***UNCHECKED*** [jira] [Commented] (FLINK-9885) End-to-end test: Elasticsearch 6.x connector

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555248#comment-16555248
 ] 

ASF GitHub Bot commented on FLINK-9885:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204993234
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * IT cases for the {@link ElasticsearchSink}.
+ */
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+
+   /**
+* Tests that the Elasticsearch sink works properly using a {@link 
RestHighLevelClient}.
+*/
+   public void runTransportClientTest() throws Exception {
+   final String index = "transport-client-test-index";
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStreamSource> source = 
env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
+
+   Map userConfig = new HashMap<>();
+   // This instructs the sink to emit after every element, 
otherwise they would be buffered
+   
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+   
source.addSink(createElasticsearchSinkForEmbeddedNode(userConfig,
+   new 
SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));
+
+   env.execute("Elasticsearch RestHighLevelClient Test");
+
+   // verify the results
+   Client client = embeddedNodeEnv.getClient();
+   SourceSinkDataTestKit.verifyProducedSinkData(client, index);
+
+   client.close();
+   }
+
+   /**
+* Tests that the Elasticsearch sink fails eagerly if the provided list 
of transport addresses is {@code null}.
+*/
+   public void runNullTransportClientTest() throws Exception {
+   try {
+   Map userConfig = new HashMap<>();
+   
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+   createElasticsearchSink6(userConfig, null, new 
SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+   } catch (IllegalArgumentException expectedException) {
+   // test passes
+   return;
+   }
+
+   fail();
+   }
+
+   /**
+* Tests that the Elasticsearch sink fails eagerly if the provided list 
of transport addresses is empty.
+*/
+   public void runEmptyTransportClientTest() throws Exception {
+   try {

[jira] [Commented] (FLINK-9885) End-to-end test: Elasticsearch 6.x connector

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555253#comment-16555253
 ] 

ASF GitHub Bot commented on FLINK-9885:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204999073
  
--- Diff: flink-end-to-end-tests/run-nightly-tests.sh ---
@@ -96,6 +96,7 @@ run_test "Local recovery and sticky scheduling end-to-end 
test" "$END_TO_END_DIR
 run_test "Elasticsearch (v1.7.1) sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 1 
https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz";
 run_test "Elasticsearch (v2.3.5) sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 2 
https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz";
 run_test "Elasticsearch (v5.1.2) sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 5 
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz";
+run_test "Elasticsearch (v6.3.1) sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 6 
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz";
--- End diff --

The test is not runnable on my machine.

```
Elasticsearch node is not running.
grep: /Users/twalthr/flink/flink/build-target/log/*.out: No such file or 
directory

[FAIL] './test-scripts/test_streaming_elasticsearch.sh 6 
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz'
 failed after 0 minutes and 18 seconds! Test exited with exit code 1
```

The tests exits before elastic search has actually started. Also killing 
does not work. An Elasticsearch process is still running afterwards.


> End-to-end test: Elasticsearch 6.x connector
> 
>
> Key: FLINK-9885
> URL: https://issues.apache.org/jira/browse/FLINK-9885
> Project: Flink
>  Issue Type: Sub-task
>  Components: ElasticSearch Connector, Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> We have decided to try and merge the pending Elasticsearch 6.x PRs. This 
> should also come with an end-to-end test that covers this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


***UNCHECKED*** [jira] [Commented] (FLINK-9885) End-to-end test: Elasticsearch 6.x connector

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555240#comment-16555240
 ] 

ASF GitHub Bot commented on FLINK-9885:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204757871
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
 ---
@@ -39,23 +38,25 @@
  * exactly one instance of the call bridge, and state cleanup is performed 
when the sink is closed.
  */
 @Internal
-public interface ElasticsearchApiCallBridge extends Serializable {
+public abstract class ElasticsearchApiCallBridge implements Serializable {
--- End diff --

Parameterize the class instead of using `AutoClosable` as a synonym for the 
a client that implements this interface. This avoids manual casting in 
subclasses.


> End-to-end test: Elasticsearch 6.x connector
> 
>
> Key: FLINK-9885
> URL: https://issues.apache.org/jira/browse/FLINK-9885
> Project: Flink
>  Issue Type: Sub-task
>  Components: ElasticSearch Connector, Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> We have decided to try and merge the pending Elasticsearch 6.x PRs. This 
> should also come with an end-to-end test that covers this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


***UNCHECKED*** [jira] [Commented] (FLINK-9885) End-to-end test: Elasticsearch 6.x connector

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555250#comment-16555250
 ] 

ASF GitHub Bot commented on FLINK-9885:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204991878
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.InternalSettingsPreparer;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.transport.Netty4Plugin;
+
+import java.io.File;
+import java.util.Collections;
+
+/**
+ * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for 
Elasticsearch 6.
+ * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for 
integration tests.
+ */
+public class EmbeddedElasticsearchNodeEnvironmentImpl implements 
EmbeddedElasticsearchNodeEnvironment {
+
+   private Node node;
+
+   @Override
+   public void start(File tmpDataFolder, String clusterName) throws 
Exception {
+   if (node == null) {
+   Settings settings = Settings.builder()
+   .put("cluster.name", clusterName)
+   .put("http.enabled", false)
+   .put("path.home", tmpDataFolder.getParent())
+   .put("path.data", 
tmpDataFolder.getAbsolutePath())
+   .put(NetworkModule.HTTP_TYPE_KEY, 
Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
--- End diff --

Are these values still valid? I thought we are not relying on Netty anymore 
with the rest client?


> End-to-end test: Elasticsearch 6.x connector
> 
>
> Key: FLINK-9885
> URL: https://issues.apache.org/jira/browse/FLINK-9885
> Project: Flink
>  Issue Type: Sub-task
>  Components: ElasticSearch Connector, Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> We have decided to try and merge the pending Elasticsearch 6.x PRs. This 
> should also come with an end-to-end test that covers this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204756208
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
 ---
@@ -39,23 +38,25 @@
  * exactly one instance of the call bridge, and state cleanup is performed 
when the sink is closed.
  */
 @Internal
-public interface ElasticsearchApiCallBridge extends Serializable {
+public abstract class ElasticsearchApiCallBridge implements Serializable {
 
/**
-* Creates an Elasticsearch {@link Client}.
+* Creates an Elasticsearch client implementing {@link AutoCloseable}.
 *
 * @param clientConfig The configuration to use when constructing the 
client.
 * @return The created client.
 */
-   Client createClient(Map clientConfig);
+   public abstract AutoCloseable createClient(Map 
clientConfig);
+
+   public abstract BulkProcessor.Builder 
createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener 
listener);
--- End diff --

No docs here?


---


[jira] [Commented] (FLINK-9897) Further enhance adaptive reads in Kinesis Connector to depend on run loop time

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555238#comment-16555238
 ] 

ASF GitHub Bot commented on FLINK-9897:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6408#discussion_r204999405
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -233,26 +225,69 @@ public void run() {

subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
long recordBatchSizeBytes = 0L;
-   long averageRecordSizeBytes = 0L;
-
for (UserRecord record : 
fetchedRecords) {
recordBatchSizeBytes += 
record.getData().remaining();

deserializeRecordForCollectionAndUpdateState(record);
}
 
-   if (useAdaptiveReads && 
!fetchedRecords.isEmpty()) {
-   averageRecordSizeBytes = 
recordBatchSizeBytes / fetchedRecords.size();
-   maxNumberOfRecordsPerFetch = 
getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-   }
-
nextShardItr = 
getRecordsResult.getNextShardIterator();
+
+   long processingEndTimeNanos = 
System.nanoTime();
--- End diff --

having a local variable here seems a bit redundant, since we always adjust 
it right afterwards.


> Further enhance adaptive reads in Kinesis Connector to depend on run loop time
> --
>
> Key: FLINK-9897
> URL: https://issues.apache.org/jira/browse/FLINK-9897
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.4.2, 1.5.1
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>  Labels: pull-request-available
>
> In FLINK-9692, we introduced the ability for the shardConsumer to adaptively 
> read more records based on the current average record size to optimize the 2 
> Mb/sec shard limit. The feature maximizes  maxNumberOfRecordsPerFetch of 5 
> reads/sec (as prescribed by Kinesis limits). In the case where applications 
> take more time to process records in the run loop, they are no longer able to 
> read at a frequency of 5 reads/sec (even though their fetchIntervalMillis 
> maybe set to 200 ms). In such a scenario, the maxNumberOfRecordsPerFetch 
> should be calculated based on the time that the run loop actually takes as 
> opposed to fetchIntervalMillis. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204999073
  
--- Diff: flink-end-to-end-tests/run-nightly-tests.sh ---
@@ -96,6 +96,7 @@ run_test "Local recovery and sticky scheduling end-to-end 
test" "$END_TO_END_DIR
 run_test "Elasticsearch (v1.7.1) sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 1 
https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz";
 run_test "Elasticsearch (v2.3.5) sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 2 
https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz";
 run_test "Elasticsearch (v5.1.2) sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 5 
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz";
+run_test "Elasticsearch (v6.3.1) sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 6 
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz";
--- End diff --

The test is not runnable on my machine.

```
Elasticsearch node is not running.
grep: /Users/twalthr/flink/flink/build-target/log/*.out: No such file or 
directory

[FAIL] './test-scripts/test_streaming_elasticsearch.sh 6 
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz'
 failed after 0 minutes and 18 seconds! Test exited with exit code 1
```

The tests exits before elastic search has actually started. Also killing 
does not work. An Elasticsearch process is still running afterwards.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204991878
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.InternalSettingsPreparer;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.transport.Netty4Plugin;
+
+import java.io.File;
+import java.util.Collections;
+
+/**
+ * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for 
Elasticsearch 6.
+ * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for 
integration tests.
+ */
+public class EmbeddedElasticsearchNodeEnvironmentImpl implements 
EmbeddedElasticsearchNodeEnvironment {
+
+   private Node node;
+
+   @Override
+   public void start(File tmpDataFolder, String clusterName) throws 
Exception {
+   if (node == null) {
+   Settings settings = Settings.builder()
+   .put("cluster.name", clusterName)
+   .put("http.enabled", false)
+   .put("path.home", tmpDataFolder.getParent())
+   .put("path.data", 
tmpDataFolder.getAbsolutePath())
+   .put(NetworkModule.HTTP_TYPE_KEY, 
Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
--- End diff --

Are these values still valid? I thought we are not relying on Netty anymore 
with the rest client?


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204758828
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -176,7 +175,7 @@ public void setDelayMillis(long delayMillis) {
private AtomicLong numPendingRequests = new AtomicLong(0);
 
/** Elasticsearch client created using the call bridge. */
-   private transient Client client;
+   private transient AutoCloseable client;
--- End diff --

Same here. Why not parameterize the class and be type save?


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204758434
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
 ---
@@ -64,13 +65,15 @@
 * @param builder the {@link BulkProcessor.Builder} to configure.
 * @param flushBackoffPolicy user-provided backoff retry settings 
({@code null} if the user disabled backoff retries).
 */
-   void configureBulkProcessorBackoff(
+   public abstract void configureBulkProcessorBackoff(
BulkProcessor.Builder builder,
@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy 
flushBackoffPolicy);
 
/**
 * Perform any necessary state cleanup.
 */
-   void cleanup();
+   public void cleanup() {
--- End diff --

Use Java 8 defaults and let this class stay an interface?


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204993234
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * IT cases for the {@link ElasticsearchSink}.
+ */
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+
+   /**
+* Tests that the Elasticsearch sink works properly using a {@link 
RestHighLevelClient}.
+*/
+   public void runTransportClientTest() throws Exception {
+   final String index = "transport-client-test-index";
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStreamSource> source = 
env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
+
+   Map userConfig = new HashMap<>();
+   // This instructs the sink to emit after every element, 
otherwise they would be buffered
+   
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+   
source.addSink(createElasticsearchSinkForEmbeddedNode(userConfig,
+   new 
SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));
+
+   env.execute("Elasticsearch RestHighLevelClient Test");
+
+   // verify the results
+   Client client = embeddedNodeEnv.getClient();
+   SourceSinkDataTestKit.verifyProducedSinkData(client, index);
+
+   client.close();
+   }
+
+   /**
+* Tests that the Elasticsearch sink fails eagerly if the provided list 
of transport addresses is {@code null}.
+*/
+   public void runNullTransportClientTest() throws Exception {
+   try {
+   Map userConfig = new HashMap<>();
+   
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+   createElasticsearchSink6(userConfig, null, new 
SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+   } catch (IllegalArgumentException expectedException) {
+   // test passes
+   return;
+   }
+
+   fail();
+   }
+
+   /**
+* Tests that the Elasticsearch sink fails eagerly if the provided list 
of transport addresses is empty.
+*/
+   public void runEmptyTransportClientTest() throws Exception {
+   try {
+   Map userConfig = new HashMap<>();
+   
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+   createElasticsearchSink6(userConfig,
+   Collect

[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204994308
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties
 ---
@@ -0,0 +1,27 @@

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - 
%m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, 
testlogger
--- End diff --

Still necessary?


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204990734
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch 6.x sink that requests multiple {@link ActionRequest 
ActionRequests}
+ * against a cluster for each incoming element.
+ *
+ * The sink internally uses a {@link RestHighLevelClient} to 
communicate with an Elasticsearch cluster.
+ * The sink will fail if no cluster can be connected to using the provided 
transport addresses passed to the constructor.
+ *
+ * The {@link Map} passed to the constructor is used to create the 
{@code TransportClient}. The config keys can be found
+ * in the https://www.elastic.io";>Elasticsearch 
documentation. An important setting is {@code cluster.name},
+ * which should be set to the name of the cluster that the sink should 
emit to.
+ *
+ * Internally, the sink will use a {@link BulkProcessor} to send {@link 
ActionRequest ActionRequests}.
+ * This will buffer elements before sending a request to the cluster. The 
behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * 
+ *{@code bulk.flush.max.actions}: Maximum amount of elements to 
buffer
+ *{@code bulk.flush.max.size.mb}: Maximum amount of data (in 
megabytes) to buffer
+ *{@code bulk.flush.interval.ms}: Interval at which to flush data 
regardless of the other two
+ *   settings in milliseconds
+ * 
+ *
+ * You also have to provide an {@link ElasticsearchSinkFunction}. This 
is used to create multiple
+ * {@link ActionRequest ActionRequests} for each incoming element. See the 
class level documentation of
+ * {@link ElasticsearchSinkFunction} for an example.
+ *
+ * @param  Type of the elements handled by this sink
+ */
+public class ElasticsearchSink extends ElasticsearchSinkBase {
--- End diff --

Add `@PublicEvolving`


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204775927
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch 6.x sink that requests multiple {@link ActionRequest 
ActionRequests}
+ * against a cluster for each incoming element.
+ *
+ * The sink internally uses a {@link RestHighLevelClient} to 
communicate with an Elasticsearch cluster.
+ * The sink will fail if no cluster can be connected to using the provided 
transport addresses passed to the constructor.
+ *
+ * The {@link Map} passed to the constructor is used to create the 
{@code TransportClient}. The config keys can be found
--- End diff --

Update docs here.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204992515
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * IT cases for the {@link ElasticsearchSink}.
+ */
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+
+   /**
+* Tests that the Elasticsearch sink works properly using a {@link 
RestHighLevelClient}.
+*/
+   public void runTransportClientTest() throws Exception {
--- End diff --

There are no `@Test` annotations to run tests. We should also rename the 
method in `testXXX` as we usually do it. The super class method names should be 
updated as `runTransportClientTest` is not correct anymore.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204991006
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch 6.x sink that requests multiple {@link ActionRequest 
ActionRequests}
+ * against a cluster for each incoming element.
+ *
+ * The sink internally uses a {@link RestHighLevelClient} to 
communicate with an Elasticsearch cluster.
+ * The sink will fail if no cluster can be connected to using the provided 
transport addresses passed to the constructor.
+ *
+ * The {@link Map} passed to the constructor is used to create the 
{@code TransportClient}. The config keys can be found
+ * in the https://www.elastic.io";>Elasticsearch 
documentation. An important setting is {@code cluster.name},
+ * which should be set to the name of the cluster that the sink should 
emit to.
+ *
+ * Internally, the sink will use a {@link BulkProcessor} to send {@link 
ActionRequest ActionRequests}.
+ * This will buffer elements before sending a request to the cluster. The 
behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * 
+ *{@code bulk.flush.max.actions}: Maximum amount of elements to 
buffer
+ *{@code bulk.flush.max.size.mb}: Maximum amount of data (in 
megabytes) to buffer
+ *{@code bulk.flush.interval.ms}: Interval at which to flush data 
regardless of the other two
+ *   settings in milliseconds
+ * 
+ *
+ * You also have to provide an {@link ElasticsearchSinkFunction}. This 
is used to create multiple
+ * {@link ActionRequest ActionRequests} for each incoming element. See the 
class level documentation of
+ * {@link ElasticsearchSinkFunction} for an example.
+ *
+ * @param  Type of the elements handled by this sink
+ */
+public class ElasticsearchSink extends ElasticsearchSinkBase {
+
+   private static final long serialVersionUID = 1L;
+
+   /**
+* Creates a new {@code ElasticsearchSink} that connects to the cluster 
using a {@link RestHighLevelClient}.
+*
+* @param elasticsearchSinkFunction This is used to generate multiple 
{@link ActionRequest} from the incoming element
+* @param httpHosts The list of {@HttpHost} to which the {@link 
RestHighLevelClient} connects to.
+*/
+   public ElasticsearchSink(Map userConfig, List 
httpHosts, ElasticsearchSinkFunction elasticsearchSinkFunction) {
+
+   this(userConfig, httpHosts, elasticsearchSinkFunction, new 
NoOpFailureHandler());
+   }
+
+   /**
+* Creates a new {@code ElasticsearchSink} that connects to the cluster 
using a {@link RestHighLevelClient}.
+*
+* @param elasticsearchSinkFunction This is used to generate multiple 
{@link ActionRequest} from the incoming element
+* @param failureHandler This is used to handle failed {@link 
ActionRequest}
+* @param httpHosts The list of {@HttpHost} to which the {@link 
RestHighLevelClient} connects to.
--- End diff --

Fix two invalid Javadocs.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204993809
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import 
org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This example shows how to use the Elasticsearch Sink. Before running it 
you must ensure that
+ * you have a cluster named "elasticsearch" running or change the name of 
cluster in the config map.
+ */
+public class ElasticsearchSinkExample {
--- End diff --

We should add tests for our examples.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204991076
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
--- End diff --

Remove unused import.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204757871
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
 ---
@@ -39,23 +38,25 @@
  * exactly one instance of the call bridge, and state cleanup is performed 
when the sink is closed.
  */
 @Internal
-public interface ElasticsearchApiCallBridge extends Serializable {
+public abstract class ElasticsearchApiCallBridge implements Serializable {
--- End diff --

Parameterize the class instead of using `AutoClosable` as a synonym for the 
a client that implements this interface. This avoids manual casting in 
subclasses.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204750192
  
--- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -138,6 +143,31 @@ input.addSink(new ElasticsearchSink<>(config, 
transportAddresses, new Elasticsea
 }
 }));{% endhighlight %}
 
+
+{% highlight java %}
+DataStream input = ...;
+
+List httpHost = new ArrayList<>();
+httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
+httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
+
+input.addSink(new ElasticsearchSink<>(httpHosts, new 
ElasticsearchSinkFunction() {
--- End diff --

Add an example for the user config as well to be in sync with the examples 
of other versions? Because the following paragraph mentions: 

> Especially important is the `cluster.name` parameter

Btw could you also add imports to your examples. I just started to do this 
with my code examples to make it easier for people to find the used classes 
(see 
[here](https://ci.apache.org/projects/flink/flink-docs-master/dev/java_lambdas.html))


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204752713
  
--- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -190,9 +220,30 @@ input.addSink(new ElasticsearchSink(config, 
transportAddresses, new Elasticsearc
 }))
 {% endhighlight %}
 
+
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+val httpHosts = new java.util.ArrayList[HttpHost]
+httpHosts.add(new HttpHost("127.0.0.1", 9300, "http"))
+httpHosts.add(new HttpHost("10.2.3.1", 9300, "http"))
+
+input.addSink(new ElasticsearchSink(httpHosts, new 
ElasticsearchSinkFunction[String] {
+  def createIndexRequest(element: String): IndexRequest = {
+val json = new java.util.HashMap[String, String]
+json.put("data", element)
+
+return Requests.indexRequest()
+.index("my-index")
+.type("my-type")
+.source(json)
+  }
+}))
+{% endhighlight %}
+
 
 
-Note how a `Map` of `String`s is used to configure the `ElasticsearchSink`.
+Note how `TransportClient` based version use a `Map` of `String`s is used 
to configure the `ElasticsearchSink`.
--- End diff --

Remove "is used to"?


---


[jira] [Commented] (FLINK-9694) Potentially NPE in CompositeTypeSerializerConfigSnapshot constructor

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555234#comment-16555234
 ] 

ASF GitHub Bot commented on FLINK-9694:
---

Github user yanghua closed the pull request at:

https://github.com/apache/flink/pull/6231


> Potentially NPE in CompositeTypeSerializerConfigSnapshot constructor
> 
>
> Key: FLINK-9694
> URL: https://issues.apache.org/jira/browse/FLINK-9694
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> the partial specific exception stack trace :
> {code:java}
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.(CompositeTypeSerializerConfigSnapshot.java:53)
> at 
> org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot.(CRowSerializer.scala:120)
> at 
> org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot.(CRowSerializer.scala:123)
> at sun.reflect.GeneratedConstructorAccessor10.newInstance(Unknown Source)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at java.lang.Class.newInstance(Class.java:442)
> at 
> org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:319)
> ... 20 more{code}
> related code is : 
> {code:java}
> public CompositeTypeSerializerConfigSnapshot(TypeSerializer... 
> nestedSerializers) {
>Preconditions.checkNotNull(nestedSerializers);
>this.nestedSerializersAndConfigs = new 
> ArrayList<>(nestedSerializers.length);
>for (TypeSerializer nestedSerializer : nestedSerializers) {
>   TypeSerializerConfigSnapshot configSnapshot = 
> nestedSerializer.snapshotConfiguration();
>   this.nestedSerializersAndConfigs.add(
>  new Tuple2, TypeSerializerConfigSnapshot>(
> nestedSerializer.duplicate(),
> Preconditions.checkNotNull(configSnapshot)));
>}
> }
> {code}
> exception happens at : 
> {code:java}
> TypeSerializerConfigSnapshot configSnapshot = 
> nestedSerializer.snapshotConfiguration();
> {code}
> the reason is the type of constructor's parameter "..." used "varargs" 
> feature. The  initialize code in *CRowSerializer.scala* is : 
> {code:java}
> def this() = this(null)// Scala code
> {code}
> when invoked this, actually the the type of 
> CompositeTypeSerializerConfigSnapshot's
> nestedSerializers parameter is :
> {code:java}
> TypeSerializer[] nestedSerializers = new TypeSerializer[] {null};
> {code}
> so the checkNotNull precondition statement :
> {code:java}
> Preconditions.checkNotNull(nestedSerializers);
> {code}
> is always useless.
> So we should check the object reference in _for_ loop to protect NPE. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9694) Potentially NPE in CompositeTypeSerializerConfigSnapshot constructor

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555233#comment-16555233
 ] 

ASF GitHub Bot commented on FLINK-9694:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6231
  
PR #6392 fixed this issue.


> Potentially NPE in CompositeTypeSerializerConfigSnapshot constructor
> 
>
> Key: FLINK-9694
> URL: https://issues.apache.org/jira/browse/FLINK-9694
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> the partial specific exception stack trace :
> {code:java}
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.(CompositeTypeSerializerConfigSnapshot.java:53)
> at 
> org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot.(CRowSerializer.scala:120)
> at 
> org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot.(CRowSerializer.scala:123)
> at sun.reflect.GeneratedConstructorAccessor10.newInstance(Unknown Source)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at java.lang.Class.newInstance(Class.java:442)
> at 
> org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:319)
> ... 20 more{code}
> related code is : 
> {code:java}
> public CompositeTypeSerializerConfigSnapshot(TypeSerializer... 
> nestedSerializers) {
>Preconditions.checkNotNull(nestedSerializers);
>this.nestedSerializersAndConfigs = new 
> ArrayList<>(nestedSerializers.length);
>for (TypeSerializer nestedSerializer : nestedSerializers) {
>   TypeSerializerConfigSnapshot configSnapshot = 
> nestedSerializer.snapshotConfiguration();
>   this.nestedSerializersAndConfigs.add(
>  new Tuple2, TypeSerializerConfigSnapshot>(
> nestedSerializer.duplicate(),
> Preconditions.checkNotNull(configSnapshot)));
>}
> }
> {code}
> exception happens at : 
> {code:java}
> TypeSerializerConfigSnapshot configSnapshot = 
> nestedSerializer.snapshotConfiguration();
> {code}
> the reason is the type of constructor's parameter "..." used "varargs" 
> feature. The  initialize code in *CRowSerializer.scala* is : 
> {code:java}
> def this() = this(null)// Scala code
> {code}
> when invoked this, actually the the type of 
> CompositeTypeSerializerConfigSnapshot's
> nestedSerializers parameter is :
> {code:java}
> TypeSerializer[] nestedSerializers = new TypeSerializer[] {null};
> {code}
> so the checkNotNull precondition statement :
> {code:java}
> Preconditions.checkNotNull(nestedSerializers);
> {code}
> is always useless.
> So we should check the object reference in _for_ loop to protect NPE. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...

2018-07-24 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6408#discussion_r204999405
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -233,26 +225,69 @@ public void run() {

subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
long recordBatchSizeBytes = 0L;
-   long averageRecordSizeBytes = 0L;
-
for (UserRecord record : 
fetchedRecords) {
recordBatchSizeBytes += 
record.getData().remaining();

deserializeRecordForCollectionAndUpdateState(record);
}
 
-   if (useAdaptiveReads && 
!fetchedRecords.isEmpty()) {
-   averageRecordSizeBytes = 
recordBatchSizeBytes / fetchedRecords.size();
-   maxNumberOfRecordsPerFetch = 
getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-   }
-
nextShardItr = 
getRecordsResult.getNextShardIterator();
+
+   long processingEndTimeNanos = 
System.nanoTime();
--- End diff --

having a local variable here seems a bit redundant, since we always adjust 
it right afterwards.


---


[jira] [Commented] (FLINK-9694) Potentially NPE in CompositeTypeSerializerConfigSnapshot constructor

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555230#comment-16555230
 ] 

ASF GitHub Bot commented on FLINK-9694:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6392
  
+1, I will close my PR #6231 about this issue


> Potentially NPE in CompositeTypeSerializerConfigSnapshot constructor
> 
>
> Key: FLINK-9694
> URL: https://issues.apache.org/jira/browse/FLINK-9694
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> the partial specific exception stack trace :
> {code:java}
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.(CompositeTypeSerializerConfigSnapshot.java:53)
> at 
> org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot.(CRowSerializer.scala:120)
> at 
> org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot.(CRowSerializer.scala:123)
> at sun.reflect.GeneratedConstructorAccessor10.newInstance(Unknown Source)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at java.lang.Class.newInstance(Class.java:442)
> at 
> org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:319)
> ... 20 more{code}
> related code is : 
> {code:java}
> public CompositeTypeSerializerConfigSnapshot(TypeSerializer... 
> nestedSerializers) {
>Preconditions.checkNotNull(nestedSerializers);
>this.nestedSerializersAndConfigs = new 
> ArrayList<>(nestedSerializers.length);
>for (TypeSerializer nestedSerializer : nestedSerializers) {
>   TypeSerializerConfigSnapshot configSnapshot = 
> nestedSerializer.snapshotConfiguration();
>   this.nestedSerializersAndConfigs.add(
>  new Tuple2, TypeSerializerConfigSnapshot>(
> nestedSerializer.duplicate(),
> Preconditions.checkNotNull(configSnapshot)));
>}
> }
> {code}
> exception happens at : 
> {code:java}
> TypeSerializerConfigSnapshot configSnapshot = 
> nestedSerializer.snapshotConfiguration();
> {code}
> the reason is the type of constructor's parameter "..." used "varargs" 
> feature. The  initialize code in *CRowSerializer.scala* is : 
> {code:java}
> def this() = this(null)// Scala code
> {code}
> when invoked this, actually the the type of 
> CompositeTypeSerializerConfigSnapshot's
> nestedSerializers parameter is :
> {code:java}
> TypeSerializer[] nestedSerializers = new TypeSerializer[] {null};
> {code}
> so the checkNotNull precondition statement :
> {code:java}
> Preconditions.checkNotNull(nestedSerializers);
> {code}
> is always useless.
> So we should check the object reference in _for_ loop to protect NPE. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...

2018-07-24 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6408#discussion_r204996330
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -233,26 +225,69 @@ public void run() {

subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
long recordBatchSizeBytes = 0L;
-   long averageRecordSizeBytes = 0L;
-
for (UserRecord record : 
fetchedRecords) {
recordBatchSizeBytes += 
record.getData().remaining();

deserializeRecordForCollectionAndUpdateState(record);
}
 
-   if (useAdaptiveReads && 
!fetchedRecords.isEmpty()) {
-   averageRecordSizeBytes = 
recordBatchSizeBytes / fetchedRecords.size();
-   maxNumberOfRecordsPerFetch = 
getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-   }
-
nextShardItr = 
getRecordsResult.getNextShardIterator();
+
+   long processingEndTimeNanos = 
System.nanoTime();
+
+   long adjustmentEndTimeNanos = 
adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos);
--- End diff --

just a minor nit pick here: `adjustmentEndTimeNanos` would be better named 
as `adjustedEndTimeNanos`


---


[GitHub] flink issue #6231: [FLINK-9694] Potentially NPE in CompositeTypeSerializerCo...

2018-07-24 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6231
  
PR #6392 fixed this issue.


---


[GitHub] flink pull request #6231: [FLINK-9694] Potentially NPE in CompositeTypeSeria...

2018-07-24 Thread yanghua
Github user yanghua closed the pull request at:

https://github.com/apache/flink/pull/6231


---


[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...

2018-07-24 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6408#discussion_r204998677
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -233,26 +225,69 @@ public void run() {

subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
long recordBatchSizeBytes = 0L;
-   long averageRecordSizeBytes = 0L;
-
for (UserRecord record : 
fetchedRecords) {
recordBatchSizeBytes += 
record.getData().remaining();

deserializeRecordForCollectionAndUpdateState(record);
}
 
-   if (useAdaptiveReads && 
!fetchedRecords.isEmpty()) {
-   averageRecordSizeBytes = 
recordBatchSizeBytes / fetchedRecords.size();
-   maxNumberOfRecordsPerFetch = 
getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-   }
-
nextShardItr = 
getRecordsResult.getNextShardIterator();
+
+   long processingEndTimeNanos = 
System.nanoTime();
+
+   long adjustmentEndTimeNanos = 
adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos);
+   long runLoopTimeNanos = 
adjustmentEndTimeNanos - processingStartTimeNanos;
+   maxNumberOfRecordsPerFetch = 
adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), 
recordBatchSizeBytes);
+   processingStartTimeNanos = 
adjustmentEndTimeNanos; // for next time through the loop
}
}
} catch (Throwable t) {
fetcherRef.stopWithError(t);
}
}
 
+   /**
+* Adjusts loop timing to match target frequency if specified.
+* @param processingStartTimeNanos The start time of the run loop "work"
+* @param processingEndTimeNanos The end time of the run loop "work"
+* @return The System.nanoTime() after the sleep (if any)
+* @throws InterruptedException
+*/
+   protected long adjustRunLoopFrequency(long processingStartTimeNanos, 
long processingEndTimeNanos)
+   throws InterruptedException {
+   long endTimeNanos = processingEndTimeNanos;
+   if (fetchIntervalMillis != 0) {
+   long processingTimeNanos = processingEndTimeNanos - 
processingStartTimeNanos;
+   long sleepTimeMillis = fetchIntervalMillis - 
(processingTimeNanos / 1_000_000);
+   if (sleepTimeMillis > 0) {
+   Thread.sleep(sleepTimeMillis);
+   endTimeNanos = System.nanoTime();
+   }
+   }
+   return endTimeNanos;
+   }
+
+   /**
+* Calculates how many records to read each time through the loop based 
on a target throughput
+* and the measured frequenecy of the loop.
+* @param runLoopTimeNanos The total time of one pass through the loop
+* @param numRecords The number of records of the last read operation
+* @param recordBatchSizeBytes The total batch size of the last read 
operation
+*/
+   protected int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, 
long recordBatchSizeBytes) {
--- End diff --

If this is a method without side effects on the fields of the 
`ShardConsumer`, it might be better off to make this method static, and pass in 
the current `maxNumberOfRecordsPerFetch` as an argument.
This makes it more clear that it is only an utility method to calculate the 
number of records to read.


---


[GitHub] flink issue #6392: [FLINK-9694][table] Fix NPE in CRowSerializerConfigSnapsh...

2018-07-24 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6392
  
+1, I will close my PR #6231 about this issue


---


[jira] [Commented] (FLINK-6935) Integration of SQL and CEP

2018-07-24 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555222#comment-16555222
 ] 

Timo Walther commented on FLINK-6935:
-

Thanks for renewing the discussion. Sorry, that this feature has not received 
more attention in the last couple of months. We still want to have this feature 
in Flink. Actually, it was on my list for Flink 1.6 but couldn't make it. We 
should aim for Flink 1.7. [~dawidwys] aligned the CEP side for 
{{MATCH_RECOGNIZE}}. The PR itself should not have many merge conflicts, so we 
can review it soon. See also the recent discussion 
[here|https://twitter.com/julianhyde/status/1021472015478214656].

> Integration of SQL and CEP
> --
>
> Key: FLINK-6935
> URL: https://issues.apache.org/jira/browse/FLINK-6935
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP, Table API & SQL
>Reporter: Jark Wu
>Assignee: Dian Fu
>Priority: Major
>
> Flink's CEP library is a great library for complex event processing, more and 
> more customers are expressing their interests in it. But it also has some 
> limitations that users usually have to write a lot of code even for a very 
> simple pattern match use case as it currently only supports the Java API.
> CEP DSLs and SQLs strongly resemble each other. CEP's additional features 
> compared to SQL boil down to pattern detection. So It will be awesome to 
> consolidate CEP and SQL. It makes SQL more powerful to support more usage 
> scenario. And it gives users the ability to easily and quickly to build CEP 
> applications.
> The FLIP can be found here:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-20%3A+Integration+of+SQL+and+CEP
> This is an umbrella issue for the FLIP. We should wait for Calcite 1.13 to 
> start this work.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9942) Guard handlers against null fields in requests

2018-07-24 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555214#comment-16555214
 ] 

Chesnay Schepler commented on FLINK-9942:
-

[~GJL] What was the reasoning behind this change?

> Guard handlers against null fields in requests
> --
>
> Key: FLINK-9942
> URL: https://issues.apache.org/jira/browse/FLINK-9942
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>
> In FLINK-8233 the {{ObjectMapper}} used for the REST API was modified to not 
> fail on missing creator properties. This means that any field for any request 
> may be null.
> Since fields not being null was an assumption that handlers were previously 
> built on, we now have to scan every implementation to ensure they can't fail 
> with an NPE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6392: [FLINK-9694][table] Fix NPE in CRowSerializerConfigSnapsh...

2018-07-24 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6392
  
LGTM, +1


---


[jira] [Commented] (FLINK-9694) Potentially NPE in CompositeTypeSerializerConfigSnapshot constructor

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555209#comment-16555209
 ] 

ASF GitHub Bot commented on FLINK-9694:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6392
  
LGTM, +1


> Potentially NPE in CompositeTypeSerializerConfigSnapshot constructor
> 
>
> Key: FLINK-9694
> URL: https://issues.apache.org/jira/browse/FLINK-9694
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> the partial specific exception stack trace :
> {code:java}
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.(CompositeTypeSerializerConfigSnapshot.java:53)
> at 
> org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot.(CRowSerializer.scala:120)
> at 
> org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot.(CRowSerializer.scala:123)
> at sun.reflect.GeneratedConstructorAccessor10.newInstance(Unknown Source)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at java.lang.Class.newInstance(Class.java:442)
> at 
> org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:319)
> ... 20 more{code}
> related code is : 
> {code:java}
> public CompositeTypeSerializerConfigSnapshot(TypeSerializer... 
> nestedSerializers) {
>Preconditions.checkNotNull(nestedSerializers);
>this.nestedSerializersAndConfigs = new 
> ArrayList<>(nestedSerializers.length);
>for (TypeSerializer nestedSerializer : nestedSerializers) {
>   TypeSerializerConfigSnapshot configSnapshot = 
> nestedSerializer.snapshotConfiguration();
>   this.nestedSerializersAndConfigs.add(
>  new Tuple2, TypeSerializerConfigSnapshot>(
> nestedSerializer.duplicate(),
> Preconditions.checkNotNull(configSnapshot)));
>}
> }
> {code}
> exception happens at : 
> {code:java}
> TypeSerializerConfigSnapshot configSnapshot = 
> nestedSerializer.snapshotConfiguration();
> {code}
> the reason is the type of constructor's parameter "..." used "varargs" 
> feature. The  initialize code in *CRowSerializer.scala* is : 
> {code:java}
> def this() = this(null)// Scala code
> {code}
> when invoked this, actually the the type of 
> CompositeTypeSerializerConfigSnapshot's
> nestedSerializers parameter is :
> {code:java}
> TypeSerializer[] nestedSerializers = new TypeSerializer[] {null};
> {code}
> so the checkNotNull precondition statement :
> {code:java}
> Preconditions.checkNotNull(nestedSerializers);
> {code}
> is always useless.
> So we should check the object reference in _for_ loop to protect NPE. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6935) Integration of SQL and CEP

2018-07-24 Thread Dian Fu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555201#comment-16555201
 ] 

Dian Fu commented on FLINK-6935:


The community is focusing on modules such as runtime, table, etc in the past 
few months and so the pull requests of this feature have not been reviewed. 
[~twalthr], not sure if you have time to review this feature right now? If yes, 
I will update the pull requests.

> Integration of SQL and CEP
> --
>
> Key: FLINK-6935
> URL: https://issues.apache.org/jira/browse/FLINK-6935
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP, Table API & SQL
>Reporter: Jark Wu
>Assignee: Dian Fu
>Priority: Major
>
> Flink's CEP library is a great library for complex event processing, more and 
> more customers are expressing their interests in it. But it also has some 
> limitations that users usually have to write a lot of code even for a very 
> simple pattern match use case as it currently only supports the Java API.
> CEP DSLs and SQLs strongly resemble each other. CEP's additional features 
> compared to SQL boil down to pattern detection. So It will be awesome to 
> consolidate CEP and SQL. It makes SQL more powerful to support more usage 
> scenario. And it gives users the ability to easily and quickly to build CEP 
> applications.
> The FLIP can be found here:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-20%3A+Integration+of+SQL+and+CEP
> This is an umbrella issue for the FLIP. We should wait for Calcite 1.13 to 
> start this work.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9942) Guard handlers against null fields in requests

2018-07-24 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9942:
---

 Summary: Guard handlers against null fields in requests
 Key: FLINK-9942
 URL: https://issues.apache.org/jira/browse/FLINK-9942
 Project: Flink
  Issue Type: Improvement
  Components: REST
Affects Versions: 1.5.0, 1.6.0
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler


In FLINK-8233 the {{ObjectMapper}} used for the REST API was modified to not 
fail on missing creator properties. This means that any field for any request 
may be null.

Since fields not being null was an assumption that handlers were previously 
built on, we now have to scan every implementation to ensure they can't fail 
with an NPE.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9938) State TTL cleanup during full state scan upon checkpointing

2018-07-24 Thread aitozi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555190#comment-16555190
 ] 

aitozi commented on FLINK-9938:
---

Can it be done by checking periodic ? so that the clean up of state is not 
depended on the usages of checkpoint ?

> State TTL cleanup during full state scan upon checkpointing
> ---
>
> Key: FLINK-9938
> URL: https://issues.apache.org/jira/browse/FLINK-9938
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.0
>Reporter: Andrey Zagrebin
>Assignee: Andrey Zagrebin
>Priority: Major
> Fix For: 1.6.0
>
>
> We can try to piggyback full state scan during certain checkpoint processes 
> in backends, check TTL expiration for every entry and evict expired to speed 
> up cleanup.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9941) Flush in ScalaCsvOutputFormat before close method

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555185#comment-16555185
 ] 

ASF GitHub Bot commented on FLINK-9941:
---

Github user buptljy commented on the issue:

https://github.com/apache/flink/pull/6412
  
Duplicate. Close.


> Flush in ScalaCsvOutputFormat before close method
> -
>
> Key: FLINK-9941
> URL: https://issues.apache.org/jira/browse/FLINK-9941
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API
>Affects Versions: 1.5.1
>Reporter: Rannn Tao
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> Because not every stream's close method will flush, in order to ensure the 
> stability of continuous integration, we need to manually call flush() before 
> close().
> I noticed that CsvOutputFormat (Java API) has done this. As follows. 
> {code:java}
> //CsvOutputFormat
> public void close() throws IOException {
> if (wrt != null) {
> this.wrt.flush();
> this.wrt.close();
> }
> super.close();
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9941) Flush in ScalaCsvOutputFormat before close method

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555186#comment-16555186
 ] 

ASF GitHub Bot commented on FLINK-9941:
---

Github user buptljy closed the pull request at:

https://github.com/apache/flink/pull/6412


> Flush in ScalaCsvOutputFormat before close method
> -
>
> Key: FLINK-9941
> URL: https://issues.apache.org/jira/browse/FLINK-9941
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API
>Affects Versions: 1.5.1
>Reporter: Rannn Tao
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> Because not every stream's close method will flush, in order to ensure the 
> stability of continuous integration, we need to manually call flush() before 
> close().
> I noticed that CsvOutputFormat (Java API) has done this. As follows. 
> {code:java}
> //CsvOutputFormat
> public void close() throws IOException {
> if (wrt != null) {
> this.wrt.flush();
> this.wrt.close();
> }
> super.close();
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6412: [FLINK-9941] Flush in ScalaCsvOutputFormat before close

2018-07-24 Thread buptljy
Github user buptljy commented on the issue:

https://github.com/apache/flink/pull/6412
  
Duplicate. Close.


---


[GitHub] flink pull request #6412: [FLINK-9941] Flush in ScalaCsvOutputFormat before ...

2018-07-24 Thread buptljy
Github user buptljy closed the pull request at:

https://github.com/apache/flink/pull/6412


---


[jira] [Commented] (FLINK-9941) Flush in ScalaCsvOutputFormat before close method

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555183#comment-16555183
 ] 

ASF GitHub Bot commented on FLINK-9941:
---

GitHub user buptljy opened a pull request:

https://github.com/apache/flink/pull/6412

[FLINK-9941] Flush in ScalaCsvOutputFormat before close

## What is the purpose of the change
- Flush in ScalaCsvOutputFormat before close.We've already finished it in 
org.apache.flink.api.java.io.CsvOutputFormat.

## Brief change log
- add flush in ScalaCsvOutputFormat before close.
## Verifying this change
- unit tests.
## Does this pull request potentially affect one of the following parts:
- no

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/buptljy/flink FLINK-9941

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6412.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6412


commit 636456d2398bef69a805b96dfb0945459cfcfada
Author: wind 
Date:   2018-07-25T06:01:36Z

flush ScalaCsvOutputFormat before close




> Flush in ScalaCsvOutputFormat before close method
> -
>
> Key: FLINK-9941
> URL: https://issues.apache.org/jira/browse/FLINK-9941
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API
>Affects Versions: 1.5.1
>Reporter: Rannn Tao
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> Because not every stream's close method will flush, in order to ensure the 
> stability of continuous integration, we need to manually call flush() before 
> close().
> I noticed that CsvOutputFormat (Java API) has done this. As follows. 
> {code:java}
> //CsvOutputFormat
> public void close() throws IOException {
> if (wrt != null) {
> this.wrt.flush();
> this.wrt.close();
> }
> super.close();
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9934) Kafka table source factory produces invalid field mapping

2018-07-24 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-9934.
-
   Resolution: Fixed
Fix Version/s: 1.6.0

Fixed in 1.7.0: 378cbb7c2e580ba73f215234e7dff542c3e2bc97
Fixed in 1.6.0: d7b80b0aa0ae6451da46b910b07b17415cb2530a

> Kafka table source factory produces invalid field mapping
> -
>
> Key: FLINK-9934
> URL: https://issues.apache.org/jira/browse/FLINK-9934
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The Kafka table source factory produces an invalid field mapping when 
> referencing a rowtime attribute from an input field. The check in 
> {{TableSourceUtil#validateTableSource}} therefore can fail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6412: [FLINK-9941] Flush in ScalaCsvOutputFormat before ...

2018-07-24 Thread buptljy
GitHub user buptljy opened a pull request:

https://github.com/apache/flink/pull/6412

[FLINK-9941] Flush in ScalaCsvOutputFormat before close

## What is the purpose of the change
- Flush in ScalaCsvOutputFormat before close.We've already finished it in 
org.apache.flink.api.java.io.CsvOutputFormat.

## Brief change log
- add flush in ScalaCsvOutputFormat before close.
## Verifying this change
- unit tests.
## Does this pull request potentially affect one of the following parts:
- no

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/buptljy/flink FLINK-9941

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6412.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6412


commit 636456d2398bef69a805b96dfb0945459cfcfada
Author: wind 
Date:   2018-07-25T06:01:36Z

flush ScalaCsvOutputFormat before close




---


[jira] [Commented] (FLINK-9934) Kafka table source factory produces invalid field mapping

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9934?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555181#comment-16555181
 ] 

ASF GitHub Bot commented on FLINK-9934:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6403


> Kafka table source factory produces invalid field mapping
> -
>
> Key: FLINK-9934
> URL: https://issues.apache.org/jira/browse/FLINK-9934
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> The Kafka table source factory produces an invalid field mapping when 
> referencing a rowtime attribute from an input field. The check in 
> {{TableSourceUtil#validateTableSource}} therefore can fail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5281) Extend KafkaJsonTableSources to support nested data

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555180#comment-16555180
 ] 

ASF GitHub Bot commented on FLINK-5281:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3124


> Extend KafkaJsonTableSources to support nested data
> ---
>
> Key: FLINK-5281
> URL: https://issues.apache.org/jira/browse/FLINK-5281
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.0
>
>
> The {{TableSource}} does currently not support nested data. 
> Once FLINK-5280 is fixed, the KafkaJsonTableSources should be extended to 
> support nested input data. The nested data should be produced as {{Row}}s 
> nested in {{Row}}s.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6403: [FLINK-9934] [table] Fix invalid field mapping by ...

2018-07-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6403


---


[GitHub] flink pull request #3124: [FLINK-5281] Extend KafkaJsonTableSources to suppo...

2018-07-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3124


---


[jira] [Commented] (FLINK-9941) Flush in ScalaCsvOutputFormat before close method

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555179#comment-16555179
 ] 

ASF GitHub Bot commented on FLINK-9941:
---

GitHub user Lemonjing opened a pull request:

https://github.com/apache/flink/pull/6411

[FLINK-9941][ScalaAPI] Flush in ScalaCsvOutputFormat before closing, to 
ensure CI stability

## What is the purpose of the change
This pull request update scala api `ScalaCsvOutputFormat` to increase CI 
stability.

## Brief change log
Add flush method before close function in ScalaCsvOutputFormat for scala 
API.

## Verifying this change
This change is already covered by existing tests, such as 
ScalarFunctionsTest

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with 
@Public(Evolving): (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
- Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation
Does this pull request introduce a new feature? (yes / **no**)
If yes, how is the feature documented? (not applicable / docs / JavaDocs / 
**not documented**)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Lemonjing/flink csv-close-hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6411.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6411


commit 6c6120722eef81c2c275b92a13a5687fef35e7bb
Author: lemonjing <932191671@...>
Date:   2018-07-25T05:46:55Z

[hotfix] Flush in ScalaCsvOutputFormat before closing, to ensure CI 
stability




> Flush in ScalaCsvOutputFormat before close method
> -
>
> Key: FLINK-9941
> URL: https://issues.apache.org/jira/browse/FLINK-9941
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API
>Affects Versions: 1.5.1
>Reporter: Rannn Tao
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> Because not every stream's close method will flush, in order to ensure the 
> stability of continuous integration, we need to manually call flush() before 
> close().
> I noticed that CsvOutputFormat (Java API) has done this. As follows. 
> {code:java}
> //CsvOutputFormat
> public void close() throws IOException {
> if (wrt != null) {
> this.wrt.flush();
> this.wrt.close();
> }
> super.close();
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9941) Flush in ScalaCsvOutputFormat before close method

2018-07-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9941:
--
Labels: pull-request-available  (was: )

> Flush in ScalaCsvOutputFormat before close method
> -
>
> Key: FLINK-9941
> URL: https://issues.apache.org/jira/browse/FLINK-9941
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API
>Affects Versions: 1.5.1
>Reporter: Rannn Tao
>Assignee: buptljy
>Priority: Major
>  Labels: pull-request-available
>
> Because not every stream's close method will flush, in order to ensure the 
> stability of continuous integration, we need to manually call flush() before 
> close().
> I noticed that CsvOutputFormat (Java API) has done this. As follows. 
> {code:java}
> //CsvOutputFormat
> public void close() throws IOException {
> if (wrt != null) {
> this.wrt.flush();
> this.wrt.close();
> }
> super.close();
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6411: [FLINK-9941][ScalaAPI] Flush in ScalaCsvOutputForm...

2018-07-24 Thread Lemonjing
GitHub user Lemonjing opened a pull request:

https://github.com/apache/flink/pull/6411

[FLINK-9941][ScalaAPI] Flush in ScalaCsvOutputFormat before closing, to 
ensure CI stability

## What is the purpose of the change
This pull request update scala api `ScalaCsvOutputFormat` to increase CI 
stability.

## Brief change log
Add flush method before close function in ScalaCsvOutputFormat for scala 
API.

## Verifying this change
This change is already covered by existing tests, such as 
ScalarFunctionsTest

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with 
@Public(Evolving): (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
- Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation
Does this pull request introduce a new feature? (yes / **no**)
If yes, how is the feature documented? (not applicable / docs / JavaDocs / 
**not documented**)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Lemonjing/flink csv-close-hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6411.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6411


commit 6c6120722eef81c2c275b92a13a5687fef35e7bb
Author: lemonjing <932191671@...>
Date:   2018-07-25T05:46:55Z

[hotfix] Flush in ScalaCsvOutputFormat before closing, to ensure CI 
stability




---


[jira] [Commented] (FLINK-9941) Flush in ScalaCsvOutputFormat before close method

2018-07-24 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555175#comment-16555175
 ] 

buptljy commented on FLINK-9941:


[~lemonjing] You're right. I'll fix it.

> Flush in ScalaCsvOutputFormat before close method
> -
>
> Key: FLINK-9941
> URL: https://issues.apache.org/jira/browse/FLINK-9941
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API
>Affects Versions: 1.5.1
>Reporter: Rannn Tao
>Assignee: buptljy
>Priority: Major
>
> Because not every stream's close method will flush, in order to ensure the 
> stability of continuous integration, we need to manually call flush() before 
> close().
> I noticed that CsvOutputFormat (Java API) has done this. As follows. 
> {code:java}
> //CsvOutputFormat
> public void close() throws IOException {
> if (wrt != null) {
> this.wrt.flush();
> this.wrt.close();
> }
> super.close();
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9941) Flush in ScalaCsvOutputFormat before close method

2018-07-24 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy reassigned FLINK-9941:
--

Assignee: buptljy

> Flush in ScalaCsvOutputFormat before close method
> -
>
> Key: FLINK-9941
> URL: https://issues.apache.org/jira/browse/FLINK-9941
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API
>Affects Versions: 1.5.1
>Reporter: Rannn Tao
>Assignee: buptljy
>Priority: Major
>
> Because not every stream's close method will flush, in order to ensure the 
> stability of continuous integration, we need to manually call flush() before 
> close().
> I noticed that CsvOutputFormat (Java API) has done this. As follows. 
> {code:java}
> //CsvOutputFormat
> public void close() throws IOException {
> if (wrt != null) {
> this.wrt.flush();
> this.wrt.close();
> }
> super.close();
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9941) Flush in ScalaCsvOutputFormat before close method

2018-07-24 Thread Rannn Tao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rannn Tao updated FLINK-9941:
-
Description: 
Because not every stream's close method will flush, in order to ensure the 
stability of continuous integration, we need to manually call flush() before 
close().

I noticed that CsvOutputFormat (Java API) has done this. As follows.

 

 
{code:java}
//CsvOutputFormat
public void close() throws IOException {
if (wrt != null) {
this.wrt.flush();
this.wrt.close();
}
super.close();
}
{code}
 

  was:
Because not every stream's close method will be refreshed, in order to ensure 
the stability of continuous integration, we need to manually call flush() 
before close().

I noticed that CsvOutputFormat (Java API) has done this. As follows.

 

 
{code:java}
//CsvOutputFormat
public void close() throws IOException {
if (wrt != null) {
this.wrt.flush();
this.wrt.close();
}
super.close();
}
{code}
 


> Flush in ScalaCsvOutputFormat before close method
> -
>
> Key: FLINK-9941
> URL: https://issues.apache.org/jira/browse/FLINK-9941
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API
>Affects Versions: 1.5.1
>Reporter: Rannn Tao
>Priority: Major
>
> Because not every stream's close method will flush, in order to ensure the 
> stability of continuous integration, we need to manually call flush() before 
> close().
> I noticed that CsvOutputFormat (Java API) has done this. As follows.
>  
>  
> {code:java}
> //CsvOutputFormat
> public void close() throws IOException {
> if (wrt != null) {
> this.wrt.flush();
> this.wrt.close();
> }
> super.close();
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9941) Flush in ScalaCsvOutputFormat before close method

2018-07-24 Thread Rannn Tao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rannn Tao updated FLINK-9941:
-
Description: 
Because not every stream's close method will flush, in order to ensure the 
stability of continuous integration, we need to manually call flush() before 
close().

I noticed that CsvOutputFormat (Java API) has done this. As follows. 
{code:java}
//CsvOutputFormat
public void close() throws IOException {
if (wrt != null) {
this.wrt.flush();
this.wrt.close();
}
super.close();
}
{code}
 

  was:
Because not every stream's close method will flush, in order to ensure the 
stability of continuous integration, we need to manually call flush() before 
close().

I noticed that CsvOutputFormat (Java API) has done this. As follows.

 

 
{code:java}
//CsvOutputFormat
public void close() throws IOException {
if (wrt != null) {
this.wrt.flush();
this.wrt.close();
}
super.close();
}
{code}
 


> Flush in ScalaCsvOutputFormat before close method
> -
>
> Key: FLINK-9941
> URL: https://issues.apache.org/jira/browse/FLINK-9941
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala API
>Affects Versions: 1.5.1
>Reporter: Rannn Tao
>Priority: Major
>
> Because not every stream's close method will flush, in order to ensure the 
> stability of continuous integration, we need to manually call flush() before 
> close().
> I noticed that CsvOutputFormat (Java API) has done this. As follows. 
> {code:java}
> //CsvOutputFormat
> public void close() throws IOException {
> if (wrt != null) {
> this.wrt.flush();
> this.wrt.close();
> }
> super.close();
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9941) Flush in ScalaCsvOutputFormat before close method

2018-07-24 Thread Rannn Tao (JIRA)
Rannn Tao created FLINK-9941:


 Summary: Flush in ScalaCsvOutputFormat before close method
 Key: FLINK-9941
 URL: https://issues.apache.org/jira/browse/FLINK-9941
 Project: Flink
  Issue Type: Improvement
  Components: Scala API
Affects Versions: 1.5.1
Reporter: Rannn Tao


Because not every stream's close method will be refreshed, in order to ensure 
the stability of continuous integration, we need to manually call flush() 
before close().

I noticed that CsvOutputFormat (Java API) has done this. As follows.

 

 
{code:java}
//CsvOutputFormat
public void close() throws IOException {
if (wrt != null) {
this.wrt.flush();
this.wrt.close();
}
super.close();
}
{code}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6935) Integration of SQL and CEP

2018-07-24 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555165#comment-16555165
 ] 

buptljy commented on FLINK-6935:


I am very interested in the CEP part. And my team is exploring this feature and 
try to integrate it into our products, and I also want to contribute to this 
feature. However I find that many tasks are not in the correct status and many 
pull requests have not been reviewed for a long time. Is there anyone can help 
to manage these Jira issues ?

> Integration of SQL and CEP
> --
>
> Key: FLINK-6935
> URL: https://issues.apache.org/jira/browse/FLINK-6935
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP, Table API & SQL
>Reporter: Jark Wu
>Assignee: Dian Fu
>Priority: Major
>
> Flink's CEP library is a great library for complex event processing, more and 
> more customers are expressing their interests in it. But it also has some 
> limitations that users usually have to write a lot of code even for a very 
> simple pattern match use case as it currently only supports the Java API.
> CEP DSLs and SQLs strongly resemble each other. CEP's additional features 
> compared to SQL boil down to pattern detection. So It will be awesome to 
> consolidate CEP and SQL. It makes SQL more powerful to support more usage 
> scenario. And it gives users the ability to easily and quickly to build CEP 
> applications.
> The FLIP can be found here:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-20%3A+Integration+of+SQL+and+CEP
> This is an umbrella issue for the FLIP. We should wait for Calcite 1.13 to 
> start this work.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-6938) IterativeCondition should support RichFunction interface

2018-07-24 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy reassigned FLINK-6938:
--

Assignee: Jark Wu  (was: buptljy)

> IterativeCondition should support RichFunction interface
> 
>
> Key: FLINK-6938
> URL: https://issues.apache.org/jira/browse/FLINK-6938
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>
> In FLIP-20, we need IterativeCondition to support an {{open()}} method to 
> compile the generated code once. We do not want to insert a if condition  in 
> the {{filter()}} method. So I suggest make IterativeCondition support 
> {{RichFunction}} interface.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-6938) IterativeCondition should support RichFunction interface

2018-07-24 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy reassigned FLINK-6938:
--

Assignee: buptljy  (was: Jark Wu)

> IterativeCondition should support RichFunction interface
> 
>
> Key: FLINK-6938
> URL: https://issues.apache.org/jira/browse/FLINK-6938
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Jark Wu
>Assignee: buptljy
>Priority: Major
>
> In FLIP-20, we need IterativeCondition to support an {{open()}} method to 
> compile the generated code once. We do not want to insert a if condition  in 
> the {{filter()}} method. So I suggest make IterativeCondition support 
> {{RichFunction}} interface.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5315) Support distinct aggregations in table api

2018-07-24 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555162#comment-16555162
 ] 

Timo Walther commented on FLINK-5315:
-

Thanks for working on this [~walterddr]. {{t.select("count(distinct a), 
sum(b)")}} cannot be expressed in Scala API. Shouldn't be {{'a.distinct.count}} 
the correct syntax?

> Support distinct aggregations in table api
> --
>
> Key: FLINK-5315
> URL: https://issues.apache.org/jira/browse/FLINK-5315
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Rong Rong
>Priority: Major
>
> Such as 
> {code}
> t.select("count(distinct a), sum(b)")
> {code}
> or 
> {code}
> t.select('a.count.distinct), 'b.sum)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9834) Unable to support scala BasicArrayTypeInfo

2018-07-24 Thread buptljy (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

buptljy closed FLINK-9834.
--
Resolution: Won't Do

> Unable to support scala BasicArrayTypeInfo
> --
>
> Key: FLINK-9834
> URL: https://issues.apache.org/jira/browse/FLINK-9834
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: buptljy
>Assignee: buptljy
>Priority: Major
>
> BasicArrayTypeInfo does not supported scala type in some circumstances. For 
> example,
> {code:scala}
> // we set a descriptor here and get value from it.
> val datas: mutable.Map[String, Array[Byte]] = mutable.Map()
> val descriptor = new MapStateDescriptor("realtime-state",
> BasicTypeInfo.STRING_TYPE_INFO, BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO)
> val state = context.getKeyedStateStore.getMapState(descriptor)
> val iter = state.entries().iterator()
> while (iter.hasNext) {
>val entry = iter.next()
>datas.put(entry.getKey, entry.getValue)
> }
> {code}
> The codes above cannot be compiled successfully because the "state" is using 
> java.lang.Byte but the "datas" is using scala.Byte, and we have to iterate 
> all values of the Array like "datas.put(entry.getKey, entry.getValue.map(byte 
> => byte.asInstanceOf[Byte]))", which is definitely not what we want.
> I suggest that we create a ScalaBasicArrayTypeInfo like the 
> "BasicArrayTypeInfo" for scala.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9834) Unable to support scala BasicArrayTypeInfo

2018-07-24 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555157#comment-16555157
 ] 

buptljy commented on FLINK-9834:


It seems that we can use _*implicitly[TypeInformation[Array[Byte]]]*_ to do 
that, although I don't think it's an appropriate way

> Unable to support scala BasicArrayTypeInfo
> --
>
> Key: FLINK-9834
> URL: https://issues.apache.org/jira/browse/FLINK-9834
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: buptljy
>Assignee: buptljy
>Priority: Major
>
> BasicArrayTypeInfo does not supported scala type in some circumstances. For 
> example,
> {code:scala}
> // we set a descriptor here and get value from it.
> val datas: mutable.Map[String, Array[Byte]] = mutable.Map()
> val descriptor = new MapStateDescriptor("realtime-state",
> BasicTypeInfo.STRING_TYPE_INFO, BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO)
> val state = context.getKeyedStateStore.getMapState(descriptor)
> val iter = state.entries().iterator()
> while (iter.hasNext) {
>val entry = iter.next()
>datas.put(entry.getKey, entry.getValue)
> }
> {code}
> The codes above cannot be compiled successfully because the "state" is using 
> java.lang.Byte but the "datas" is using scala.Byte, and we have to iterate 
> all values of the Array like "datas.put(entry.getKey, entry.getValue.map(byte 
> => byte.asInstanceOf[Byte]))", which is definitely not what we want.
> I suggest that we create a ScalaBasicArrayTypeInfo like the 
> "BasicArrayTypeInfo" for scala.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9940) File source continuous monitoring mode: S3 files sometimes missed

2018-07-24 Thread Huyen Levan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huyen Levan updated FLINK-9940:
---
Priority: Blocker  (was: Major)

> File source continuous monitoring mode: S3 files sometimes missed
> -
>
> Key: FLINK-9940
> URL: https://issues.apache.org/jira/browse/FLINK-9940
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.5.1
> Environment: Flink 1.5, EMRFS
>Reporter: Huyen Levan
>Priority: Blocker
>  Labels: EMRFS, Flink, S3
>
> When using StreamExecutionEnvironment.readFile() with 
> FileProcessingMode.PROCESS_CONTINUOUSLY mode to monitor an S3 prefix, if 
> there is a high amount of new/modified files at the same time, the directory 
> monitoring process might miss some files. The number of missing files depends 
> on the monitoring interval.
> Cause: Flink tracks which files it has read by remembering the modification 
> time of the file that was added (or modified) last. So when there are 
> multiple files having a same last-modified timestamp.
> Suggested solution (thanks to [[Fabian 
> Hueske|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=user_nodes&user=25]|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=user_nodes&user=25]):
>  a hybrid approach that keeps the names of all files that have a mod 
> timestamp that is larger than the max mod time minus an offset. 
> _org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction_



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9940) File source continuous monitoring mode: S3 files sometimes missed

2018-07-24 Thread Huyen Levan (JIRA)
Huyen Levan created FLINK-9940:
--

 Summary: File source continuous monitoring mode: S3 files 
sometimes missed
 Key: FLINK-9940
 URL: https://issues.apache.org/jira/browse/FLINK-9940
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.5.1
 Environment: Flink 1.5, EMRFS
Reporter: Huyen Levan


When using StreamExecutionEnvironment.readFile() with 
FileProcessingMode.PROCESS_CONTINUOUSLY mode to monitor an S3 prefix, if there 
is a high amount of new/modified files at the same time, the directory 
monitoring process might miss some files. The number of missing files depends 
on the monitoring interval.

Cause: Flink tracks which files it has read by remembering the modification 
time of the file that was added (or modified) last. So when there are multiple 
files having a same last-modified timestamp.

Suggested solution (thanks to [[Fabian 
Hueske|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=user_nodes&user=25]|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=user_nodes&user=25]):
 a hybrid approach that keeps the names of all files that have a mod timestamp 
that is larger than the max mod time minus an offset. 
_org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction_



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6410: Release 1.6

2018-07-24 Thread uang520
Github user uang520 closed the pull request at:

https://github.com/apache/flink/pull/6410


---


[GitHub] flink pull request #6410: Release 1.6

2018-07-24 Thread uang520
GitHub user uang520 opened a pull request:

https://github.com/apache/flink/pull/6410

Release 1.6

*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-] [component] Title of 
the pull request", where *FLINK-* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  - The S3 file system connector: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apache/flink release-1.6

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6410.patch

To close this p

[jira] [Commented] (FLINK-5860) Replace all the file creating from java.io.tmpdir with TemporaryFolder

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16555003#comment-16555003
 ] 

ASF GitHub Bot commented on FLINK-5860:
---

Github user maheshsenni commented on the issue:

https://github.com/apache/flink/pull/6399
  
@zentol I have addressed your comments in a new commit. Can you look into 
it please?


> Replace all the file creating from java.io.tmpdir with TemporaryFolder
> --
>
> Key: FLINK-5860
> URL: https://issues.apache.org/jira/browse/FLINK-5860
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: shijinkui
>Assignee: Mahesh Senniappan
>Priority: Major
>  Labels: pull-request-available, starter
>
> Search `System.getProperty("java.io.tmpdir")` in whole Flink project. It will 
> get a  Unit test list. Replace all the file creating from `java.io.tmpdir` 
> with TemporaryFolder.
> Who can fix this problem thoroughly?
> ```
> $ grep -ri 'System.getProperty("java.io.tmpdir")' .
> ./flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example/CassandraTupleWriteAheadSinkExample.java:
>   env.setStateBackend(new FsStateBackend("file:///" + 
> System.getProperty("java.io.tmpdir") + "/flink/backend"));
> ./flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
>  File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java:
>  File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java:
>  return getMockEnvironment(new File[] { new 
> File(System.getProperty("java.io.tmpdir")) });
> ./flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java:
>public static final String DEFAULT_TASK_MANAGER_TMP_PATH = 
> System.getProperty("java.io.tmpdir");
> ./flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java:
>   final String tempPath = System.getProperty("java.io.tmpdir");
> ./flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java:   
> final File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: 
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java: 
> File tempDir = new File(System.getProperty("java.io.tmpdir"));
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/clustering/util/KMeansDataGenerator.java:
>   final String outDir = params.get("output", 
> System.getProperty("java.io.tmpdir"));
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/ml/util/LinearRegressionDataGenerator.java:
> final String tmpDir = System.getProperty("java.io.tmpdir");
> ./flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/relational/util/WebLogDataGenerator.java:
>   final String outPath = System.getProperty("java.io.tmpdir");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-java8/src/test/java/org/apache/flink/runtime/util/JarFileCreatorLambdaTest.java:
>   File out = new File(System.getProperty("java.io.tmpdir"), 
> "jarcreatortest.jar");
> ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java:
>public static final String FLINK_PYTHON_FILE_PATH = 
> System.getProperty("java.io.tmpdir") + File.separator + "flink_plan";
> ./flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java:
>public static final String FLINK_TMP_DATA_DIR = 
> System.get

[GitHub] flink issue #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with JUnit Te...

2018-07-24 Thread maheshsenni
Github user maheshsenni commented on the issue:

https://github.com/apache/flink/pull/6399
  
@zentol I have addressed your comments in a new commit. Can you look into 
it please?


---


[GitHub] flink pull request #6401: [hotfix]fix typo for variable name dynamicProperti...

2018-07-24 Thread rileyli
Github user rileyli closed the pull request at:

https://github.com/apache/flink/pull/6401


---


[jira] [Commented] (FLINK-9899) Add more metrics to the Kinesis source connector

2018-07-24 Thread Lakshmi Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554917#comment-16554917
 ] 

Lakshmi Rao commented on FLINK-9899:


Thanks [~yanghua] ! 

> Add more metrics to the Kinesis source connector
> 
>
> Key: FLINK-9899
> URL: https://issues.apache.org/jira/browse/FLINK-9899
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.4.2, 1.5.1
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>
> Currently there are sparse metrics available for the Kinesis Connector. Using 
> the 
> [ShardMetricsReporter|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java]
>  add more stats. For example:
> - sleepTimeMillis 
> - maxNumberOfRecordsPerFetch
> - numberOfAggregatedRecordsPerFetch
> - numberOfDeaggregatedRecordsPerFetch
> - bytesPerFetch
> - averageRecordSizeBytes
> - runLoopTimeNanos
> - loopFrequencyHz



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9899) Add more metrics to the Kinesis source connector

2018-07-24 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9899:
---

Assignee: Lakshmi Rao  (was: vinoyang)

> Add more metrics to the Kinesis source connector
> 
>
> Key: FLINK-9899
> URL: https://issues.apache.org/jira/browse/FLINK-9899
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.4.2, 1.5.1
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>
> Currently there are sparse metrics available for the Kinesis Connector. Using 
> the 
> [ShardMetricsReporter|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java]
>  add more stats. For example:
> - sleepTimeMillis 
> - maxNumberOfRecordsPerFetch
> - numberOfAggregatedRecordsPerFetch
> - numberOfDeaggregatedRecordsPerFetch
> - bytesPerFetch
> - averageRecordSizeBytes
> - runLoopTimeNanos
> - loopFrequencyHz



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9899) Add more metrics to the Kinesis source connector

2018-07-24 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554915#comment-16554915
 ] 

vinoyang commented on FLINK-9899:
-

hi [~glaksh100] I have not started this issue yet. I have released the assignee 
and re-asigned to you. Please feel free to process this issue.

> Add more metrics to the Kinesis source connector
> 
>
> Key: FLINK-9899
> URL: https://issues.apache.org/jira/browse/FLINK-9899
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.4.2, 1.5.1
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>
> Currently there are sparse metrics available for the Kinesis Connector. Using 
> the 
> [ShardMetricsReporter|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java]
>  add more stats. For example:
> - sleepTimeMillis 
> - maxNumberOfRecordsPerFetch
> - numberOfAggregatedRecordsPerFetch
> - numberOfDeaggregatedRecordsPerFetch
> - bytesPerFetch
> - averageRecordSizeBytes
> - runLoopTimeNanos
> - loopFrequencyHz



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9897) Further enhance adaptive reads in Kinesis Connector to depend on run loop time

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554901#comment-16554901
 ] 

ASF GitHub Bot commented on FLINK-9897:
---

Github user glaksh100 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6408#discussion_r204942574
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -233,26 +225,68 @@ public void run() {

subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
long recordBatchSizeBytes = 0L;
-   long averageRecordSizeBytes = 0L;
-
for (UserRecord record : 
fetchedRecords) {
recordBatchSizeBytes += 
record.getData().remaining();

deserializeRecordForCollectionAndUpdateState(record);
}
 
-   if (useAdaptiveReads && 
!fetchedRecords.isEmpty()) {
-   averageRecordSizeBytes = 
recordBatchSizeBytes / fetchedRecords.size();
-   maxNumberOfRecordsPerFetch = 
getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-   }
-
nextShardItr = 
getRecordsResult.getNextShardIterator();
+
+   long processingEndTimeNanos = 
System.nanoTime();
+
+   long adjustmentEndTimeNanos = 
adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos);
+   long runLoopTimeNanos = 
adjustmentEndTimeNanos - processingStartTimeNanos;
+   adaptRecordsToRead(runLoopTimeNanos, 
fetchedRecords.size(), recordBatchSizeBytes);
+   processingStartTimeNanos = 
adjustmentEndTimeNanos; // for next time through the loop
}
}
} catch (Throwable t) {
fetcherRef.stopWithError(t);
}
}
 
+   /**
+* Adjusts loop timing to match target frequency if specified.
+* @param processingStartTimeNanos The start time of the run loop "work"
+* @param processingEndTimeNanos The end time of the run loop "work"
+* @return The System.nanoTime() after the sleep (if any)
+* @throws InterruptedException
+*/
+   protected long adjustRunLoopFrequency(long processingStartTimeNanos, 
long processingEndTimeNanos)
+   throws InterruptedException {
+   long endTimeNanos = processingEndTimeNanos;
+   if (fetchIntervalMillis != 0) {
+   long processingTimeNanos = processingEndTimeNanos - 
processingStartTimeNanos;
+   long sleepTimeMillis = fetchIntervalMillis - 
(processingTimeNanos / 1_000_000);
+   if (sleepTimeMillis > 0) {
+   Thread.sleep(sleepTimeMillis);
+   endTimeNanos = System.nanoTime();
+   }
+   }
+   return endTimeNanos;
+   }
+
+   /**
+* Calculates how many records to read each time through the loop based 
on a target throughput
+* and the measured frequenecy of the loop.
+* @param runLoopTimeNanos The total time of one pass through the loop
+* @param numRecords The number of records of the last read operation
+* @param recordBatchSizeBytes The total batch size of the last read 
operation
+*/
+   protected int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, 
long recordBatchSizeBytes) {
+   if (useAdaptiveReads && numRecords != 0 && runLoopTimeNanos != 
0) {
+   long averageRecordSizeBytes = recordBatchSizeBytes / 
numRecords;
+   // Adjust number of records to fetch from the shard 
depending on current average record size
+   // to optimize 2 Mb / sec read limits
+   double loopFrequencyHz = 10.0d / 
runLoopTimeNanos;
+   double bytesPerRead = 
KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
+   maxNumberOfRecordsPerFetch = (int) (bytesPerRead / 
averageRecordSizeBytes);
+   // Ensure the value is not more than 1L
+   maxNumberOfRecordsPerFetch = 
Math.min(maxNumberOfRecordsPerFetch, 
ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);
+ 

[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...

2018-07-24 Thread glaksh100
Github user glaksh100 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6408#discussion_r204942574
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -233,26 +225,68 @@ public void run() {

subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
long recordBatchSizeBytes = 0L;
-   long averageRecordSizeBytes = 0L;
-
for (UserRecord record : 
fetchedRecords) {
recordBatchSizeBytes += 
record.getData().remaining();

deserializeRecordForCollectionAndUpdateState(record);
}
 
-   if (useAdaptiveReads && 
!fetchedRecords.isEmpty()) {
-   averageRecordSizeBytes = 
recordBatchSizeBytes / fetchedRecords.size();
-   maxNumberOfRecordsPerFetch = 
getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-   }
-
nextShardItr = 
getRecordsResult.getNextShardIterator();
+
+   long processingEndTimeNanos = 
System.nanoTime();
+
+   long adjustmentEndTimeNanos = 
adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos);
+   long runLoopTimeNanos = 
adjustmentEndTimeNanos - processingStartTimeNanos;
+   adaptRecordsToRead(runLoopTimeNanos, 
fetchedRecords.size(), recordBatchSizeBytes);
+   processingStartTimeNanos = 
adjustmentEndTimeNanos; // for next time through the loop
}
}
} catch (Throwable t) {
fetcherRef.stopWithError(t);
}
}
 
+   /**
+* Adjusts loop timing to match target frequency if specified.
+* @param processingStartTimeNanos The start time of the run loop "work"
+* @param processingEndTimeNanos The end time of the run loop "work"
+* @return The System.nanoTime() after the sleep (if any)
+* @throws InterruptedException
+*/
+   protected long adjustRunLoopFrequency(long processingStartTimeNanos, 
long processingEndTimeNanos)
+   throws InterruptedException {
+   long endTimeNanos = processingEndTimeNanos;
+   if (fetchIntervalMillis != 0) {
+   long processingTimeNanos = processingEndTimeNanos - 
processingStartTimeNanos;
+   long sleepTimeMillis = fetchIntervalMillis - 
(processingTimeNanos / 1_000_000);
+   if (sleepTimeMillis > 0) {
+   Thread.sleep(sleepTimeMillis);
+   endTimeNanos = System.nanoTime();
+   }
+   }
+   return endTimeNanos;
+   }
+
+   /**
+* Calculates how many records to read each time through the loop based 
on a target throughput
+* and the measured frequenecy of the loop.
+* @param runLoopTimeNanos The total time of one pass through the loop
+* @param numRecords The number of records of the last read operation
+* @param recordBatchSizeBytes The total batch size of the last read 
operation
+*/
+   protected int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, 
long recordBatchSizeBytes) {
+   if (useAdaptiveReads && numRecords != 0 && runLoopTimeNanos != 
0) {
+   long averageRecordSizeBytes = recordBatchSizeBytes / 
numRecords;
+   // Adjust number of records to fetch from the shard 
depending on current average record size
+   // to optimize 2 Mb / sec read limits
+   double loopFrequencyHz = 10.0d / 
runLoopTimeNanos;
+   double bytesPerRead = 
KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
+   maxNumberOfRecordsPerFetch = (int) (bytesPerRead / 
averageRecordSizeBytes);
+   // Ensure the value is not more than 1L
+   maxNumberOfRecordsPerFetch = 
Math.min(maxNumberOfRecordsPerFetch, 
ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);
+   }
+   return maxNumberOfRecordsPerFetch;
--- End diff --

Oops, thanks for catching. Updated to use the return value and also to use 
a local variable in the method to avoid re-assigning the class variable 
`maxNumberOfRecordsP

[jira] [Commented] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type

2018-07-24 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554866#comment-16554866
 ] 

Rong Rong commented on FLINK-9294:
--

Had an initial diff for discussion on what needs to be done here:

[https://github.com/walterddr/flink/compare/f60aa6e72ca860bb30730472c0c7db8fef500e5c...b8b9c62c2f0226517aee661fb3d8332bdf7dedb8]

Seems like with some recent relaxation in the UDF lookup, all of the above 
mentioned problems are fixed and can be used correctly. The only remaining 
problem is: Too much relaxation causes incorrect runtime exception. 

I can go ahead and close this task by commit the test changes and continue the 
work in FLINK-9501. 

 

Any thoughts and concerns?

 

> Improve type inference for UDFs with composite parameter or result type 
> 
>
> Key: FLINK-9294
> URL: https://issues.apache.org/jira/browse/FLINK-9294
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Most of the UDF function signatures that includes composite types such as 
> *{{MAP}}*, *{{ARRAY}}*, etc would require user to override 
> *{{getParameterType}}* or *{{getResultType}}* method explicitly.
> It should be able to resolve the composite type based on the function 
> signature, such as:
> {code:java}
> public String[] eval(Map mapArg) { /* ...  */ }
> {code}
> The function catalog search should do either of the following:
> [Update]
> since we have backward compatibility issue with resolving to a different 
> type, we will not go with the modify type option.
>  - -Automatically resolve that:-
>  -1. *{{ObjectArrayTypeInfo}}* to be the result type.-
>  -2. *{{MapTypeInfo}}* to be the 
> parameter type.-
>  - Improved function mapping to find and locate function with such signatures 
>  
> [Update]
> This ticket should only cover *Map* and *Row* type, It does not cover
>  * ObjectArrayType, since Array is actually resolved by eval method signature 
> correctly.
>  * Pojo types, Pojo will be addressed separately.
> This ticket should consolidate some discrepancy between how TableFunction, 
> AggregateFunction and ScalarFunction resolves types. which at this moment 
> goes through different code path. 
> The rest of the optimization should go to follow up tickets in FLINK-9484



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9897) Further enhance adaptive reads in Kinesis Connector to depend on run loop time

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554837#comment-16554837
 ] 

ASF GitHub Bot commented on FLINK-9897:
---

Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/6408#discussion_r204922265
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -233,26 +225,68 @@ public void run() {

subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
long recordBatchSizeBytes = 0L;
-   long averageRecordSizeBytes = 0L;
-
for (UserRecord record : 
fetchedRecords) {
recordBatchSizeBytes += 
record.getData().remaining();

deserializeRecordForCollectionAndUpdateState(record);
}
 
-   if (useAdaptiveReads && 
!fetchedRecords.isEmpty()) {
-   averageRecordSizeBytes = 
recordBatchSizeBytes / fetchedRecords.size();
-   maxNumberOfRecordsPerFetch = 
getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-   }
-
nextShardItr = 
getRecordsResult.getNextShardIterator();
+
+   long processingEndTimeNanos = 
System.nanoTime();
+
+   long adjustmentEndTimeNanos = 
adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos);
+   long runLoopTimeNanos = 
adjustmentEndTimeNanos - processingStartTimeNanos;
+   adaptRecordsToRead(runLoopTimeNanos, 
fetchedRecords.size(), recordBatchSizeBytes);
+   processingStartTimeNanos = 
adjustmentEndTimeNanos; // for next time through the loop
}
}
} catch (Throwable t) {
fetcherRef.stopWithError(t);
}
}
 
+   /**
+* Adjusts loop timing to match target frequency if specified.
+* @param processingStartTimeNanos The start time of the run loop "work"
+* @param processingEndTimeNanos The end time of the run loop "work"
+* @return The System.nanoTime() after the sleep (if any)
+* @throws InterruptedException
+*/
+   protected long adjustRunLoopFrequency(long processingStartTimeNanos, 
long processingEndTimeNanos)
+   throws InterruptedException {
+   long endTimeNanos = processingEndTimeNanos;
+   if (fetchIntervalMillis != 0) {
+   long processingTimeNanos = processingEndTimeNanos - 
processingStartTimeNanos;
+   long sleepTimeMillis = fetchIntervalMillis - 
(processingTimeNanos / 1_000_000);
+   if (sleepTimeMillis > 0) {
+   Thread.sleep(sleepTimeMillis);
+   endTimeNanos = System.nanoTime();
+   }
+   }
+   return endTimeNanos;
+   }
+
+   /**
+* Calculates how many records to read each time through the loop based 
on a target throughput
+* and the measured frequenecy of the loop.
+* @param runLoopTimeNanos The total time of one pass through the loop
+* @param numRecords The number of records of the last read operation
+* @param recordBatchSizeBytes The total batch size of the last read 
operation
+*/
+   protected int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, 
long recordBatchSizeBytes) {
+   if (useAdaptiveReads && numRecords != 0 && runLoopTimeNanos != 
0) {
+   long averageRecordSizeBytes = recordBatchSizeBytes / 
numRecords;
+   // Adjust number of records to fetch from the shard 
depending on current average record size
+   // to optimize 2 Mb / sec read limits
+   double loopFrequencyHz = 10.0d / 
runLoopTimeNanos;
+   double bytesPerRead = 
KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
+   maxNumberOfRecordsPerFetch = (int) (bytesPerRead / 
averageRecordSizeBytes);
+   // Ensure the value is not more than 1L
+   maxNumberOfRecordsPerFetch = 
Math.min(maxNumberOfRecordsPerFetch, 
ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);
+

[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...

2018-07-24 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/6408#discussion_r204922265
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -233,26 +225,68 @@ public void run() {

subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
long recordBatchSizeBytes = 0L;
-   long averageRecordSizeBytes = 0L;
-
for (UserRecord record : 
fetchedRecords) {
recordBatchSizeBytes += 
record.getData().remaining();

deserializeRecordForCollectionAndUpdateState(record);
}
 
-   if (useAdaptiveReads && 
!fetchedRecords.isEmpty()) {
-   averageRecordSizeBytes = 
recordBatchSizeBytes / fetchedRecords.size();
-   maxNumberOfRecordsPerFetch = 
getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-   }
-
nextShardItr = 
getRecordsResult.getNextShardIterator();
+
+   long processingEndTimeNanos = 
System.nanoTime();
+
+   long adjustmentEndTimeNanos = 
adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos);
+   long runLoopTimeNanos = 
adjustmentEndTimeNanos - processingStartTimeNanos;
+   adaptRecordsToRead(runLoopTimeNanos, 
fetchedRecords.size(), recordBatchSizeBytes);
+   processingStartTimeNanos = 
adjustmentEndTimeNanos; // for next time through the loop
}
}
} catch (Throwable t) {
fetcherRef.stopWithError(t);
}
}
 
+   /**
+* Adjusts loop timing to match target frequency if specified.
+* @param processingStartTimeNanos The start time of the run loop "work"
+* @param processingEndTimeNanos The end time of the run loop "work"
+* @return The System.nanoTime() after the sleep (if any)
+* @throws InterruptedException
+*/
+   protected long adjustRunLoopFrequency(long processingStartTimeNanos, 
long processingEndTimeNanos)
+   throws InterruptedException {
+   long endTimeNanos = processingEndTimeNanos;
+   if (fetchIntervalMillis != 0) {
+   long processingTimeNanos = processingEndTimeNanos - 
processingStartTimeNanos;
+   long sleepTimeMillis = fetchIntervalMillis - 
(processingTimeNanos / 1_000_000);
+   if (sleepTimeMillis > 0) {
+   Thread.sleep(sleepTimeMillis);
+   endTimeNanos = System.nanoTime();
+   }
+   }
+   return endTimeNanos;
+   }
+
+   /**
+* Calculates how many records to read each time through the loop based 
on a target throughput
+* and the measured frequenecy of the loop.
+* @param runLoopTimeNanos The total time of one pass through the loop
+* @param numRecords The number of records of the last read operation
+* @param recordBatchSizeBytes The total batch size of the last read 
operation
+*/
+   protected int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, 
long recordBatchSizeBytes) {
+   if (useAdaptiveReads && numRecords != 0 && runLoopTimeNanos != 
0) {
+   long averageRecordSizeBytes = recordBatchSizeBytes / 
numRecords;
+   // Adjust number of records to fetch from the shard 
depending on current average record size
+   // to optimize 2 Mb / sec read limits
+   double loopFrequencyHz = 10.0d / 
runLoopTimeNanos;
+   double bytesPerRead = 
KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
+   maxNumberOfRecordsPerFetch = (int) (bytesPerRead / 
averageRecordSizeBytes);
+   // Ensure the value is not more than 1L
+   maxNumberOfRecordsPerFetch = 
Math.min(maxNumberOfRecordsPerFetch, 
ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);
+   }
+   return maxNumberOfRecordsPerFetch;
--- End diff --

the return value is never used


---


[jira] [Comment Edited] (FLINK-9899) Add more metrics to the Kinesis source connector

2018-07-24 Thread Lakshmi Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554822#comment-16554822
 ] 

Lakshmi Rao edited comment on FLINK-9899 at 7/24/18 9:35 PM:
-

[~yanghua] I created a PR with my changes - 
[https://github.com/apache/flink/pull/6409]  Let me know what you think.


was (Author: glaksh100):
[~yanghua] I created a PR with my changes - Let me know what you think.

> Add more metrics to the Kinesis source connector
> 
>
> Key: FLINK-9899
> URL: https://issues.apache.org/jira/browse/FLINK-9899
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.4.2, 1.5.1
>Reporter: Lakshmi Rao
>Assignee: vinoyang
>Priority: Major
>
> Currently there are sparse metrics available for the Kinesis Connector. Using 
> the 
> [ShardMetricsReporter|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java]
>  add more stats. For example:
> - sleepTimeMillis 
> - maxNumberOfRecordsPerFetch
> - numberOfAggregatedRecordsPerFetch
> - numberOfDeaggregatedRecordsPerFetch
> - bytesPerFetch
> - averageRecordSizeBytes
> - runLoopTimeNanos
> - loopFrequencyHz



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9899) Add more metrics to the Kinesis source connector

2018-07-24 Thread Lakshmi Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554822#comment-16554822
 ] 

Lakshmi Rao commented on FLINK-9899:


[~yanghua] I created a PR with my changes - Let me know what you think.

> Add more metrics to the Kinesis source connector
> 
>
> Key: FLINK-9899
> URL: https://issues.apache.org/jira/browse/FLINK-9899
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.4.2, 1.5.1
>Reporter: Lakshmi Rao
>Assignee: vinoyang
>Priority: Major
>
> Currently there are sparse metrics available for the Kinesis Connector. Using 
> the 
> [ShardMetricsReporter|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java]
>  add more stats. For example:
> - sleepTimeMillis 
> - maxNumberOfRecordsPerFetch
> - numberOfAggregatedRecordsPerFetch
> - numberOfDeaggregatedRecordsPerFetch
> - bytesPerFetch
> - averageRecordSizeBytes
> - runLoopTimeNanos
> - loopFrequencyHz



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9897) Further enhance adaptive reads in Kinesis Connector to depend on run loop time

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554820#comment-16554820
 ] 

ASF GitHub Bot commented on FLINK-9897:
---

GitHub user glaksh100 opened a pull request:

https://github.com/apache/flink/pull/6409

Flink 9899.kinesis connector metrics

## What is the purpose of the change

The purpose of this change is to add metrics to the `ShardConsumer` to get 
more observability into the performance of the Kinesis connector, including the 
enhancements introduced in 
[FLINK-9897](https://issues.apache.org/jira/browse/FLINK-9899) . 

**Important** - https://github.com/apache/flink/pull/6408 has to be merged 
**before** taking out this change.

## Brief change log
All metrics are added as gauges. The following per-shard metrics are added. 
:
- sleepTimeMillis
- maxNumberOfRecordsPerFetch
- numberOfAggregatedRecordsPerFetch
- numberOfDeaggregatedRecordsPerFetch
- bytesRequestedPerFetch
- averageRecordSizeBytes
- runLoopTimeNanos
- loopFrequencyHz

## Verifying this change

This change is already covered by existing tests, such as: 
`ShardConsumerTest`, `KinesisDataFetcherTest`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lyft/flink FLINK-9899.KinesisConnectorMetrics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6409.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6409


commit f333781a7c4f1a10b6120a962ff211e023bafaab
Author: Lakshmi Gururaja Rao 
Date:   2018-07-24T18:44:08Z

[FLINK-9897] Make adaptive reads depend on run loop time instead of fetch 
interval millis

Remove unused method

commit f51703177df9afcdba3778909b1e9d8b7fa4bf46
Author: Lakshmi Gururaja Rao 
Date:   2018-07-24T18:44:08Z

[FLINK-9897] Make adaptive reads depend on run loop time instead of fetch 
interval millis

commit d493097d09c6223383282ed90648853715b197ce
Author: Lakshmi Gururaja Rao 
Date:   2018-07-24T21:13:53Z

[FLINK-9899] Add more ShardConsumer metrics

Checkstyle fix




> Further enhance adaptive reads in Kinesis Connector to depend on run loop time
> --
>
> Key: FLINK-9897
> URL: https://issues.apache.org/jira/browse/FLINK-9897
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.4.2, 1.5.1
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>  Labels: pull-request-available
>
> In FLINK-9692, we introduced the ability for the shardConsumer to adaptively 
> read more records based on the current average record size to optimize the 2 
> Mb/sec shard limit. The feature maximizes  maxNumberOfRecordsPerFetch of 5 
> reads/sec (as prescribed by Kinesis limits). In the case where applications 
> take more time to process records in the run loop, they are no longer able to 
> read at a frequency of 5 reads/sec (even though their fetchIntervalMillis 
> maybe set to 200 ms). In such a scenario, the maxNumberOfRecordsPerFetch 
> should be calculated based on the time that the run loop actually takes as 
> opposed to fetchIntervalMillis. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6409: Flink 9899.kinesis connector metrics

2018-07-24 Thread glaksh100
GitHub user glaksh100 opened a pull request:

https://github.com/apache/flink/pull/6409

Flink 9899.kinesis connector metrics

## What is the purpose of the change

The purpose of this change is to add metrics to the `ShardConsumer` to get 
more observability into the performance of the Kinesis connector, including the 
enhancements introduced in 
[FLINK-9897](https://issues.apache.org/jira/browse/FLINK-9899) . 

**Important** - https://github.com/apache/flink/pull/6408 has to be merged 
**before** taking out this change.

## Brief change log
All metrics are added as gauges. The following per-shard metrics are added. 
:
- sleepTimeMillis
- maxNumberOfRecordsPerFetch
- numberOfAggregatedRecordsPerFetch
- numberOfDeaggregatedRecordsPerFetch
- bytesRequestedPerFetch
- averageRecordSizeBytes
- runLoopTimeNanos
- loopFrequencyHz

## Verifying this change

This change is already covered by existing tests, such as: 
`ShardConsumerTest`, `KinesisDataFetcherTest`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lyft/flink FLINK-9899.KinesisConnectorMetrics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6409.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6409


commit f333781a7c4f1a10b6120a962ff211e023bafaab
Author: Lakshmi Gururaja Rao 
Date:   2018-07-24T18:44:08Z

[FLINK-9897] Make adaptive reads depend on run loop time instead of fetch 
interval millis

Remove unused method

commit f51703177df9afcdba3778909b1e9d8b7fa4bf46
Author: Lakshmi Gururaja Rao 
Date:   2018-07-24T18:44:08Z

[FLINK-9897] Make adaptive reads depend on run loop time instead of fetch 
interval millis

commit d493097d09c6223383282ed90648853715b197ce
Author: Lakshmi Gururaja Rao 
Date:   2018-07-24T21:13:53Z

[FLINK-9899] Add more ShardConsumer metrics

Checkstyle fix




---


[jira] [Updated] (FLINK-9897) Further enhance adaptive reads in Kinesis Connector to read more records in the case of long running loops

2018-07-24 Thread Lakshmi Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lakshmi Rao updated FLINK-9897:
---
Summary: Further enhance adaptive reads in Kinesis Connector to read more 
records in the case of long running loops  (was: Further enhance adaptiveReads 
in Kinesis Connector to read more records in the case of long running loops)

> Further enhance adaptive reads in Kinesis Connector to read more records in 
> the case of long running loops
> --
>
> Key: FLINK-9897
> URL: https://issues.apache.org/jira/browse/FLINK-9897
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.4.2, 1.5.1
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>  Labels: pull-request-available
>
> In FLINK-9692, we introduced the ability for the shardConsumer to adaptively 
> read more records based on the current average record size to optimize the 2 
> Mb/sec shard limit. The feature maximizes  maxNumberOfRecordsPerFetch of 5 
> reads/sec (as prescribed by Kinesis limits). In the case where applications 
> take more time to process records in the run loop, they are no longer able to 
> read at a frequency of 5 reads/sec (even though their fetchIntervalMillis 
> maybe set to 200 ms). In such a scenario, the maxNumberOfRecordsPerFetch 
> should be calculated based on the time that the run loop actually takes as 
> opposed to fetchIntervalMillis. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9899) Add more metrics to the Kinesis source connector

2018-07-24 Thread Lakshmi Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lakshmi Rao updated FLINK-9899:
---
Affects Version/s: 1.4.2
   1.5.1

> Add more metrics to the Kinesis source connector
> 
>
> Key: FLINK-9899
> URL: https://issues.apache.org/jira/browse/FLINK-9899
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.4.2, 1.5.1
>Reporter: Lakshmi Rao
>Assignee: vinoyang
>Priority: Major
>
> Currently there are sparse metrics available for the Kinesis Connector. Using 
> the 
> [ShardMetricsReporter|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java]
>  add more stats. For example:
> - sleepTimeMillis 
> - maxNumberOfRecordsPerFetch
> - numberOfAggregatedRecordsPerFetch
> - numberOfDeaggregatedRecordsPerFetch
> - bytesPerFetch
> - averageRecordSizeBytes
> - runLoopTimeNanos
> - loopFrequencyHz



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9897) Further enhance adaptive reads in Kinesis Connector to depend on run loop time

2018-07-24 Thread Lakshmi Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lakshmi Rao updated FLINK-9897:
---
Summary: Further enhance adaptive reads in Kinesis Connector to depend on 
run loop time  (was: Further enhance adaptive reads in Kinesis Connector to 
read more records in the case of long running loops)

> Further enhance adaptive reads in Kinesis Connector to depend on run loop time
> --
>
> Key: FLINK-9897
> URL: https://issues.apache.org/jira/browse/FLINK-9897
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.4.2, 1.5.1
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>  Labels: pull-request-available
>
> In FLINK-9692, we introduced the ability for the shardConsumer to adaptively 
> read more records based on the current average record size to optimize the 2 
> Mb/sec shard limit. The feature maximizes  maxNumberOfRecordsPerFetch of 5 
> reads/sec (as prescribed by Kinesis limits). In the case where applications 
> take more time to process records in the run loop, they are no longer able to 
> read at a frequency of 5 reads/sec (even though their fetchIntervalMillis 
> maybe set to 200 ms). In such a scenario, the maxNumberOfRecordsPerFetch 
> should be calculated based on the time that the run loop actually takes as 
> opposed to fetchIntervalMillis. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9897) Further enhance adaptiveReads in Kinesis Connector to read more records in the case of long running loops

2018-07-24 Thread Lakshmi Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lakshmi Rao updated FLINK-9897:
---
Affects Version/s: 1.4.2
   1.5.1

> Further enhance adaptiveReads in Kinesis Connector to read more records in 
> the case of long running loops
> -
>
> Key: FLINK-9897
> URL: https://issues.apache.org/jira/browse/FLINK-9897
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.4.2, 1.5.1
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>  Labels: pull-request-available
>
> In FLINK-9692, we introduced the ability for the shardConsumer to adaptively 
> read more records based on the current average record size to optimize the 2 
> Mb/sec shard limit. The feature maximizes  maxNumberOfRecordsPerFetch of 5 
> reads/sec (as prescribed by Kinesis limits). In the case where applications 
> take more time to process records in the run loop, they are no longer able to 
> read at a frequency of 5 reads/sec (even though their fetchIntervalMillis 
> maybe set to 200 ms). In such a scenario, the maxNumberOfRecordsPerFetch 
> should be calculated based on the time that the run loop actually takes as 
> opposed to fetchIntervalMillis. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9899) Add more metrics to the Kinesis source connector

2018-07-24 Thread Lakshmi Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554732#comment-16554732
 ] 

Lakshmi Rao commented on FLINK-9899:


[~yanghua] Are you planning on working on this?  I have changes to add some 
metrics based off of FLINK-9897. Do you mind if I assign this Jira  to myself? 

Thanks,

Lakshmi

> Add more metrics to the Kinesis source connector
> 
>
> Key: FLINK-9899
> URL: https://issues.apache.org/jira/browse/FLINK-9899
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Lakshmi Rao
>Assignee: vinoyang
>Priority: Major
>
> Currently there are sparse metrics available for the Kinesis Connector. Using 
> the 
> [ShardMetricsReporter|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java]
>  add more stats. For example:
> - sleepTimeMillis 
> - maxNumberOfRecordsPerFetch
> - numberOfAggregatedRecordsPerFetch
> - numberOfDeaggregatedRecordsPerFetch
> - bytesPerFetch
> - averageRecordSizeBytes
> - runLoopTimeNanos
> - loopFrequencyHz



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9897) Further enhance adaptiveReads in Kinesis Connector to read more records in the case of long running loops

2018-07-24 Thread Lakshmi Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lakshmi Rao reassigned FLINK-9897:
--

Assignee: Lakshmi Rao

> Further enhance adaptiveReads in Kinesis Connector to read more records in 
> the case of long running loops
> -
>
> Key: FLINK-9897
> URL: https://issues.apache.org/jira/browse/FLINK-9897
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>  Labels: pull-request-available
>
> In FLINK-9692, we introduced the ability for the shardConsumer to adaptively 
> read more records based on the current average record size to optimize the 2 
> Mb/sec shard limit. The feature maximizes  maxNumberOfRecordsPerFetch of 5 
> reads/sec (as prescribed by Kinesis limits). In the case where applications 
> take more time to process records in the run loop, they are no longer able to 
> read at a frequency of 5 reads/sec (even though their fetchIntervalMillis 
> maybe set to 200 ms). In such a scenario, the maxNumberOfRecordsPerFetch 
> should be calculated based on the time that the run loop actually takes as 
> opposed to fetchIntervalMillis. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9897) Further enhance adaptiveReads in Kinesis Connector to read more records in the case of long running loops

2018-07-24 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9897:
--
Labels: pull-request-available  (was: )

> Further enhance adaptiveReads in Kinesis Connector to read more records in 
> the case of long running loops
> -
>
> Key: FLINK-9897
> URL: https://issues.apache.org/jira/browse/FLINK-9897
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Lakshmi Rao
>Priority: Major
>  Labels: pull-request-available
>
> In FLINK-9692, we introduced the ability for the shardConsumer to adaptively 
> read more records based on the current average record size to optimize the 2 
> Mb/sec shard limit. The feature maximizes  maxNumberOfRecordsPerFetch of 5 
> reads/sec (as prescribed by Kinesis limits). In the case where applications 
> take more time to process records in the run loop, they are no longer able to 
> read at a frequency of 5 reads/sec (even though their fetchIntervalMillis 
> maybe set to 200 ms). In such a scenario, the maxNumberOfRecordsPerFetch 
> should be calculated based on the time that the run loop actually takes as 
> opposed to fetchIntervalMillis. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9897) Further enhance adaptiveReads in Kinesis Connector to read more records in the case of long running loops

2018-07-24 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554727#comment-16554727
 ] 

ASF GitHub Bot commented on FLINK-9897:
---

GitHub user glaksh100 opened a pull request:

https://github.com/apache/flink/pull/6408

[FLINK-9897] Make adaptive reads depend on run loop time instead of 
fetchintervalmillis

## What is the purpose of the change
[FLINK-9692](https://github.com/apache/flink/pull/6300) introduced the 
feature of adapting `maxNumberOfRecordsPerFetch` based on the average size of 
Kinesis records. The PR assumed a maximum of `1/fetchIntervalMillis` 
reads/second. However, in the case that the run loop of the `ShardConsumer` 
takes more than `fetchIntervalMillis` to process records, the 
`maxNumberOfRecordsPerFetch` is still sub-optimal. The purpose of this change 
is to make the adaptive reads more efficient by using the actual run loop 
frequency to determine the number of reads/second and 
`maxNumberOfRecordsPerFetch`. The change also re-factors the run loop to be 
more modular.


## Brief change log

  - `processingStartTimeNanos` records start time of loop
  -  `processingEndTimeNanos` records end time of loop
  -  `adjustRunLoopFrequency()` adjusts end time depending on 
`sleepTimeMillis` (if any).
  -  `runLoopTimeNanos` records actual run loop time.
  -  `adaptRecordsToRead` calculates `maxNumberOfRecordsPerFetch` based on 
`runLoopTimeNanos`
  - Unused method `getAdaptiveMaxRecordsPerFetch` is removed.

## Verifying this change

This change is already covered by existing tests, such as 
`ShardConsumerTest`
This has also been tested against a stream with the following configuration
```
Number of Shards: 512
Parallelism: 128
```

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lyft/flink FLINK-9897.AdaptiveReadsRunLoop

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6408.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6408


commit 786556b9a9a509051a14772fbbd282db73e65252
Author: Lakshmi Gururaja Rao 
Date:   2018-07-24T18:44:08Z

[FLINK-9897] Make adaptive reads depend on run loop time instead of fetch 
interval millis




> Further enhance adaptiveReads in Kinesis Connector to read more records in 
> the case of long running loops
> -
>
> Key: FLINK-9897
> URL: https://issues.apache.org/jira/browse/FLINK-9897
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Lakshmi Rao
>Priority: Major
>  Labels: pull-request-available
>
> In FLINK-9692, we introduced the ability for the shardConsumer to adaptively 
> read more records based on the current average record size to optimize the 2 
> Mb/sec shard limit. The feature maximizes  maxNumberOfRecordsPerFetch of 5 
> reads/sec (as prescribed by Kinesis limits). In the case where applications 
> take more time to process records in the run loop, they are no longer able to 
> read at a frequency of 5 reads/sec (even though their fetchIntervalMillis 
> maybe set to 200 ms). In such a scenario, the maxNumberOfRecordsPerFetch 
> should be calculated based on the time that the run loop actually takes as 
> opposed to fetchIntervalMillis. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6408: [FLINK-9897] Make adaptive reads depend on run loo...

2018-07-24 Thread glaksh100
GitHub user glaksh100 opened a pull request:

https://github.com/apache/flink/pull/6408

[FLINK-9897] Make adaptive reads depend on run loop time instead of 
fetchintervalmillis

## What is the purpose of the change
[FLINK-9692](https://github.com/apache/flink/pull/6300) introduced the 
feature of adapting `maxNumberOfRecordsPerFetch` based on the average size of 
Kinesis records. The PR assumed a maximum of `1/fetchIntervalMillis` 
reads/second. However, in the case that the run loop of the `ShardConsumer` 
takes more than `fetchIntervalMillis` to process records, the 
`maxNumberOfRecordsPerFetch` is still sub-optimal. The purpose of this change 
is to make the adaptive reads more efficient by using the actual run loop 
frequency to determine the number of reads/second and 
`maxNumberOfRecordsPerFetch`. The change also re-factors the run loop to be 
more modular.


## Brief change log

  - `processingStartTimeNanos` records start time of loop
  -  `processingEndTimeNanos` records end time of loop
  -  `adjustRunLoopFrequency()` adjusts end time depending on 
`sleepTimeMillis` (if any).
  -  `runLoopTimeNanos` records actual run loop time.
  -  `adaptRecordsToRead` calculates `maxNumberOfRecordsPerFetch` based on 
`runLoopTimeNanos`
  - Unused method `getAdaptiveMaxRecordsPerFetch` is removed.

## Verifying this change

This change is already covered by existing tests, such as 
`ShardConsumerTest`
This has also been tested against a stream with the following configuration
```
Number of Shards: 512
Parallelism: 128
```

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lyft/flink FLINK-9897.AdaptiveReadsRunLoop

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6408.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6408


commit 786556b9a9a509051a14772fbbd282db73e65252
Author: Lakshmi Gururaja Rao 
Date:   2018-07-24T18:44:08Z

[FLINK-9897] Make adaptive reads depend on run loop time instead of fetch 
interval millis




---


[jira] [Commented] (FLINK-7811) Add support for Scala 2.12

2018-07-24 Thread Juan Miguel Cejuela (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16554689#comment-16554689
 ] 

Juan Miguel Cejuela commented on FLINK-7811:


Moved to 1.7.0? So unfortunate :-(

By that point scala 2.13 will likely be out...

> Add support for Scala 2.12
> --
>
> Key: FLINK-7811
> URL: https://issues.apache.org/jira/browse/FLINK-7811
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scala API
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6401: [hotfix]fix typo for variable name dynamicProperties in F...

2018-07-24 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/6401
  
Thanks for your contribution @rileyli. Even though this change seems rather 
trivial I would like to discourage these kind of contributions. The problem is 
that it does not add much additional value at the cost of introducing subtle 
bugs (e.g. variable name clashes). Therefore, I would like to ask you to close 
this PR.


---


[jira] [Updated] (FLINK-9939) Mesos: Not setting TMP dirs causes NPE

2018-07-24 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-9939:

Fix Version/s: 1.5.2

> Mesos: Not setting TMP dirs causes NPE
> --
>
> Key: FLINK-9939
> URL: https://issues.apache.org/jira/browse/FLINK-9939
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 1.5.2, 1.6.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.5.2, 1.6.0
>
>
> Relying on the default tmp dirs can cause an NPE when deploying on Mesos. 
> Find the stacktrace below.
> {noformat}
> Exception in thread "main" java.lang.NullPointerException: Value must not be 
> null.
> at 
> org.apache.flink.configuration.Configuration.setValueInternal(Configuration.java:766)
> at 
> org.apache.flink.configuration.Configuration.setString(Configuration.java:189)
> at 
> org.apache.flink.runtime.clusterframework.BootstrapTools.updateTmpDirectoriesInConfiguration(BootstrapTools.java:490)
> at 
> org.apache.flink.mesos.entrypoint.MesosEntrypointUtils.loadConfiguration(MesosEntrypointUtils.java:176)
> at 
> org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.main(MesosSessionClusterEntrypoint.java:177)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9939) Mesos: Not setting TMP dirs causes NPE

2018-07-24 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-9939:

Affects Version/s: 1.5.2

> Mesos: Not setting TMP dirs causes NPE
> --
>
> Key: FLINK-9939
> URL: https://issues.apache.org/jira/browse/FLINK-9939
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 1.5.2, 1.6.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.5.2, 1.6.0
>
>
> Relying on the default tmp dirs can cause an NPE when deploying on Mesos. 
> Find the stacktrace below.
> {noformat}
> Exception in thread "main" java.lang.NullPointerException: Value must not be 
> null.
> at 
> org.apache.flink.configuration.Configuration.setValueInternal(Configuration.java:766)
> at 
> org.apache.flink.configuration.Configuration.setString(Configuration.java:189)
> at 
> org.apache.flink.runtime.clusterframework.BootstrapTools.updateTmpDirectoriesInConfiguration(BootstrapTools.java:490)
> at 
> org.apache.flink.mesos.entrypoint.MesosEntrypointUtils.loadConfiguration(MesosEntrypointUtils.java:176)
> at 
> org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.main(MesosSessionClusterEntrypoint.java:177)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


  1   2   3   >