[jira] [Commented] (FLINK-8709) Flaky test: SlotPoolRpcTest.testCancelSlotAllocationWithoutResourceManager

2018-02-20 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16371017#comment-16371017
 ] 

Bowen Li commented on FLINK-8709:
-

Another instance: https://travis-ci.org/apache/flink/builds/344089148

> Flaky test: SlotPoolRpcTest.testCancelSlotAllocationWithoutResourceManager
> --
>
> Key: FLINK-8709
> URL: https://issues.apache.org/jira/browse/FLINK-8709
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Till Rohrmann
>Priority: Blocker
> Fix For: 1.5.0
>
>
> In both [https://travis-ci.org/apache/flink/jobs/343258724] and 
> [https://travis-ci.org/apache/flink/jobs/343527405] 
> {code:java}
> Running org.apache.flink.runtime.jobmaster.slotpool.SlotPoolRpcTest
> Tests run: 5, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.77 sec <<< 
> FAILURE! - in org.apache.flink.runtime.jobmaster.slotpool.SlotPoolRpcTest
> testCancelSlotAllocationWithoutResourceManager(org.apache.flink.runtime.jobmaster.slotpool.SlotPoolRpcTest)
>   Time elapsed: 0.622 sec  <<< FAILURE!
> java.lang.AssertionError: expected:<0> but was:<1>
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:645)
>   at org.junit.Assert.assertEquals(Assert.java:631)
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolRpcTest.testCancelSlotAllocationWithoutResourceManager(SlotPoolRpcTest.java:171)
> {code}





[jira] [Reopened] (FLINK-8709) Flaky test: SlotPoolRpcTest.testCancelSlotAllocationWithoutResourceManager

2018-02-20 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reopened FLINK-8709:
-

Another instance on latest master: 
https://travis-ci.org/apache/flink/jobs/343910963

> Flaky test: SlotPoolRpcTest.testCancelSlotAllocationWithoutResourceManager
> --
>
> Key: FLINK-8709
> URL: https://issues.apache.org/jira/browse/FLINK-8709
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Till Rohrmann
>Priority: Blocker
> Fix For: 1.5.0
>
>
> In both [https://travis-ci.org/apache/flink/jobs/343258724] and 
> [https://travis-ci.org/apache/flink/jobs/343527405] 
> {code:java}
> Running org.apache.flink.runtime.jobmaster.slotpool.SlotPoolRpcTest
> Tests run: 5, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.77 sec <<< 
> FAILURE! - in org.apache.flink.runtime.jobmaster.slotpool.SlotPoolRpcTest
> testCancelSlotAllocationWithoutResourceManager(org.apache.flink.runtime.jobmaster.slotpool.SlotPoolRpcTest)
>   Time elapsed: 0.622 sec  <<< FAILURE!
> java.lang.AssertionError: expected:<0> but was:<1>
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:645)
>   at org.junit.Assert.assertEquals(Assert.java:631)
>   at 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolRpcTest.testCancelSlotAllocationWithoutResourceManager(SlotPoolRpcTest.java:171)
> {code}





[jira] [Commented] (FLINK-4194) Implement isEndOfStream() for KinesisDeserializationSchema

2018-02-20 Thread Cristian (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370997#comment-16370997
 ] 

Cristian commented on FLINK-4194:
-

Hi guys. This is a feature we're missing.

Given that it's not implemented and will likely not be implemented in 
the near future... how else could this be done right now?

Let me explain. Right now I need to run Flink applications that read from 
Kinesis for a specific period of time (say two days), and I'm trying to figure 
out a way for the Flink app to gracefully stop itself after that. Is there a 
way to achieve that right now from within the Flink app?

My other option, which I'd like to avoid, is to periodically check for Flink 
apps to kill from an external worker.

> Implement isEndOfStream() for KinesisDeserializationSchema
> --
>
> Key: FLINK-4194
> URL: https://issues.apache.org/jira/browse/FLINK-4194
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector
>Reporter: Robert Metzger
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>
> **Original JIRA title: KinesisDeserializationSchema.isEndOfStream() is never 
> called. The corresponding part of the code has been commented out with 
> reference to this JIRA.**
> The Kinesis connector does not respect the 
> {{KinesisDeserializationSchema.isEndOfStream()}} method.
> The purpose of this method is to stop consuming from a source, based on input 
> data.
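For illustration, here is a minimal sketch of what that contract would let a user write: a deserialization schema that signals end-of-stream once a wall-clock cutoff has passed. The class name and cutoff logic are made up, the deserialize() signature is assumed from the 1.4/1.5 Kinesis connector, and, as this issue says, the connector does not currently call isEndOfStream(), so today this would have no effect.

{code:java}
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;

// Hypothetical schema: stop consuming once a configured cutoff time has passed.
public class BoundedStringSchema implements KinesisDeserializationSchema<String> {

    private final long cutoffTimestampMillis; // e.g. "now + two days"

    public BoundedStringSchema(long cutoffTimestampMillis) {
        this.cutoffTimestampMillis = cutoffTimestampMillis;
    }

    @Override
    public String deserialize(byte[] recordValue, String partitionKey, String seqNum,
                              long approxArrivalTimestamp, String stream, String shardId) {
        return new String(recordValue, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        // Intended semantics: returning true should make the consumer stop.
        // As of this issue, the Kinesis connector never calls this method.
        return System.currentTimeMillis() >= cutoffTimestampMillis;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
{code}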





[jira] [Commented] (FLINK-8648) Allow for customization of emitRecordAndUpdateState in Kinesis connector

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370995#comment-16370995
 ] 

ASF GitHub Bot commented on FLINK-8648:
---

Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5480
  
@tzulitai any update?


> Allow for customization of emitRecordAndUpdateState in Kinesis connector
> 
>
> Key: FLINK-8648
> URL: https://issues.apache.org/jira/browse/FLINK-8648
> Project: Flink
>  Issue Type: Task
>  Components: Kinesis Connector
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Minor
>
> It should be possible to override the method to intercept the emit behavior, 
> in this case for the purpose of custom watermark support.
>  





[GitHub] flink issue #5480: [FLINK-8648] [kinesis] Allow for customization of emitRec...

2018-02-20 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5480
  
@tzulitai any update?


---


[jira] [Commented] (FLINK-7756) RocksDB state backend Checkpointing (Async and Incremental) is not working with CEP.

2018-02-20 Thread tarun razdan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370981#comment-16370981
 ] 

tarun razdan commented on FLINK-7756:
-

[~aljoscha]

I tried building your branch and the build was successful. Now when I run the 
Flink distribution on YARN, I cannot find any TaskManager running. I tried 
running with debug logging on, but there were no errors.

Can you suggest some ways to debug this?

> RocksDB state backend Checkpointing (Async and Incremental)  is not working 
> with CEP.
> -
>
> Key: FLINK-7756
> URL: https://issues.apache.org/jira/browse/FLINK-7756
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, State Backends, Checkpointing, Streaming
>Affects Versions: 1.4.0, 1.3.2
> Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend
>Reporter: Shashank Agarwal
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
> Attachments: jobmanager.log, jobmanager_without_cassandra.log, 
> taskmanager.log, taskmanager_without_cassandra.log
>
>
> When I try to use RocksDBStateBackend on my staging cluster (which uses HDFS 
> as its file system) it crashes. But when I use FsStateBackend on staging 
> (same HDFS file system) it works fine.
> Locally, with the local file system, it works fine in both cases.
> Please check the attached logs. I have around 20-25 tasks in my app.
> {code:java}
> 2017-09-29 14:21:31,639 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=0).
> 2017-09-29 14:21:31,640 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,020 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=1).
> 2017-09-29 14:21:32,022 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,078 INFO  com.datastax.driver.core.NettyUtil  
>   - Found Netty's native epoll transport in the classpath, using 
> it
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (1/2) 
> (b879f192c4e8aae6671cdafb3a24c00a).
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Map (2/2) 
> (1ea5aef6ccc7031edc6b37da2912d90b).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (2/2) 
> (4bac8e764c67520d418a4c755be23d4d).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a) switched 
> from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 
> for operator Co-Flat Map (1/2).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for 
> operator Co-Flat Map (1/2).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> 

[jira] [Commented] (FLINK-8700) Port tests from FlinkMiniCluster to MiniClusterResource

2018-02-20 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370946#comment-16370946
 ] 

mingleizhang commented on FLINK-8700:
-

Hi, [~aljoscha] I think we CANNOT remove {{startCluster}} from {{TestBaseUtils}}, 
since it belongs to the old cluster, which is started when {{MiniClusterType}} is 
set to {{OLD}}.

> Port tests from FlinkMiniCluster to MiniClusterResource
> ---
>
> Key: FLINK-8700
> URL: https://issues.apache.org/jira/browse/FLINK-8700
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Quite some ITCases rely on {{FlinkMiniCluster}} and its subclasses 
> ({{LocalFlinkMiniCluster}} and {{TestingCluster}}). This means they use the 
> legacy {{JobManager}} and {{TaskManager}} and not the new FLIP-6 components 
> which are enabled by default in Flink 1.5.0.
> {{AbstractTestBase}} uses the new {{MiniClusterResource}} which encapsulates 
> creation of a FLIP-6 cluster or legacy cluster. We should use this in all 
> ITCases, which probably means that we have to extend it a bit, for example to 
> allow access to a {{ClusterClient}}.
> Transitively, {{TestBaseUtils.startCluster()}} also uses the legacy 
> {{LocalFlinkMiniCluster}}.
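As a rough sketch of the migration direction, an ITCase would replace its FlinkMiniCluster setup with a class rule along these lines. The MiniClusterResource constructor and configuration shown here are assumptions about the 1.5 test utilities, so check the actual class before copying.

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.util.TestLogger;

import org.junit.ClassRule;
import org.junit.Test;

public class MyMigratedITCase extends TestLogger {

    // Sketch only: the resource decides internally whether to start a FLIP-6
    // or a legacy cluster, so the test no longer references FlinkMiniCluster.
    @ClassRule
    public static final MiniClusterResource MINI_CLUSTER = new MiniClusterResource(
            new MiniClusterResource.MiniClusterResourceConfiguration(
                    new Configuration(), // cluster configuration
                    1,                   // number of TaskManagers
                    4));                 // slots per TaskManager

    @Test
    public void testPipelineRunsOnMiniCluster() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print();
        env.execute();
    }
}
{code}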





[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370944#comment-16370944
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5481
  
cc @pnowojski  @aljoscha 


> add KeyedProcessFunction to expose the key in onTimer() and other methods
> -
>
> Key: FLINK-8560
> URL: https://issues.apache.org/jira/browse/FLINK-8560
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Jürgen Thomann
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Currently it is required to store the key of a keyBy() in the processElement 
> method to have access to it in the OnTimerContext.
> This is not ideal, as you have to check in the processElement method for 
> every element whether the key is already stored, and set it if it isn't.
> A possible solution would be adding OnTimerContext#getCurrentKey() or a similar 
> method. Making it available in the open() method might work as well.
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html
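For context, here is a minimal sketch of the workaround described above: the key is remembered in keyed state inside processElement() so that it is available again in onTimer(). Class and state names are illustrative; the proposed OnTimerContext#getCurrentKey() would make this extra state unnecessary.

{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Used on a stream keyed by the String field, e.g. stream.keyBy(0).process(new KeyTrackingFunction()).
public class KeyTrackingFunction extends ProcessFunction<Tuple2<String, Long>, String> {

    private transient ValueState<String> lastKey;

    @Override
    public void open(Configuration parameters) {
        lastKey = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-key", String.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
        // The workaround: every element has to check and store the key, because
        // onTimer() has no direct access to it.
        if (lastKey.value() == null) {
            lastKey.update(value.f0);
        }
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + 60_000);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // With the proposed OnTimerContext#getCurrentKey(), this state lookup would disappear.
        out.collect("timer fired for key " + lastKey.value());
    }
}
{code}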





[jira] [Commented] (FLINK-8710) AbstractYarnClusterDescriptor doesn't use pre-defined configs in Hadoop's YarnConfiguration

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370943#comment-16370943
 ] 

ASF GitHub Bot commented on FLINK-8710:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5522
  
cc @tillrohrmann 


> AbstractYarnClusterDescriptor doesn't use pre-defined configs in Hadoop's 
> YarnConfiguration
> ---
>
> Key: FLINK-8710
> URL: https://issues.apache.org/jira/browse/FLINK-8710
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.5.0
>
>
> {{AbstractYarnClusterDescriptor}} should use Hadoop's 
> {{YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB}} rather than raw 
> string "yarn.scheduler.minimum-allocation-mb"





[jira] [Commented] (FLINK-8667) expose key in KeyedBroadcastProcessFunction#onTimer()

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370942#comment-16370942
 ] 

ASF GitHub Bot commented on FLINK-8667:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5500
  
cc @pnowojski @aljoscha 


> expose key in KeyedBroadcastProcessFunction#onTimer()
> -
>
> Key: FLINK-8667
> URL: https://issues.apache.org/jira/browse/FLINK-8667
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> [~aljoscha] [~pnowojski]  
> Since KeyedBroadcastProcessFunction is about to get out of the door, I think 
> it would be great to expose the timer's key in KeyedBroadcastProcessFunction 
> too. If we don't do it now, it will be much more difficult to add the feature 
> later because of user-app compatibility issues.





[GitHub] flink issue #5481: [FLINK-8560] Access to the current key in ProcessFunction...

2018-02-20 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5481
  
cc @pnowojski  @aljoscha 


---


[GitHub] flink issue #5522: [FLINK-8710] [YARN] AbstractYarnClusterDescriptor doesn't...

2018-02-20 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5522
  
cc @tillrohrmann 


---


[GitHub] flink issue #5500: [FLINK-8667] expose key in KeyedBroadcastProcessFunction#...

2018-02-20 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5500
  
cc @pnowojski @aljoscha 


---


[jira] [Commented] (FLINK-7386) Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ client

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7386?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370724#comment-16370724
 ] 

ASF GitHub Bot commented on FLINK-7386:
---

Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/4675#discussion_r169490640
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/BulkProcessorIndexer.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch53;
+
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}.
+ * {@link ActionRequest ActionRequests} will be converted to {@link DocWriteRequest}
+ * and will be buffered before sending a bulk request to the Elasticsearch cluster.
+ */
+public class BulkProcessorIndexer implements RequestIndexer {
+
+   private final BulkProcessor bulkProcessor;
+   private final boolean flushOnCheckpoint;
+   private final AtomicLong numPendingRequestsRef;
+
+   public BulkProcessorIndexer(BulkProcessor bulkProcessor,
+   boolean flushOnCheckpoint,
+   AtomicLong numPendingRequests) {
+   this.bulkProcessor = bulkProcessor;
+   this.flushOnCheckpoint = flushOnCheckpoint;
+   this.numPendingRequestsRef = numPendingRequests;
+   }
+
+   @Override
+   public void add(ActionRequest... actionRequests) {
+   for (ActionRequest actionRequest : actionRequests) {
+   if (flushOnCheckpoint) {
+   numPendingRequestsRef.getAndIncrement();
+   }
+   this.bulkProcessor.add((DocWriteRequest) actionRequest);
--- End diff --

hmmm  @zentol  I thought I was commenting on my PR #5374 because it is based 
on this one... But I guess your comment will apply to my PR as well.


> Flink Elasticsearch 5 connector is not compatible with Elasticsearch 5.2+ 
> client
> 
>
> Key: FLINK-7386
> URL: https://issues.apache.org/jira/browse/FLINK-7386
> Project: Flink
>  Issue Type: Improvement
>  Components: ElasticSearch Connector
>Reporter: Dawid Wysakowicz
>Assignee: Fang Yong
>Priority: Critical
> Fix For: 1.5.0
>
>
> In the Elasticsearch 5.2.0 client, the class {{BulkProcessor}} was refactored and 
> no longer has the method {{add(ActionRequest)}}.
> For more info see: https://github.com/elastic/elasticsearch/pull/20109





[GitHub] flink pull request #4675: [FLINK-7386] FIx Elasticsearch 5 connector is not ...

2018-02-20 Thread cjolif
Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/4675#discussion_r169490640
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch5.3/src/main/java/org/apache/flink/streaming/connectors/elasticsearch53/BulkProcessorIndexer.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch53;
+
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Implementation of a {@link RequestIndexer}, using a {@link BulkProcessor}.
+ * {@link ActionRequest ActionRequests} will be converted to {@link DocWriteRequest}
+ * and will be buffered before sending a bulk request to the Elasticsearch cluster.
+ */
+public class BulkProcessorIndexer implements RequestIndexer {
+
+   private final BulkProcessor bulkProcessor;
+   private final boolean flushOnCheckpoint;
+   private final AtomicLong numPendingRequestsRef;
+
+   public BulkProcessorIndexer(BulkProcessor bulkProcessor,
+   boolean flushOnCheckpoint,
+   AtomicLong numPendingRequests) {
+   this.bulkProcessor = bulkProcessor;
+   this.flushOnCheckpoint = flushOnCheckpoint;
+   this.numPendingRequestsRef = numPendingRequests;
+   }
+
+   @Override
+   public void add(ActionRequest... actionRequests) {
+   for (ActionRequest actionRequest : actionRequests) {
+   if (flushOnCheckpoint) {
+   numPendingRequestsRef.getAndIncrement();
+   }
+   this.bulkProcessor.add((DocWriteRequest) actionRequest);
--- End diff --

hmmm  @zentol  I thought I was commenting on my PR #5374 because it is based 
on this one... But I guess your comment will apply to my PR as well.


---


[jira] [Updated] (FLINK-8720) Logging exception with S3 connector and BucketingSink

2018-02-20 Thread dejan miljkovic (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dejan miljkovic updated FLINK-8720:
---
Summary: Logging exception with S3 connector and BucketingSink  (was: 
Logging exception )

> Logging exception with S3 connector and BucketingSink
> -
>
> Key: FLINK-8720
> URL: https://issues.apache.org/jira/browse/FLINK-8720
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.1
>Reporter: dejan miljkovic
>Priority: Critical
>
> Trying to stream data to S3. The code works from IntelliJ. When submitting the 
> code through the UI on my machine (a single-node cluster started by the 
> start-cluster.sh script), the stack trace below is produced.
>  
> Below is the link to a simple test app that streams data to S3: 
> [https://github.com/dmiljkovic/test-flink-bucketingsink-s3]
> The behavior is a bit different, but the same error is produced. The job works 
> only once. If the job is submitted a second time, the stack trace below is 
> produced. If I restart the cluster, the job works, but only the first time.
>  
>  
> org.apache.commons.logging.LogConfigurationException: 
> java.lang.IllegalAccessError: 
> org/apache/commons/logging/impl/LogFactoryImpl$3 (Caused by 
> java.lang.IllegalAccessError: 
> org/apache/commons/logging/impl/LogFactoryImpl$3)
>   at 
> org.apache.commons.logging.impl.LogFactoryImpl.newInstance(LogFactoryImpl.java:637)
>   at 
> org.apache.commons.logging.impl.LogFactoryImpl.getInstance(LogFactoryImpl.java:336)
>   at 
> org.apache.commons.logging.impl.LogFactoryImpl.getInstance(LogFactoryImpl.java:310)
>   at org.apache.commons.logging.LogFactory.getLog(LogFactory.java:685)
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager.(PoolingClientConnectionManager.java:76)
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager.(PoolingClientConnectionManager.java:102)
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager.(PoolingClientConnectionManager.java:88)
>   at 
> org.apache.http.impl.conn.PoolingClientConnectionManager.(PoolingClientConnectionManager.java:96)
>   at 
> com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:26)
>   at 
> com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:96)
>   at com.amazonaws.http.AmazonHttpClient.(AmazonHttpClient.java:158)
>   at 
> com.amazonaws.AmazonWebServiceClient.(AmazonWebServiceClient.java:119)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.(AmazonS3Client.java:389)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.(AmazonS3Client.java:371)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:235)
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1206)
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalAccessError: 
> org/apache/commons/logging/impl/LogFactoryImpl$3
>   at 
> org.apache.commons.logging.impl.LogFactoryImpl.getParentClassLoader(LogFactoryImpl.java:700)
>   at 
> org.apache.commons.logging.impl.LogFactoryImpl.createLogFromClass(LogFactoryImpl.java:1187)
>   at 
> org.apache.commons.logging.impl.LogFactoryImpl.discoverLogImplementation(LogFactoryImpl.java:914)
>   at 
> org.apache.commons.logging.impl.LogFactoryImpl.newInstance(LogFactoryImpl.java:604)
>   ... 26 more
>  





[jira] [Created] (FLINK-8720) Logging exception

2018-02-20 Thread dejan miljkovic (JIRA)
dejan miljkovic created FLINK-8720:
--

 Summary: Logging exception 
 Key: FLINK-8720
 URL: https://issues.apache.org/jira/browse/FLINK-8720
 Project: Flink
  Issue Type: Bug
  Components: Streaming Connectors
Affects Versions: 1.4.1
Reporter: dejan miljkovic


Trying to stream data to S3. The code works from IntelliJ. When submitting the 
code through the UI on my machine (a single-node cluster started by the 
start-cluster.sh script), the stack trace below is produced.

Below is the link to a simple test app that streams data to S3: 
[https://github.com/dmiljkovic/test-flink-bucketingsink-s3]

The behavior is a bit different, but the same error is produced. The job works 
only once. If the job is submitted a second time, the stack trace below is 
produced. If I restart the cluster, the job works, but only the first time.

 

 
org.apache.commons.logging.LogConfigurationException: 
java.lang.IllegalAccessError: org/apache/commons/logging/impl/LogFactoryImpl$3 
(Caused by java.lang.IllegalAccessError: 
org/apache/commons/logging/impl/LogFactoryImpl$3)
at 
org.apache.commons.logging.impl.LogFactoryImpl.newInstance(LogFactoryImpl.java:637)
at 
org.apache.commons.logging.impl.LogFactoryImpl.getInstance(LogFactoryImpl.java:336)
at 
org.apache.commons.logging.impl.LogFactoryImpl.getInstance(LogFactoryImpl.java:310)
at org.apache.commons.logging.LogFactory.getLog(LogFactory.java:685)
at 
org.apache.http.impl.conn.PoolingClientConnectionManager.(PoolingClientConnectionManager.java:76)
at 
org.apache.http.impl.conn.PoolingClientConnectionManager.(PoolingClientConnectionManager.java:102)
at 
org.apache.http.impl.conn.PoolingClientConnectionManager.(PoolingClientConnectionManager.java:88)
at 
org.apache.http.impl.conn.PoolingClientConnectionManager.(PoolingClientConnectionManager.java:96)
at 
com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:26)
at 
com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:96)
at com.amazonaws.http.AmazonHttpClient.(AmazonHttpClient.java:158)
at 
com.amazonaws.AmazonWebServiceClient.(AmazonWebServiceClient.java:119)
at 
com.amazonaws.services.s3.AmazonS3Client.(AmazonS3Client.java:389)
at 
com.amazonaws.services.s3.AmazonS3Client.(AmazonS3Client.java:371)
at 
org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:235)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1206)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalAccessError: 
org/apache/commons/logging/impl/LogFactoryImpl$3
at 
org.apache.commons.logging.impl.LogFactoryImpl.getParentClassLoader(LogFactoryImpl.java:700)
at 
org.apache.commons.logging.impl.LogFactoryImpl.createLogFromClass(LogFactoryImpl.java:1187)
at 
org.apache.commons.logging.impl.LogFactoryImpl.discoverLogImplementation(LogFactoryImpl.java:914)
at 
org.apache.commons.logging.impl.LogFactoryImpl.newInstance(LogFactoryImpl.java:604)
... 26 more
 





[jira] [Commented] (FLINK-8655) Add a default keyspace to CassandraSink

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370581#comment-16370581
 ] 

ASF GitHub Bot commented on FLINK-8655:
---

GitHub user Bekreth opened a pull request:

https://github.com/apache/flink/pull/5538

[FLINK-8655] [DataSink] Added default keyspace to CassandraPojoSink

## What is the purpose of the change

Initial Issue : [https://issues.apache.org/jira/browse/FLINK-8655]
It is not uncommon for users to have a single Cassandra instance for all of 
their test environments, differentiating what each environment uses based on 
keyspace.  This PR adds functionality for the CassandraPojoSink to have the 
keyspace defined at runtime to allow for more flexible configurations.

## Brief change log

  - Added functionality for defining default keyspace at runtime.

## Verifying this change

The changes have been verified manually.  Unit tests will be included 
following preliminary discussion on the functionality.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? JavaDocs

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Bekreth/flink cassandraDefaultKeyspaceSink

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5538.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5538


commit bbf52274ba46da005894c1f7c4be4860cd619280
Author: chugh13 
Date:   2018-02-20T20:02:11Z

Added method to specify the keyspace that POJOs should be written to by 
default.

commit 0e014aefbb7cccf0e1d8caed50f2a9dc88962d4f
Author: chugh13 
Date:   2018-02-20T20:37:17Z

Added Javadocs and log messages to the defaultKeyspace feature




> Add a default keyspace to CassandraSink
> ---
>
> Key: FLINK-8655
> URL: https://issues.apache.org/jira/browse/FLINK-8655
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.0
>Reporter: Christopher Hughes
>Priority: Minor
>  Labels: features
> Fix For: 1.5.0
>
>
> Currently, to use the CassandraPojoSink, it is necessary for a user to 
> provide keyspace information on the desired POJOs using datastax annotations. 
>  This allows various POJOs to be written to multiple keyspaces while sinking 
> messages, but prevents runtime flexibility.
> For many developers, non-production environments may all share a single 
> Cassandra instance differentiated by keyspace names.  I propose adding a 
> `defaultKeyspace(String keyspace)` to the ClusterBuilder.  POJOs lacking a 
> definitive keyspace would attempt to be loaded to the provided default.





[GitHub] flink pull request #5538: [FLINK-8655] [DataSink] Added default keyspace to ...

2018-02-20 Thread Bekreth
GitHub user Bekreth opened a pull request:

https://github.com/apache/flink/pull/5538

[FLINK-8655] [DataSink] Added default keyspace to CassandraPojoSink

## What is the purpose of the change

Initial Issue : [https://issues.apache.org/jira/browse/FLINK-8655]
It is not uncommon for users to have a single Cassandra instance for all of 
their test environments, differentiating what each environment uses based on 
keyspace.  This PR adds functionality for the CassandraPojoSink to have the 
keyspace defined at runtime to allow for more flexible configurations.

## Brief change log

  - Added functionality for defining default keyspace at runtime.

## Verifying this change

The changes have been verified manually.  Unit tests will be included 
following preliminary discussion on the functionality.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? JavaDocs

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Bekreth/flink cassandraDefaultKeyspaceSink

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5538.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5538


commit bbf52274ba46da005894c1f7c4be4860cd619280
Author: chugh13 
Date:   2018-02-20T20:02:11Z

Added method to specify the keyspace that POJOs should be written to by 
default.

commit 0e014aefbb7cccf0e1d8caed50f2a9dc88962d4f
Author: chugh13 
Date:   2018-02-20T20:37:17Z

Added Javadocs and log messages to the defaultKeyspace feature




---


[jira] [Commented] (FLINK-8660) Enable the user to provide custom HAServices implementation

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370487#comment-16370487
 ] 

ASF GitHub Bot commented on FLINK-8660:
---

Github user kbialek commented on the issue:

https://github.com/apache/flink/pull/5530
  
Closing without merge, because of:
1. Test failure
2. I'd like to implement Stephan's idea


> Enable the user to provide custom HAServices implementation 
> 
>
> Key: FLINK-8660
> URL: https://issues.apache.org/jira/browse/FLINK-8660
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management, Configuration, Distributed 
> Coordination
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Krzysztof Białek
>Priority: Major
> Fix For: 1.6.0
>
>
> At the moment Flink uses ZooKeeper as HA backend.
> The goal of this improvement is to make Flink support more HA backends, 
> also maintained as independent projects.
> The following changes are required to achieve it:
>  # Add {{HighAvailabilityServicesFactory}} interface
>  # Add new option {{HighAvailabilityMode.CUSTOM}}
>  # Add new configuration property {{high-availability.factoryClass}}
>  # Use the factory in {{HighAvailabilityServicesUtils}} to instantiate  
> {{HighAvailabilityServices}}





[jira] [Commented] (FLINK-8660) Enable the user to provide custom HAServices implementation

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370489#comment-16370489
 ] 

ASF GitHub Bot commented on FLINK-8660:
---

Github user kbialek closed the pull request at:

https://github.com/apache/flink/pull/5530


> Enable the user to provide custom HAServices implementation 
> 
>
> Key: FLINK-8660
> URL: https://issues.apache.org/jira/browse/FLINK-8660
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management, Configuration, Distributed 
> Coordination
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Krzysztof Białek
>Priority: Major
> Fix For: 1.6.0
>
>
> At the moment Flink uses ZooKeeper as HA backend.
> The goal of this improvement is to make Flink support more HA backends, 
> also maintained as independent projects.
> The following changes are required to achieve it:
>  # Add {{HighAvailabilityServicesFactory}} interface
>  # Add new option {{HighAvailabilityMode.CUSTOM}}
>  # Add new configuration property {{high-availability.factoryClass}}
>  # Use the factory in {{HighAvailabilityServicesUtils}} to instantiate  
> {{HighAvailabilityServices}}





[GitHub] flink pull request #5530: [FLINK-8660] Enable the user to provide custom HAS...

2018-02-20 Thread kbialek
Github user kbialek closed the pull request at:

https://github.com/apache/flink/pull/5530


---


[GitHub] flink issue #5530: [FLINK-8660] Enable the user to provide custom HAServices...

2018-02-20 Thread kbialek
Github user kbialek commented on the issue:

https://github.com/apache/flink/pull/5530
  
Closing without merge, because of:
1. Test failure
2. I'd like to implement Stephan's idea


---


[jira] [Comment Edited] (FLINK-8660) Enable the user to provide custom HAServices implementation

2018-02-20 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-8660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370484#comment-16370484
 ] 

Krzysztof Białek edited comment on FLINK-8660 at 2/20/18 7:17 PM:
--

[~StephanEwen] suggested to simplify configuration 
[https://github.com/apache/flink/pull/5530#issuecomment-366921300]
{quote}I would suggest to not split the config options between 
{{high-availability}} and factory that is only used in _CUSTOM_ mode, but use 
the {{high-availability}} option for both. The option describes the high 
availability services factory, with _NONE_ and _ZOOKEEPER_ as special 
constants/shortcuts.

Have a look at the {{StateBackendLoader}} class for an example of that.
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java#L92
]
{quote}
Thank you for this hint, I like this idea


was (Author: kbialek):
[~StephanEwen] suggested to simplify configuration 
[https://github.com/apache/flink/pull/5530#issuecomment-366921300]
{quote}I would suggest to not split the config options between 
{{high-availability}} and factory that is only used in _CUSTOM_ mode, but use 
the {{high-availability}} option for both. The option describes the high 
availability services factory, with _NONE_ and _ZOOKEEPER_ as special 
constants/shortcuts.

Have a look at the {{StateBackendLoader}} class for an example of that.
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java#L92]

 
{quote}
 

> Enable the user to provide custom HAServices implementation 
> 
>
> Key: FLINK-8660
> URL: https://issues.apache.org/jira/browse/FLINK-8660
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management, Configuration, Distributed 
> Coordination
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Krzysztof Białek
>Priority: Major
> Fix For: 1.6.0
>
>
> At the moment Flink uses ZooKeeper as HA backend.
> The goal of this improvement is to make Flink support more HA backends, 
> also maintained as independent projects.
> The following changes are required to achieve it:
>  # Add {{HighAvailabilityServicesFactory}} interface
>  # Add new option {{HighAvailabilityMode.CUSTOM}}
>  # Add new configuration property {{high-availability.factoryClass}}
>  # Use the factory in {{HighAvailabilityServicesUtils}} to instantiate  
> {{HighAvailabilityServices}}





[jira] [Commented] (FLINK-8660) Enable the user to provide custom HAServices implementation

2018-02-20 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-8660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370484#comment-16370484
 ] 

Krzysztof Białek commented on FLINK-8660:
-

[~StephanEwen] suggested to simplify configuration 
[https://github.com/apache/flink/pull/5530#issuecomment-366921300]
{quote}I would suggest to not split the config options between 
{{high-availability}} and factory that is only used in _CUSTOM_ mode, but use 
the {{high-availability}} option for both. The option describes the high 
availability services factory, with _NONE_ and _ZOOKEEPER_ as special 
constants/shortcuts.

Have a look at the {{StateBackendLoader}} class for an example of that.
[https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendLoader.java#L92]

 
{quote}
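To make that concrete, a rough sketch of how it could look (every class and method name below is hypothetical; the factory interface is only proposed in this issue, and the createHAServices signature is an assumption):

{code:java}
// flink-conf.yaml, following the suggestion of reusing the existing option,
// with NONE and ZOOKEEPER kept as special shortcuts:
//
//   high-availability: com.example.flink.ha.ConsulHAServicesFactory

public class ConsulHAServicesFactory implements HighAvailabilityServicesFactory {

    @Override
    public HighAvailabilityServices createHAServices(Configuration config, Executor executor) {
        // Build and return the custom, independently maintained HA backend here.
        return new ConsulHAServices(config, executor);
    }
}
{code}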
 

> Enable the user to provide custom HAServices implementation 
> 
>
> Key: FLINK-8660
> URL: https://issues.apache.org/jira/browse/FLINK-8660
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management, Configuration, Distributed 
> Coordination
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Krzysztof Białek
>Priority: Major
> Fix For: 1.6.0
>
>
> At the moment Flink uses ZooKeeper as HA backend.
> The goal of this improvement is to make Flink support more HA backends, 
> also maintained as independent projects.
> The following changes are required to achieve it:
>  # Add {{HighAvailabilityServicesFactory}} interface
>  # Add new option {{HighAvailabilityMode.CUSTOM}}
>  # Add new configuration property {{high-availability.factoryClass}}
>  # Use the factory in {{HighAvailabilityServicesUtils}} to instantiate  
> {{HighAvailabilityServices}}





[jira] [Commented] (FLINK-8719) add module description for flink-contrib to clarify its purpose

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370479#comment-16370479
 ] 

ASF GitHub Bot commented on FLINK-8719:
---

GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5537

[FLINK-8719] add module description for flink-contrib to clarify its purpose

## What is the purpose of the change

flink-contrib currently doesn't have any clarification or description of 
its purpose, which confuses lots of developers. Adding clarification and module 
description

## Brief change log

Adding clarification and module description which I borrowed from the PR 
description of https://github.com/apache/flink/pull/5523

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-8719

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5537.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5537


commit aecfcdd6e776f2b885f4cb5288bcb1b27d4b23cd
Author: Bowen Li 
Date:   2018-02-20T19:04:43Z

[FLINK-8719] add module description for flink-contrib to clarify its purpose




> add module description for flink-contrib to clarify its purpose
> ---
>
> Key: FLINK-8719
> URL: https://issues.apache.org/jira/browse/FLINK-8719
> Project: Flink
>  Issue Type: Improvement
>  Components: flink-contrib
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.5.0
>
>
> {{flink-contrib}} currently doesn't have any clarification or description of 
> its purpose, which confuses lots of developers. Adding clarification and 
> module description





[GitHub] flink pull request #5537: [FLINK-8719] add module description for flink-cont...

2018-02-20 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5537

[FLINK-8719] add module description for flink-contrib to clarify its purpose

## What is the purpose of the change

flink-contrib currently doesn't have any clarification or description of 
its purpose, which confuses lots of developers. Adding clarification and module 
description

## Brief change log

Adding clarification and module description which I borrowed from the PR 
description of https://github.com/apache/flink/pull/5523

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-8719

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5537.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5537


commit aecfcdd6e776f2b885f4cb5288bcb1b27d4b23cd
Author: Bowen Li 
Date:   2018-02-20T19:04:43Z

[FLINK-8719] add module description for flink-contrib to clarify its purpose




---


[jira] [Created] (FLINK-8719) add module description for flink-contrib to clarify its purpose

2018-02-20 Thread Bowen Li (JIRA)
Bowen Li created FLINK-8719:
---

 Summary: add module description for flink-contrib to clarify its 
purpose
 Key: FLINK-8719
 URL: https://issues.apache.org/jira/browse/FLINK-8719
 Project: Flink
  Issue Type: Improvement
  Components: flink-contrib
Affects Versions: 1.5.0
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.5.0


{{flink-contrib}} currently doesn't have any clarification or description of 
its purpose, which confuses lots of developers. Adding clarification and module 
description





[jira] [Commented] (FLINK-8599) Improve the failure behavior of the FileInputFormat for bad files

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370347#comment-16370347
 ] 

ASF GitHub Bot commented on FLINK-8599:
---

Github user ChengzhiZhao commented on the issue:

https://github.com/apache/flink/pull/5521
  
Thanks @steveloughran for your feedbacks, I updated based on your 
suggestions.


> Improve the failure behavior of the FileInputFormat for bad files
> -
>
> Key: FLINK-8599
> URL: https://issues.apache.org/jira/browse/FLINK-8599
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Chengzhi Zhao
>Priority: Major
>
> So we have an S3 path that Flink is monitoring to see when new files become 
> available.
> {code:java}
> val avroInputStream_activity = env.readFile(format, path, 
> FileProcessingMode.PROCESS_CONTINUOUSLY, 1)  {code}
>  
> I am doing both internal and external checkpointing. Let's say a bad file (for 
> example, one with a different schema that was dropped into this folder) arrives 
> at the path; Flink will then do several retries. I want to set those bad files 
> aside and let the process continue. However, since the file path persists in the 
> checkpoint, when I try to resume from an external checkpoint, it throws the 
> following error because the file can no longer be found.
>  
> {code:java}
> java.io.IOException: Error opening the Input Split s3a://myfile [0,904]: No 
> such file or directory: s3a://myfile{code}
>  
> As [~fhue...@gmail.com] suggested, we could check whether a path exists before 
> trying to read a file, and ignore the input split instead of throwing an 
> exception and causing a failure.
>  
> Also, I am thinking about adding an error output for bad files as an option for 
> users. If any bad files exist, we could move them to a separate path and do 
> further analysis. 
>  
> Not sure how people feel about it, but I'd like to contribute to it if people 
> think this can be an improvement. 
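To make the suggested behavior concrete, here is a minimal sketch of a lenient input format that skips splits whose backing file has disappeared. The class name is made up, and this is only one possible shape of the fix, not the actual patch in the linked PR.

{code:java}
import java.io.IOException;

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class LenientTextInputFormat extends TextInputFormat {

    private transient boolean skipSplit;

    public LenientTextInputFormat(Path filePath) {
        super(filePath);
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        FileSystem fs = split.getPath().getFileSystem();
        if (!fs.exists(split.getPath())) {
            // The file referenced by the restored split is gone (e.g. a bad file
            // that was moved away): ignore the split instead of failing the job.
            skipSplit = true;
            return;
        }
        skipSplit = false;
        super.open(split);
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return skipSplit || super.reachedEnd();
    }
}
{code}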





[GitHub] flink issue #5521: [FLINK-8599] Improve the failure behavior of the FileInpu...

2018-02-20 Thread ChengzhiZhao
Github user ChengzhiZhao commented on the issue:

https://github.com/apache/flink/pull/5521
  
Thanks @steveloughran for your feedbacks, I updated based on your 
suggestions.


---


[jira] [Commented] (FLINK-8707) Excessive amount of files opened by flink task manager

2018-02-20 Thread Alexander Gardner (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370315#comment-16370315
 ] 

Alexander Gardner commented on FLINK-8707:
--

Also, if you start running a Flink job on the cluster, e.g. a FlinkKafkaConsumer 
source and a dummy sink that simply reads the messages and doesn't do anything 
else, the number of FDs increases by thousands again. 

> Excessive amount of files opened by flink task manager
> --
>
> Key: FLINK-8707
> URL: https://issues.apache.org/jira/browse/FLINK-8707
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.3.2
> Environment: NAME="Red Hat Enterprise Linux Server"
> VERSION="7.3 (Maipo)"
> Two boxes, each with a Job Manager & Task Manager, using Zookeeper for HA.
> flink.yaml below with some settings (removed exact box names) etc:
> env.log.dir: ...some dir...residing on the same box
> env.pid.dir: some dir...residing on the same box
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporters: jmx
> state.backend: filesystem
> state.backend.fs.checkpointdir: file:///some_nfs_mount
> state.checkpoints.dir: file:///some_nfs_mount
> state.checkpoints.num-retained: 3
> high-availability.cluster-id: /tst
> high-availability.storageDir: file:///some_nfs_mount/ha
> high-availability: zookeeper
> high-availability.zookeeper.path.root: /flink
> high-availability.zookeeper.quorum: ...list of zookeeper boxes
> env.java.opts.jobmanager: ...some extra jar args
> jobmanager.archive.fs.dir: some dir...residing on the same box
> jobmanager.web.submit.enable: true
> jobmanager.web.tmpdir:  some dir...residing on the same box
> env.java.opts.taskmanager: some extra jar args
> taskmanager.tmp.dirs:  some dir...residing on the same box/var/tmp
> taskmanager.network.memory.min: 1024MB
> taskmanager.network.memory.max: 2048MB
> blob.storage.directory:  some dir...residing on the same box
>Reporter: Alexander Gardner
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: box1-jobmgr-lsof, box1-taskmgr-lsof, box2-jobmgr-lsof, 
> box2-taskmgr-lsof
>
>
> ** NOTE ** - THE COMPONENT IS TASK MANAGER NOT JOB MANAGER 
> The job manager has less FDs than the task manager.
>  
> Hi
> A support alert indicated that there were a lot of open files for the boxes 
> running Flink.
> There were 4 flink jobs that were dormant but had consumed a number of msgs 
> from Kafka using the FlinkKafkaConsumer010.
> A simple general lsof:
> $ lsof | wc -l       ->  returned 153114 open file descriptors.
> Focusing on the TaskManager process (process ID = 12154):
> $ lsof | grep 12154 | wc -l   -> returned 129322 open FDs
> $ lsof -p 12154 | wc -l   -> returned 531 FDs
> There were 228 threads running for the task manager.
>  
> Drilling down a bit further, looking at a_inode and FIFO entries: 
> $ lsof -p 12154 | grep a_inode | wc -l = 100 FDs
> $ lsof -p 12154 | grep FIFO | wc -l  = 200 FDs
> $ /proc/12154/maps = 920 entries.
> Apart from lsof identifying lots of JARs and SOs being referenced there were 
> also 244 child processes for the task manager process.
> We noticed a creep of file descriptors in each environment...are the above 
> figures deemed excessive for the number of FDs in use? I know Flink uses Netty - 
> is it using a separate Selector for reads & writes? 
> Additionally, Flink uses memory-mapped files or direct ByteBuffers - are these 
> skewing the numbers of FDs shown?
> Example of one child process ID 6633:
> java 12154 6633 dfdev 387u a_inode 0,9 0 5869 [eventpoll]
>  java 12154 6633 dfdev 388r FIFO 0,8 0t0 459758080 pipe
>  java 12154 6633 dfdev 389w FIFO 0,8 0t0 459758080 pipe
> Lastly, I cannot yet identify the reason for the creep in FDs, even though Flink 
> is pretty dormant or has dormant jobs. Production nodes are not experiencing 
> excessive amounts of throughput yet either.
>  
>  
>  





[jira] [Created] (FLINK-8718) Non-parallel DataStreamSource does not set max parallelism

2018-02-20 Thread Gary Yao (JIRA)
Gary Yao created FLINK-8718:
---

 Summary: Non-parallel DataStreamSource does not set max parallelism
 Key: FLINK-8718
 URL: https://issues.apache.org/jira/browse/FLINK-8718
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.5.0
Reporter: Gary Yao
 Fix For: 1.5.0


{{org.apache.flink.streaming.api.datastream.DataStreamSource}} does not set 
{{maxParallelism}} to 1 if it is non-parallel.





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8717) Flink seems to deadlock due to buffer starvation when iterating

2018-02-20 Thread Ken Krugler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370301#comment-16370301
 ] 

Ken Krugler commented on FLINK-8717:


Hi [~rrevol] - I believe what you're seeing is caused by circular network 
buffer deadlock due to fan-out, not starvation. See [Piotr's response to my 
email|http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3CBFD8C506-5B41-47D8-B735-488D03842051%40data-artisans.com%3E].
 If this matches what you're seeing, then please close this issue as "not a 
bug". BTW, I'll be talking about this issue at Flink Forward during my session 
on creating a web crawler with Flink.

> Flink seems to deadlock due to buffer starvation when iterating
> ---
>
> Key: FLINK-8717
> URL: https://issues.apache.org/jira/browse/FLINK-8717
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0
> Environment: Windows 10 Pro 64-bit
> Core i7-6820HQ @ 2.7 GHz
> 16GB RAM
> Flink 1.4
> Scala client
> Scala 2.11.7
>  
>Reporter: Romain Revol
>Priority: Major
> Attachments: threadDump.txt
>
>
> We are encountering what looks like a deadlock of Flink in one of our jobs 
> with an "iterate" in it.
> I've reduced the job use case to the example in this gist: 
> [https://gist.github.com/rrevol/06ddfecd5f5ac7cbc67785b5d3a84dd4]
> Note that:
>  * varying the parallelism affects how quickly the deadlock occurs, 
> but it always occurs
>  * varying MAX_LOOP_NB does affect the deadlock: the higher it is, the 
> faster we encounter the deadlock; if MAX_LOOP_NB == 1, there is no deadlock. This 
> suggests that the deadlock happens once the number of iterations 
> reaches some threshold.
> From the [^threadDump.txt], it looks like starvation on buffer 
> allocation; maybe backpressure has flaws with iterate, but I may be mistaken 
> since I don't know Flink internals well.
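
For readers without access to the gist, here is a minimal sketch of the reported pattern (an iteration whose records are fed back until a hop threshold). It illustrates the shape of the job only, not the reporter's actual code, and the constant name is an assumption:

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IterateSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final long MAX_LOOP_NB = 100; // hypothetical threshold, mirroring the gist's constant name

        // Every record starts with a hop count of 0.
        DataStream<Long> hops = env.generateSequence(0, 10_000).map(x -> 0L);

        IterativeStream<Long> loop = hops.iterate();
        DataStream<Long> incremented = loop.map(x -> x + 1);          // one hop per pass through the loop
        loop.closeWith(incremented.filter(x -> x < MAX_LOOP_NB));     // feedback edge (fan-out point)
        incremented.filter(x -> x >= MAX_LOOP_NB).print();            // forward edge leaving the loop

        env.execute("iterate-sketch");
    }
}
{code}

The fan-out at the end of the loop body (one output feeding both the feedback edge and the forward edge) is exactly the shape that can produce the circular network buffer deadlock described in the comment above.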



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8656) Add CLI command for rescaling

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370246#comment-16370246
 ] 

ASF GitHub Bot commented on FLINK-8656:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5487
  
After rescaling `SocketWindowWordCount`, the parallelism exposed by the 
REST API remains unchanged.

Steps to reproduce:

Start netcat.
```
nc -l 9001
```
Submit job with parallelism 1.
```
bin/flink run -p1 examples/streaming/SocketWindowWordCount.jar --port 9001
```

Re-scale to parallelism 10.
```
bin/flink modify 4f9e368f973be3780b75e49e50168fcc -p 10
```

Check parallelism:
```
curl localhost:9065/jobs/4f9e368f973be3780b75e49e50168fcc/config | jq .
{
  "jid": "4f9e368f973be3780b75e49e50168fcc",
  "name": "Socket Window WordCount",
  "execution-config": {
"execution-mode": "PIPELINED",
"restart-strategy": "default",
"job-parallelism": 1,
"object-reuse-mode": false,
"user-config": {}
  }
}
```


> Add CLI command for rescaling
> -
>
> Key: FLINK-8656
> URL: https://issues.apache.org/jira/browse/FLINK-8656
> Project: Flink
>  Issue Type: New Feature
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The REST rescaling calls should be made accessible via the {{CliFrontend}}. 
> In order to do that I propose to add a {{modify}} command to the 
> {{CliFrontend}} to which we can pass a new parallelism.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5487: [FLINK-8656] [flip6] Add modify CLI command to rescale Fl...

2018-02-20 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5487
  
After rescaling `SocketWindowWordCount`, the parallelism exposed by the 
REST API remains unchanged.

Steps to reproduce:

Start netcat.
```
nc -l 9001
```
Submit job with parallelism 1.
```
bin/flink run -p1 examples/streaming/SocketWindowWordCount.jar --port 9001
```

Re-scale to parallelism 10.
```
bin/flink modify 4f9e368f973be3780b75e49e50168fcc -p 10
```

Check parallelism:
```
curl localhost:9065/jobs/4f9e368f973be3780b75e49e50168fcc/config | jq .
{
  "jid": "4f9e368f973be3780b75e49e50168fcc",
  "name": "Socket Window WordCount",
  "execution-config": {
"execution-mode": "PIPELINED",
"restart-strategy": "default",
"job-parallelism": 1,
"object-reuse-mode": false,
"user-config": {}
  }
}
```


---


[jira] [Assigned] (FLINK-8704) Migrate tests from TestingCluster to MiniClusterResource

2018-02-20 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-8704:
---

Assignee: (was: Chesnay Schepler)

> Migrate tests from TestingCluster to MiniClusterResource
> 
>
> Key: FLINK-8704
> URL: https://issues.apache.org/jira/browse/FLINK-8704
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8704) Migrate tests from TestingCluster to MiniClusterResource

2018-02-20 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-8704:
---

Assignee: Chesnay Schepler

> Migrate tests from TestingCluster to MiniClusterResource
> 
>
> Key: FLINK-8704
> URL: https://issues.apache.org/jira/browse/FLINK-8704
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Aljoscha Krettek
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8707) Excessive amount of files opened by flink task manager

2018-02-20 Thread Alexander Gardner (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexander Gardner updated FLINK-8707:
-
Attachment: box2-jobmgr-lsof
box1-taskmgr-lsof
box1-jobmgr-lsof
box2-taskmgr-lsof

> Excessive amount of files opened by flink task manager
> --
>
> Key: FLINK-8707
> URL: https://issues.apache.org/jira/browse/FLINK-8707
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.3.2
> Environment: NAME="Red Hat Enterprise Linux Server"
> VERSION="7.3 (Maipo)"
> Two boxes, each with a Job Manager & Task Manager, using Zookeeper for HA.
> flink.yaml below with some settings (removed exact box names) etc:
> env.log.dir: ...some dir...residing on the same box
> env.pid.dir: some dir...residing on the same box
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporters: jmx
> state.backend: filesystem
> state.backend.fs.checkpointdir: file:///some_nfs_mount
> state.checkpoints.dir: file:///some_nfs_mount
> state.checkpoints.num-retained: 3
> high-availability.cluster-id: /tst
> high-availability.storageDir: file:///some_nfs_mount/ha
> high-availability: zookeeper
> high-availability.zookeeper.path.root: /flink
> high-availability.zookeeper.quorum: ...list of zookeeper boxes
> env.java.opts.jobmanager: ...some extra jar args
> jobmanager.archive.fs.dir: some dir...residing on the same box
> jobmanager.web.submit.enable: true
> jobmanager.web.tmpdir:  some dir...residing on the same box
> env.java.opts.taskmanager: some extra jar args
> taskmanager.tmp.dirs:  some dir...residing on the same box/var/tmp
> taskmanager.network.memory.min: 1024MB
> taskmanager.network.memory.max: 2048MB
> blob.storage.directory:  some dir...residing on the same box
>Reporter: Alexander Gardner
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: box1-jobmgr-lsof, box1-taskmgr-lsof, box2-jobmgr-lsof, 
> box2-taskmgr-lsof
>
>
> ** NOTE ** - THE COMPONENT IS TASK MANAGER, NOT JOB MANAGER.
> The job manager has fewer FDs than the task manager.
>  
> Hi,
> A support alert indicated that there were a lot of open files on the boxes 
> running Flink.
> There were 4 Flink jobs that were dormant but had consumed a number of messages 
> from Kafka using the FlinkKafkaConsumer010.
> A simple general lsof:
> $ lsof | wc -l  ->  returned 153114 open file descriptors.
> Focusing on the TaskManager process (process ID = 12154):
> $ lsof | grep 12154 | wc -l  ->  returned 129322 open FDs
> $ lsof -p 12154 | wc -l  ->  returned 531 FDs
> There were 228 threads running for the task manager.
>  
> Drilling down a bit further, looking at a_inode and FIFO entries: 
> $ lsof -p 12154 | grep a_inode | wc -l = 100 FDs
> $ lsof -p 12154 | grep FIFO | wc -l = 200 FDs
> $ /proc/12154/maps = 920 entries.
> Apart from lsof identifying lots of JARs and SOs being referenced, there were 
> also 244 child processes for the task manager process.
> We noticed a creep of file descriptors in each environment. Are the above 
> figures deemed excessive for the number of FDs in use? I know Flink uses Netty - 
> is it using a separate Selector for reads & writes? 
> Additionally, does Flink use memory-mapped files or direct byte buffers, and are 
> these skewing the numbers of FDs shown?
> Example of one child process, ID 6633:
> java 12154 6633 dfdev 387u a_inode 0,9 0 5869 [eventpoll]
> java 12154 6633 dfdev 388r FIFO 0,8 0t0 459758080 pipe
> java 12154 6633 dfdev 389w FIFO 0,8 0t0 459758080 pipe
> Lastly, we cannot yet identify the reason for the creep in FDs, even though Flink is 
> pretty dormant or has dormant jobs. Production nodes are not experiencing 
> excessive amounts of throughput yet either.
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8707) Excessive amount of files opened by flink task manager

2018-02-20 Thread Alexander Gardner (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370223#comment-16370223
 ] 

Alexander Gardner commented on FLINK-8707:
--

Attaching the lsof output, removing any refs to real boxes or locations.

> Excessive amount of files opened by flink task manager
> --
>
> Key: FLINK-8707
> URL: https://issues.apache.org/jira/browse/FLINK-8707
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.3.2
> Environment: NAME="Red Hat Enterprise Linux Server"
> VERSION="7.3 (Maipo)"
> Two boxes, each with a Job Manager & Task Manager, using Zookeeper for HA.
> flink.yaml below with some settings (removed exact box names) etc:
> env.log.dir: ...some dir...residing on the same box
> env.pid.dir: some dir...residing on the same box
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporters: jmx
> state.backend: filesystem
> state.backend.fs.checkpointdir: file:///some_nfs_mount
> state.checkpoints.dir: file:///some_nfs_mount
> state.checkpoints.num-retained: 3
> high-availability.cluster-id: /tst
> high-availability.storageDir: file:///some_nfs_mount/ha
> high-availability: zookeeper
> high-availability.zookeeper.path.root: /flink
> high-availability.zookeeper.quorum: ...list of zookeeper boxes
> env.java.opts.jobmanager: ...some extra jar args
> jobmanager.archive.fs.dir: some dir...residing on the same box
> jobmanager.web.submit.enable: true
> jobmanager.web.tmpdir:  some dir...residing on the same box
> env.java.opts.taskmanager: some extra jar args
> taskmanager.tmp.dirs:  some dir...residing on the same box/var/tmp
> taskmanager.network.memory.min: 1024MB
> taskmanager.network.memory.max: 2048MB
> blob.storage.directory:  some dir...residing on the same box
>Reporter: Alexander Gardner
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: box1-jobmgr-lsof, box1-taskmgr-lsof, box2-jobmgr-lsof, 
> box2-taskmgr-lsof
>
>
> ** NOTE ** - THE COMPONENT IS TASK MANAGER, NOT JOB MANAGER.
> The job manager has fewer FDs than the task manager.
>  
> Hi,
> A support alert indicated that there were a lot of open files on the boxes 
> running Flink.
> There were 4 Flink jobs that were dormant but had consumed a number of messages 
> from Kafka using the FlinkKafkaConsumer010.
> A simple general lsof:
> $ lsof | wc -l  ->  returned 153114 open file descriptors.
> Focusing on the TaskManager process (process ID = 12154):
> $ lsof | grep 12154 | wc -l  ->  returned 129322 open FDs
> $ lsof -p 12154 | wc -l  ->  returned 531 FDs
> There were 228 threads running for the task manager.
>  
> Drilling down a bit further, looking at a_inode and FIFO entries: 
> $ lsof -p 12154 | grep a_inode | wc -l = 100 FDs
> $ lsof -p 12154 | grep FIFO | wc -l = 200 FDs
> $ /proc/12154/maps = 920 entries.
> Apart from lsof identifying lots of JARs and SOs being referenced, there were 
> also 244 child processes for the task manager process.
> We noticed a creep of file descriptors in each environment. Are the above 
> figures deemed excessive for the number of FDs in use? I know Flink uses Netty - 
> is it using a separate Selector for reads & writes? 
> Additionally, does Flink use memory-mapped files or direct byte buffers, and are 
> these skewing the numbers of FDs shown?
> Example of one child process, ID 6633:
> java 12154 6633 dfdev 387u a_inode 0,9 0 5869 [eventpoll]
> java 12154 6633 dfdev 388r FIFO 0,8 0t0 459758080 pipe
> java 12154 6633 dfdev 389w FIFO 0,8 0t0 459758080 pipe
> Lastly, we cannot yet identify the reason for the creep in FDs, even though Flink is 
> pretty dormant or has dormant jobs. Production nodes are not experiencing 
> excessive amounts of throughput yet either.
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8707) Excessive amount of files opened by flink task manager

2018-02-20 Thread Alexander Gardner (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370219#comment-16370219
 ] 

Alexander Gardner commented on FLINK-8707:
--

Hi Aljoscha

I'm adding some more info from our DEV box - could this just be an incorrect 
interpretation of the Linux "lsof" command, or are there really that many file 
descriptors open?
Bear in mind that no Flink jobs are running; this is merely starting up 
the Flink processes.

Note: history servers were not running on either box.

Commands used to start the flink cluster on (both) boxes:

/opt/app/x/dev/pkgs/flink/current/bin/jobmanager.sh start cluster
/opt/app/x/dev/pkgs/flink/current/bin/taskmanager.sh start

BEFORE START OF FLINK CLUSTER
Number of FDs in total for the flink user before the Flink Job Manager & Task 
Manager start:
Box 1: 
$ lsof | wc -l
2363

Box 2: 
$ lsof | wc -l
5309

AFTER START OF FLINK CLUSTER
Number of FDs in total for the flink user after the Flink Job Manager & Task Manager start:

Box 1
--
After starting JOBMANAGER
$ lsof | wc -l
14405

Box 1: After starting TASKMANAGER
$ lsof | wc -l
27072

Job Manager process only (pid = 30064)
lsof | grep 30064 | wc -l
12042

Task Manager process only (pid = 30984)
lsof | grep 30984 | wc -l
12428


BOX 2
--
After starting JOBMANAGER
$ lsof | wc -l
17425

Box 2: After starting TASKMANAGER
$ lsof | wc -l
32552

Job Manager process only (pid = 27217)
lsof | grep 27217 | wc -l
17208

Task Manager process only (pid = 30606)
lsof | grep 30606 | wc -l
14421


In some cases a socket can have two FDs, and two NIO Selectors can access the 
same SocketChannel for, say, reads and writes, but I'm really just trying to 
understand why there are so many file descriptors.
More than one child process of the TaskManager is using sockets.

This may just be normal Flink behaviour, but we need a Flink committer / expert 
to confirm it unless there's a known feature. If this is normal / safe, we need to 
adjust our production monitoring systems accordingly. 
If advised, we can restart our Flink cluster if we're approaching the ulimit.

> Excessive amount of files opened by flink task manager
> --
>
> Key: FLINK-8707
> URL: https://issues.apache.org/jira/browse/FLINK-8707
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.3.2
> Environment: NAME="Red Hat Enterprise Linux Server"
> VERSION="7.3 (Maipo)"
> Two boxes, each with a Job Manager & Task Manager, using Zookeeper for HA.
> flink.yaml below with some settings (removed exact box names) etc:
> env.log.dir: ...some dir...residing on the same box
> env.pid.dir: some dir...residing on the same box
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporters: jmx
> state.backend: filesystem
> state.backend.fs.checkpointdir: file:///some_nfs_mount
> state.checkpoints.dir: file:///some_nfs_mount
> state.checkpoints.num-retained: 3
> high-availability.cluster-id: /tst
> high-availability.storageDir: file:///some_nfs_mount/ha
> high-availability: zookeeper
> high-availability.zookeeper.path.root: /flink
> high-availability.zookeeper.quorum: ...list of zookeeper boxes
> env.java.opts.jobmanager: ...some extra jar args
> jobmanager.archive.fs.dir: some dir...residing on the same box
> jobmanager.web.submit.enable: true
> jobmanager.web.tmpdir:  some dir...residing on the same box
> env.java.opts.taskmanager: some extra jar args
> taskmanager.tmp.dirs:  some dir...residing on the same box/var/tmp
> taskmanager.network.memory.min: 1024MB
> taskmanager.network.memory.max: 2048MB
> blob.storage.directory:  some dir...residing on the same box
>Reporter: Alexander Gardner
>Priority: Blocker
> Fix For: 1.5.0
>
>
> ** NOTE ** - THE COMPONENT IS TASK MANAGER, NOT JOB MANAGER.
> The job manager has fewer FDs than the task manager.
>  
> Hi,
> A support alert indicated that there were a lot of open files on the boxes 
> running Flink.
> There were 4 Flink jobs that were dormant but had consumed a number of messages 
> from Kafka using the FlinkKafkaConsumer010.
> A simple general lsof:
> $ lsof | wc -l  ->  returned 153114 open file descriptors.
> Focusing on the TaskManager process (process ID = 12154):
> $ lsof | grep 12154 | wc -l  ->  returned 129322 open FDs
> $ lsof -p 12154 | wc -l  ->  returned 531 FDs
> There were 228 threads running for the task manager.
>  
> Drilling down a bit further, looking at a_inode and FIFO entries: 
> $ lsof -p 12154 | grep a_inode | wc -l = 100 FDs
> $ lsof -p 12154 | grep FIFO | wc -l = 200 FDs
> $ /proc/12154/maps = 920 entries.
> Apart from lsof identifying lots of JARs and SOs being referenced, there were 
> also 244 child processes for the task manager process.
> Noticed that in each environment, a creep of 

[jira] [Commented] (FLINK-8639) Fix always need to seek multiple times when iterator RocksDBMapState

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370218#comment-16370218
 ] 

ASF GitHub Bot commented on FLINK-8639:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5465#discussion_r169374194
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 ---
@@ -400,7 +410,7 @@ public UV setValue(UV value) {
/** An auxiliary utility to scan all entries under the given key. */
private abstract class RocksDBMapIterator implements Iterator {
 
-   static final int CACHE_SIZE_BASE = 1;
+   static final int CACHE_SIZE_BASE = 32;
--- End diff --

Indeed, I don't think the memory is really a concern (at least when 
CACHE_SIZE_LIMIT=128). I'd almost like to change the CACHE_SIZE to a fixed value 
(like 128). What do you think?


> Fix always need to seek multiple times when iterator RocksDBMapState
> 
>
> Key: FLINK-8639
> URL: https://issues.apache.org/jira/browse/FLINK-8639
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Critical
> Fix For: 1.5.0
>
>
> Currently, almost every time we want to iterate a RocksDBMapState we need to 
> seek at least 2 times (seek is a poorly performing operation for RocksDB because 
> it can't use the bloom filter). This is because `RocksDBMapIterator` uses 
> `cacheEntries` to cache the values of each seek, and `cacheEntries`'s 
> initial size is 1.
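
For illustration, here is a simplified standalone model of the behaviour described above - not the actual `RocksDBMapState` internals, and the class and constant names are assumptions. Each `refill()` stands in for a RocksDB seek: when the cache comes back full, the iterator cannot know whether more entries exist and must seek again, so an initial cache size of 1 always pays at least two seeks, while a larger initial size (or a fixed 128) usually pays one.

{code:java}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified model: 'source' stands in for the RocksDB key range, refill() for a seek.
class ChunkedIterator<T> implements Iterator<T> {
    private static final int CACHE_SIZE_LIMIT = 128;

    private final List<T> source;
    private final List<T> cache = new ArrayList<>();
    private int chunkSize;
    private int sourcePos = 0;
    private int cachePos = 0;
    private boolean maybeMore = true; // true until a refill comes back short
    int refills = 0;                  // number of simulated seeks

    ChunkedIterator(List<T> source, int initialChunkSize) {
        this.source = source;
        this.chunkSize = initialChunkSize;
    }

    private void refill() {
        cache.clear();
        cachePos = 0;
        refills++;
        int loaded = 0;
        while (loaded < chunkSize && sourcePos < source.size()) {
            cache.add(source.get(sourcePos++));
            loaded++;
        }
        // A full cache cannot tell whether more entries exist, so another seek will follow.
        maybeMore = (loaded == chunkSize);
        chunkSize = Math.min(chunkSize * 2, CACHE_SIZE_LIMIT); // grow for the next seek
    }

    @Override
    public boolean hasNext() {
        if (cachePos >= cache.size() && maybeMore) {
            refill();
        }
        return cachePos < cache.size();
    }

    @Override
    public T next() {
        return cache.get(cachePos++);
    }
}
{code}

With an initial chunk size of 1, iterating even a single-entry map performs two refills; starting at 32 (or a fixed 128) it performs one, which is the point of the proposed change.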



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5465: [FLINK-8639][State Backends]Fix always need to see...

2018-02-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5465#discussion_r169374194
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
 ---
@@ -400,7 +410,7 @@ public UV setValue(UV value) {
/** An auxiliary utility to scan all entries under the given key. */
private abstract class RocksDBMapIterator implements Iterator {
 
-   static final int CACHE_SIZE_BASE = 1;
+   static final int CACHE_SIZE_BASE = 32;
--- End diff --

Indeed, I don't think the memory is really a concern (at least when 
CACHE_SIZE_LIMIT=128). I'd almost like to change the CACHE_SIZE to a fixed value 
(like 128). What do you think?


---


[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370209#comment-16370209
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169371872
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safety.
+*/
private static class RocksIteratorWrapper implements Iterator {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
+   while (nextKey == null && iterator.isValid()) {
+   try {
+   boolean namespaceValid = true;
+   byte[] key = iterator.key();
+   if (key.length >= namespaceBytesLength 
+ keyGroupPrefixBytes) {
+   for (int i = 1; i <= 
namespaceBytesLength; ++i) {
+   if (key[key.length - i] 
!= namespaceBytes[namespaceBytesLength - i]) {
+   namespaceValid 
= false;
+   break;
+   }
+   }
+   if (namespaceValid) {
+   
DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(
--- End diff --

Oh, yes sorry you are right, I was confused :-)


> RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with 
> namespace
> 
>
> Key: FLINK-8679
> URL: https://issues.apache.org/jira/browse/FLINK-8679
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently, `RocksDBKeyedBackend.getKeys(stateName, namespace)` is odd: it 
> doesn't use the namespace to filter data, whereas 
> `HeapKeyedBackend.getKeys(stateName, namespace)` does. I think they 
> should at least be consistent.
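
The fix under review filters keys by comparing the serialized namespace against the tail of each RocksDB key. Below is a minimal standalone sketch of that suffix check, assuming the key layout key-group prefix | serialized user key | serialized namespace (the method name is illustrative):

{code:java}
// Returns true if the RocksDB key ends with the given serialized namespace bytes.
static boolean hasNamespaceSuffix(byte[] rocksKey, int keyGroupPrefixBytes, byte[] namespaceBytes) {
    // The key must at least hold the key-group prefix plus the namespace suffix.
    if (rocksKey.length < keyGroupPrefixBytes + namespaceBytes.length) {
        return false;
    }
    // Compare the namespace bytes against the end of the key, back to front.
    for (int i = 1; i <= namespaceBytes.length; i++) {
        if (rocksKey[rocksKey.length - i] != namespaceBytes[namespaceBytes.length - i]) {
            return false;
        }
    }
    return true;
}
{code}

Keys that fail this check are skipped by the iterator, which is how the namespace filtering requested in this ticket is applied.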



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...

2018-02-20 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169371872
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safety.
+*/
private static class RocksIteratorWrapper implements Iterator {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
+   while (nextKey == null && iterator.isValid()) {
+   try {
+   boolean namespaceValid = true;
+   byte[] key = iterator.key();
+   if (key.length >= namespaceBytesLength 
+ keyGroupPrefixBytes) {
+   for (int i = 1; i <= 
namespaceBytesLength; ++i) {
+   if (key[key.length - i] 
!= namespaceBytes[namespaceBytesLength - i]) {
+   namespaceValid 
= false;
+   break;
+   }
+   }
+   if (namespaceValid) {
+   
DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(
--- End diff --

Oh, yes sorry you are right, I was confused :-)


---


[jira] [Commented] (FLINK-8703) Migrate tests from LocalFlinkMiniCluster to MiniClusterResource

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370205#comment-16370205
 ] 

ASF GitHub Bot commented on FLINK-8703:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5535#discussion_r169370975
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
 ---
@@ -134,7 +106,8 @@ public void testAccumulatorsAfterNoOp() {
final String accName = "test_accumulator";
 
try {
-   env.setParallelism(6);
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(4);
--- End diff --

yeah, that's probably the better option.


> Migrate tests from LocalFlinkMiniCluster to MiniClusterResource
> ---
>
> Key: FLINK-8703
> URL: https://issues.apache.org/jira/browse/FLINK-8703
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Aljoscha Krettek
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370206#comment-16370206
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r16937
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safety.
+*/
private static class RocksIteratorWrapper implements Iterator {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
+   while (nextKey == null && iterator.isValid()) {
+   try {
+   boolean namespaceValid = true;
+   byte[] key = iterator.key();
+   if (key.length >= namespaceBytesLength 
+ keyGroupPrefixBytes) {
+   for (int i = 1; i <= 
namespaceBytesLength; ++i) {
+   if (key[key.length - i] 
!= namespaceBytes[namespaceBytesLength - i]) {
+   namespaceValid 
= false;
+   break;
+   }
+   }
+   if (namespaceValid) {
+   
DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(
--- End diff --

Aha, sorry, I am a bit confused about "`ByteArrayInputStreamWithPos` can 
also grow internally?" Do you mean `ByteArrayOutputStreamWithPos`?


> RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with 
> namespace
> 
>
> Key: FLINK-8679
> URL: https://issues.apache.org/jira/browse/FLINK-8679
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently, `RocksDBKeyedBackend.getKeys(stateName, namespace)` is odd: it 
> doesn't use the namespace to filter data, whereas 
> `HeapKeyedBackend.getKeys(stateName, namespace)` does. I think they 
> should at least be consistent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...

2018-02-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r16937
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safety.
+*/
private static class RocksIteratorWrapper implements Iterator {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
+   while (nextKey == null && iterator.isValid()) {
+   try {
+   boolean namespaceValid = true;
+   byte[] key = iterator.key();
+   if (key.length >= namespaceBytesLength 
+ keyGroupPrefixBytes) {
+   for (int i = 1; i <= 
namespaceBytesLength; ++i) {
+   if (key[key.length - i] 
!= namespaceBytes[namespaceBytesLength - i]) {
+   namespaceValid 
= false;
+   break;
+   }
+   }
+   if (namespaceValid) {
+   
DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(
--- End diff --

Aha, sorry, I am a bit confused about "`ByteArrayInputStreamWithPos` can 
also grow internally?" Do you mean `ByteArrayOutputStreamWithPos`?


---


[GitHub] flink pull request #5535: [FLINK-8703][tests] Migrate tests from LocalFlinkM...

2018-02-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5535#discussion_r169370975
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
 ---
@@ -134,7 +106,8 @@ public void testAccumulatorsAfterNoOp() {
final String accName = "test_accumulator";
 
try {
-   env.setParallelism(6);
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(4);
--- End diff --

yeah, that's probably the better option.


---


[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370195#comment-16370195
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169369280
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safety.
+*/
private static class RocksIteratorWrapper implements Iterator {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
+   while (nextKey == null && iterator.isValid()) {
+   try {
+   boolean namespaceValid = true;
+   byte[] key = iterator.key();
+   if (key.length >= namespaceBytesLength 
+ keyGroupPrefixBytes) {
+   for (int i = 1; i <= 
namespaceBytesLength; ++i) {
+   if (key[key.length - i] 
!= namespaceBytes[namespaceBytesLength - i]) {
+   namespaceValid 
= false;
+   break;
+   }
+   }
+   if (namespaceValid) {
+   
DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(
+   new 
ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - 
keyGroupPrefixBytes));
+   K value = 
keySerializer.deserialize(dataInput);
+   if 
(dataInput.available() == namespaceBytesLength) {
--- End diff --

You are right! And I'd almost like to remove it now ...


> RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with 
> namespace
> 
>
> Key: FLINK-8679
> URL: https://issues.apache.org/jira/browse/FLINK-8679
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently, `RocksDBKeyedBackend.getKeys(stateName, namespace)` is odd: it 
> doesn't use the namespace to filter data, whereas 
> `HeapKeyedBackend.getKeys(stateName, namespace)` does. I think they 
> should at least be consistent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8703) Migrate tests from LocalFlinkMiniCluster to MiniClusterResource

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370196#comment-16370196
 ] 

ASF GitHub Bot commented on FLINK-8703:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5535#discussion_r169369295
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
 ---
@@ -134,7 +106,8 @@ public void testAccumulatorsAfterNoOp() {
final String accName = "test_accumulator";
 
try {
-   env.setParallelism(6);
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(4);
--- End diff --

Why not start a mini cluster with 2 TMs and 3 slots as before?


> Migrate tests from LocalFlinkMiniCluster to MiniClusterResource
> ---
>
> Key: FLINK-8703
> URL: https://issues.apache.org/jira/browse/FLINK-8703
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Aljoscha Krettek
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370197#comment-16370197
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169369352
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 ---
@@ -211,24 +211,55 @@ protected CheckpointStreamFactory 
createStreamFactory() throws Exception {
 
@Test
public void testGetKeys() throws Exception {
-   final int elementsToTest = 1000;
+   final int namespace1ElementsNum = 1000;
+   final int namespace2ElementsNum = 1000;
String fieldName = "get-keys-test";
AbstractKeyedStateBackend backend = 
createKeyedBackend(IntSerializer.INSTANCE);
try {
-   ValueState keyedState = 
backend.getOrCreateKeyedState(
-   VoidNamespaceSerializer.INSTANCE,
-   new ValueStateDescriptor<>(fieldName, 
IntSerializer.INSTANCE));
-   ((InternalValueState) 
keyedState).setCurrentNamespace(VoidNamespace.INSTANCE);
+   final String ns1 = "ns1";
+   ValueState keyedState1 = 
backend.getPartitionedState(
+   ns1,
+   StringSerializer.INSTANCE,
+   new ValueStateDescriptor<>(fieldName, 
IntSerializer.INSTANCE)
+   );
+
+   ((InternalValueState) 
keyedState1).setCurrentNamespace(ns1);
+
+   for (int key = 0; key < namespace1ElementsNum; key++) {
+   backend.setCurrentKey(key);
+   keyedState1.update(key * 2);
+   }
+
+   ValueState keyedState2 = 
backend.getPartitionedState(
+   ns1,
--- End diff --

addressed.


> RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with 
> namespace
> 
>
> Key: FLINK-8679
> URL: https://issues.apache.org/jira/browse/FLINK-8679
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently, `RocksDBKeyedBackend.getKeys(stateName, namespace)` is odd: it 
> doesn't use the namespace to filter data, whereas 
> `HeapKeyedBackend.getKeys(stateName, namespace)` does. I think they 
> should at least be consistent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...

2018-02-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169369352
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 ---
@@ -211,24 +211,55 @@ protected CheckpointStreamFactory 
createStreamFactory() throws Exception {
 
@Test
public void testGetKeys() throws Exception {
-   final int elementsToTest = 1000;
+   final int namespace1ElementsNum = 1000;
+   final int namespace2ElementsNum = 1000;
String fieldName = "get-keys-test";
AbstractKeyedStateBackend backend = 
createKeyedBackend(IntSerializer.INSTANCE);
try {
-   ValueState keyedState = 
backend.getOrCreateKeyedState(
-   VoidNamespaceSerializer.INSTANCE,
-   new ValueStateDescriptor<>(fieldName, 
IntSerializer.INSTANCE));
-   ((InternalValueState) 
keyedState).setCurrentNamespace(VoidNamespace.INSTANCE);
+   final String ns1 = "ns1";
+   ValueState keyedState1 = 
backend.getPartitionedState(
+   ns1,
+   StringSerializer.INSTANCE,
+   new ValueStateDescriptor<>(fieldName, 
IntSerializer.INSTANCE)
+   );
+
+   ((InternalValueState) 
keyedState1).setCurrentNamespace(ns1);
+
+   for (int key = 0; key < namespace1ElementsNum; key++) {
+   backend.setCurrentKey(key);
+   keyedState1.update(key * 2);
+   }
+
+   ValueState keyedState2 = 
backend.getPartitionedState(
+   ns1,
--- End diff --

addressed.


---


[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...

2018-02-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169369280
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safety.
+*/
private static class RocksIteratorWrapper implements Iterator {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
+   while (nextKey == null && iterator.isValid()) {
+   try {
+   boolean namespaceValid = true;
+   byte[] key = iterator.key();
+   if (key.length >= namespaceBytesLength 
+ keyGroupPrefixBytes) {
+   for (int i = 1; i <= 
namespaceBytesLength; ++i) {
+   if (key[key.length - i] 
!= namespaceBytes[namespaceBytesLength - i]) {
+   namespaceValid 
= false;
+   break;
+   }
+   }
+   if (namespaceValid) {
+   
DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(
+   new 
ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - 
keyGroupPrefixBytes));
+   K value = 
keySerializer.deserialize(dataInput);
+   if 
(dataInput.available() == namespaceBytesLength) {
--- End diff --

You are right! And I'd almost like to remove it now ...


---


[GitHub] flink pull request #5535: [FLINK-8703][tests] Migrate tests from LocalFlinkM...

2018-02-20 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5535#discussion_r169369295
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java
 ---
@@ -134,7 +106,8 @@ public void testAccumulatorsAfterNoOp() {
final String accName = "test_accumulator";
 
try {
-   env.setParallelism(6);
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(4);
--- End diff --

Why not start a mini cluster with 2 TMs and 3 slots as before?
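
One possible shape for that, as a rough sketch - the exact `MiniClusterResource` constructor/builder differs between Flink versions, so treat the package and parameter names here as assumptions rather than the definitive API:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.util.MiniClusterResource;
import org.junit.ClassRule;

public class ExampleMigratedITCase {
    // Sketch only: 2 TaskManagers x 3 slots keeps the same slot budget (6)
    // as the previous TestingCluster-based setup discussed in this thread.
    @ClassRule
    public static final MiniClusterResource MINI_CLUSTER = new MiniClusterResource(
        new MiniClusterResource.MiniClusterResourceConfiguration(
            new Configuration(),
            2,    // number of TaskManagers
            3));  // slots per TaskManager
}
```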


---


[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370192#comment-16370192
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169368766
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safety.
+*/
private static class RocksIteratorWrapper implements Iterator {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
+   while (nextKey == null && iterator.isValid()) {
+   try {
+   boolean namespaceValid = true;
+   byte[] key = iterator.key();
+   if (key.length >= namespaceBytesLength 
+ keyGroupPrefixBytes) {
--- End diff --

addressed.


> RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with 
> namespace
> 
>
> Key: FLINK-8679
> URL: https://issues.apache.org/jira/browse/FLINK-8679
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently, `RocksDBKeyedBackend.getKeys(stateName, namespace)` is odd: it 
> doesn't use the namespace to filter data, whereas 
> `HeapKeyedBackend.getKeys(stateName, namespace)` does. I think they 
> should at least be consistent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370191#comment-16370191
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169368679
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safety.
+*/
private static class RocksIteratorWrapper implements Iterator {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
--- End diff --

Added a unit test `RocksDBRocksIteratorWrapperTest`.


> RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with 
> namespace
> 
>
> Key: FLINK-8679
> URL: https://issues.apache.org/jira/browse/FLINK-8679
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently, `RocksDBKeyedBackend.getKeys(stateName, namespace)` is odd: it 
> doesn't use the namespace to filter data, whereas 
> `HeapKeyedBackend.getKeys(stateName, namespace)` does. I think they 
> should at least be consistent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370193#comment-16370193
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169368813
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safety.
+*/
private static class RocksIteratorWrapper implements Iterator {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
+   while (nextKey == null && iterator.isValid()) {
+   try {
+   boolean namespaceValid = true;
+   byte[] key = iterator.key();
+   if (key.length >= namespaceBytesLength 
+ keyGroupPrefixBytes) {
--- End diff --

addressed.


> RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with 
> namespace
> 
>
> Key: FLINK-8679
> URL: https://issues.apache.org/jira/browse/FLINK-8679
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently, `RocksDBKeyedBackend.getKeys(stateName, namespace)` is odd: it 
> doesn't use the namespace to filter data, whereas 
> `HeapKeyedBackend.getKeys(stateName, namespace)` does. I think they 
> should at least be consistent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...

2018-02-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169368766
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safety.
+*/
private static class RocksIteratorWrapper implements Iterator {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
+   while (nextKey == null && iterator.isValid()) {
+   try {
+   boolean namespaceValid = true;
+   byte[] key = iterator.key();
+   if (key.length >= namespaceBytesLength 
+ keyGroupPrefixBytes) {
--- End diff --

addressed.


---


[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...

2018-02-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169368813
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safety.
+*/
private static class RocksIteratorWrapper implements Iterator {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
+   while (nextKey == null && iterator.isValid()) {
+   try {
+   boolean namespaceValid = true;
+   byte[] key = iterator.key();
+   if (key.length >= namespaceBytesLength 
+ keyGroupPrefixBytes) {
--- End diff --

addressed.


---


[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...

2018-02-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169368679
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safety.
+*/
private static class RocksIteratorWrapper implements Iterator {
private final RocksIterator iterator;
private final String state;
private final TypeSerializer keySerializer;
private final int keyGroupPrefixBytes;
+   private final byte[] namespaceBytes;
+   private K nextKey;
 
public RocksIteratorWrapper(
RocksIterator iterator,
String state,
TypeSerializer keySerializer,
-   int keyGroupPrefixBytes) {
+   int keyGroupPrefixBytes,
+   byte[] namespaceBytes) {
this.iterator = Preconditions.checkNotNull(iterator);
this.state = Preconditions.checkNotNull(state);
this.keySerializer = 
Preconditions.checkNotNull(keySerializer);
this.keyGroupPrefixBytes = 
Preconditions.checkNotNull(keyGroupPrefixBytes);
+   this.namespaceBytes = 
Preconditions.checkNotNull(namespaceBytes);
+   this.nextKey = null;
}
 
@Override
public boolean hasNext() {
-   return iterator.isValid();
+   final int namespaceBytesLength = namespaceBytes.length;
--- End diff --

Added a unit test `RocksDBRocksIteratorWrapperTest`.


---


[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370190#comment-16370190
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169368440
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safe.
+*/
private static class RocksIteratorWrapper<K> implements Iterator<K> {
--- End diff --

Good point, addressed.


> RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with 
> namespace
> 
>
> Key: FLINK-8679
> URL: https://issues.apache.org/jira/browse/FLINK-8679
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently, `RocksDBKeyedBackend.getKeys(stateName, namespace)` is odd: it 
> doesn't use the namespace to filter data, while 
> `HeapKeyedBackend.getKeys(stateName, namespace)` already does. I think they 
> should at least be consistent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370189#comment-16370189
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169368355
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -266,9 +267,16 @@ public RocksDBKeyedStateBackend(
RocksIterator iterator = db.newIterator(columnInfo.f0);
iterator.seekToFirst();
 
-   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
-   Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
-   return targetStream.onClose(iterator::close);
+   try {
+   ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream(8);
+   namespaceSerializer.serialize(namespace, new 
DataOutputViewStreamWrapper(outputStream));
+   final byte[] namespaceBytes = 
outputStream.toByteArray();
+   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes, 
namespaceBytes);
+   Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
+   return targetStream.onClose(iterator::close);
+   } catch (IOException ex) {
+   throw new FlinkRuntimeException("Failed to get keys 
from RocksDB state backend.", ex);
--- End diff --

Nice catch! Addressed.


> RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with 
> namespace
> 
>
> Key: FLINK-8679
> URL: https://issues.apache.org/jira/browse/FLINK-8679
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently, `RocksDBKeyedBackend.getKeys(stateName, namespace)` is odd: it 
> doesn't use the namespace to filter data, while 
> `HeapKeyedBackend.getKeys(stateName, namespace)` already does. I think they 
> should at least be consistent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
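
The serialization step shown in the diff above can be reproduced in isolation. A minimal sketch, assuming a String namespace and Flink's StringSerializer (the actual backend uses whatever namespaceSerializer the caller passes in; the class name below is hypothetical):

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

public class NamespaceBytesExample {

    public static void main(String[] args) throws IOException {
        // Serialize a namespace value into the same byte form that is appended
        // to the composed RocksDB keys.
        String namespace = "window-1";
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream(8);
        StringSerializer.INSTANCE.serialize(namespace, new DataOutputViewStreamWrapper(outputStream));
        byte[] namespaceBytes = outputStream.toByteArray();

        // These bytes can then be compared against the tail of each iterated key.
        System.out.println("namespace serialized to " + namespaceBytes.length + " bytes");
    }
}
{code}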


[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...

2018-02-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169368440
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -1991,43 +1999,71 @@ public int numStateEntries() {
return count;
}
 
+   /**
+* This class is not thread safe.
+*/
private static class RocksIteratorWrapper<K> implements Iterator<K> {
--- End diff --

Good point, addressed.


---


[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...

2018-02-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169368355
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -266,9 +267,16 @@ public RocksDBKeyedStateBackend(
RocksIterator iterator = db.newIterator(columnInfo.f0);
iterator.seekToFirst();
 
-   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
-   Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
-   return targetStream.onClose(iterator::close);
+   try {
+   ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream(8);
+   namespaceSerializer.serialize(namespace, new 
DataOutputViewStreamWrapper(outputStream));
+   final byte[] namespaceBytes = 
outputStream.toByteArray();
+   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes, 
namespaceBytes);
+   Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
+   return targetStream.onClose(iterator::close);
+   } catch (IOException ex) {
+   throw new FlinkRuntimeException("Failed to get keys 
from RocksDB state backend.", ex);
--- End diff --

Nice catch! Addressed.


---


[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370184#comment-16370184
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169367934
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -266,9 +267,16 @@ public RocksDBKeyedStateBackend(
RocksIterator iterator = db.newIterator(columnInfo.f0);
iterator.seekToFirst();
 
-   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
-   Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
-   return targetStream.onClose(iterator::close);
+   try {
+   ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream(8);
--- End diff --

addressed.


> RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with 
> namespace
> 
>
> Key: FLINK-8679
> URL: https://issues.apache.org/jira/browse/FLINK-8679
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently, `RocksDBKeyedBackend.getKeys(stateName, namespace)` is odd: it 
> doesn't use the namespace to filter data, while 
> `HeapKeyedBackend.getKeys(stateName, namespace)` already does. I think they 
> should at least be consistent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370183#comment-16370183
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169367889
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
 ---
@@ -62,7 +62,7 @@
 * @param state State variable for which existing keys will be returned.
 * @param namespace Namespace for which existing keys will be returned.
 */
-<N> Stream<K> getKeys(String state, N namespace);
+<N> Stream<K> getKeys(String state, N namespace, TypeSerializer<N> 
namespaceSerializer);
--- End diff --

addressed.


> RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with 
> namespace
> 
>
> Key: FLINK-8679
> URL: https://issues.apache.org/jira/browse/FLINK-8679
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently, `RocksDBKeyedBackend.getKeys(stateName, namespace)` is odd: it 
> doesn't use the namespace to filter data, while 
> `HeapKeyedBackend.getKeys(stateName, namespace)` already does. I think they 
> should at least be consistent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...

2018-02-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169367934
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -266,9 +267,16 @@ public RocksDBKeyedStateBackend(
RocksIterator iterator = db.newIterator(columnInfo.f0);
iterator.seekToFirst();
 
-   Iterable iterable = () -> new 
RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
-   Stream targetStream = 
StreamSupport.stream(iterable.spliterator(), false);
-   return targetStream.onClose(iterator::close);
+   try {
+   ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream(8);
--- End diff --

addressed.


---


[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...

2018-02-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5518#discussion_r169367889
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java
 ---
@@ -62,7 +62,7 @@
 * @param state State variable for which existing keys will be returned.
 * @param namespace Namespace for which existing keys will be returned.
 */
-<N> Stream<K> getKeys(String state, N namespace);
+<N> Stream<K> getKeys(String state, N namespace, TypeSerializer<N> 
namespaceSerializer);
--- End diff --

addressed.


---


[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-20 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370167#comment-16370167
 ] 

Aljoscha Krettek commented on FLINK-8543:
-

[~snowch] Do you see these failures for all files or only sporadically? Also, 
do you maybe have updated logs with stack traces that show who calls 
{{close()}} and {{flush()}}?

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOpen();
> this.backupStream.flush();
> }
> @Override
> public void close() throws IOException {
> if (this.closed.getAndSet(true)) {
> return;
> }
> this.backupStream.close();
> LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
> (Object)this.key);
> try {
> ObjectMetadata om = 
> this.fs.newObjectMetadata(this.backupFile.length());
> Upload 
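
One way to obtain the stack traces asked for above, without patching the Hadoop class again, is to wrap the sink's output stream in a thin delegate that remembers where close() happened and logs any later flush(). This is only an illustrative debugging aid under that assumption, not part of Flink or Hadoop, and the class name is made up:

{code:java}
import java.io.IOException;
import java.io.OutputStream;

// Illustrative debugging aid: records the call site of close() and prints both
// stack traces when a flush() arrives afterwards, which is the sequence
// suspected in this issue.
public class CloseTrackingOutputStream extends OutputStream {

    private final OutputStream delegate;
    private volatile Exception closedAt;

    public CloseTrackingOutputStream(OutputStream delegate) {
        this.delegate = delegate;
    }

    @Override
    public void write(int b) throws IOException {
        delegate.write(b);
    }

    @Override
    public void flush() throws IOException {
        if (closedAt != null) {
            // Print where close() was called and who is flushing now.
            closedAt.printStackTrace();
            new Exception("flush() after close()").printStackTrace();
        }
        delegate.flush();
    }

    @Override
    public void close() throws IOException {
        if (closedAt == null) {
            closedAt = new Exception("close() was called here");
        }
        delegate.close();
    }
}
{code}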

[jira] [Reopened] (FLINK-8574) Add timestamps to travis logging messages

2018-02-20 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reopened FLINK-8574:
-

I'll also merge this to 1.4

> Add timestamps to travis logging messages
> -
>
> Key: FLINK-8574
> URL: https://issues.apache.org/jira/browse/FLINK-8574
> Project: Flink
>  Issue Type: Improvement
>  Components: Travis
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.5.0, 1.4.2
>
>
> To better understand the impact of plugins it would be very helpful to have 
> timestamps prepended to each logging statement.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8574) Add timestamps to travis logging messages

2018-02-20 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8574.
---
   Resolution: Fixed
Fix Version/s: 1.4.2

1.4: a7df42485a6d96bb7167c6597b9b0d1c59390f4d

> Add timestamps to travis logging messages
> -
>
> Key: FLINK-8574
> URL: https://issues.apache.org/jira/browse/FLINK-8574
> Project: Flink
>  Issue Type: Improvement
>  Components: Travis
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.5.0, 1.4.2
>
>
> To better understand the impact of plugins it would be very helpful to have 
> timestamps prepended to each logging statement.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8621) PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed unstable on Travis

2018-02-20 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8621.
---
Resolution: Fixed

1.4: 45efe4702ac1d932afa1b322e93264c28aefbc4a 

> PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed unstable on 
> Travis
> 
>
> Key: FLINK-8621
> URL: https://issues.apache.org/jira/browse/FLINK-8621
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.2
>
>
> {{PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed}} fails 
> on Travis: https://travis-ci.org/apache/flink/jobs/339344244



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5419: [FLINK-8574][travis] Add timestamp to logging mess...

2018-02-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5419


---


[jira] [Commented] (FLINK-8621) PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed unstable on Travis

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370158#comment-16370158
 ] 

ASF GitHub Bot commented on FLINK-8621:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5473


> PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed unstable on 
> Travis
> 
>
> Key: FLINK-8621
> URL: https://issues.apache.org/jira/browse/FLINK-8621
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.2
>
>
> {{PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed}} fails 
> on Travis: https://travis-ci.org/apache/flink/jobs/339344244



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8574) Add timestamps to travis logging messages

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370159#comment-16370159
 ] 

ASF GitHub Bot commented on FLINK-8574:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5419


> Add timestamps to travis logging messages
> -
>
> Key: FLINK-8574
> URL: https://issues.apache.org/jira/browse/FLINK-8574
> Project: Flink
>  Issue Type: Improvement
>  Components: Travis
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.5.0
>
>
> To better understand the impact of plugins it would be very helpful to have 
> timestamps prepended to each logging statement.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5473: [FLINK-8621][prometheus][tests] Remove endpointIsU...

2018-02-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5473


---


[jira] [Commented] (FLINK-8621) PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed unstable on Travis

2018-02-20 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370155#comment-16370155
 ] 

Chesnay Schepler commented on FLINK-8621:
-

master: 6c9a5c7f3c63404657fc1da633bc22b62c1c1178 

> PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed unstable on 
> Travis
> 
>
> Key: FLINK-8621
> URL: https://issues.apache.org/jira/browse/FLINK-8621
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.2
>
>
> {{PrometheusReporterTest.endpointIsUnavailableAfterReporterIsClosed}} fails 
> on Travis: https://travis-ci.org/apache/flink/jobs/339344244



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8574) Add timestamps to travis logging messages

2018-02-20 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8574.
---
Resolution: Fixed

master: 86723314e5043d7b28faf30c000f54928f24d0da

> Add timestamps to travis logging messages
> -
>
> Key: FLINK-8574
> URL: https://issues.apache.org/jira/browse/FLINK-8574
> Project: Flink
>  Issue Type: Improvement
>  Components: Travis
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.5.0
>
>
> To better understand the impact of plugins it would be very helpful to have 
> timestamps prepended to each logging statement.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8593) Latency metric docs are outdated

2018-02-20 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8593:

Component/s: Documentation

> Latency metric docs are outdated
> 
>
> Key: FLINK-8593
> URL: https://issues.apache.org/jira/browse/FLINK-8593
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Metrics
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>
> I missed to update the latency metric documentation while working on 
> FLINK-7608. The docs should be updated to contain the new naming scheme and 
> that it is a job-level metric.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8717) Flink seems to deadlock due to buffer starvation when iterating

2018-02-20 Thread Romain Revol (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Romain Revol updated FLINK-8717:

Description: 
We are encountering what looks like a deadlock of Flink in one of our jobs with 
an "iterate" in it.

I've reduced the job use case to the example in this gist : 
[https://gist.github.com/rrevol/06ddfecd5f5ac7cbc67785b5d3a84dd4]

Note that:
 * varying the parallelism affects how quickly the deadlock occurs, but it 
always occurs
 * varying MAX_LOOP_NB does affect the deadlock: the higher it is, the faster 
we encounter the deadlock. If MAX_LOOP_NB == 1, there is no deadlock. This 
suggests that it happens once the number of iterations reaches some threshold.

From the [^threadDump.txt], it looks like starvation over buffer allocation; 
maybe backpressure has flaws with iterate, but I may be mistaken since I don't 
know Flink internals well.

  was:
We are encountering what looks like a deadlock of Flink in one of our jobs with 
an "iterate" in it.

I've reduced the job use case to the example in this gist : 
[https://gist.github.com/rrevol/06ddfecd5f5ac7cbc67785b5d3a84dd4]

Nothe that :
 * varying the parallelism affects the rapidity of occurence of the deadlock, 
but it always occur
 * varying MAX_LOOP_NB does affect the deadlock : the higher it is, the faster 
we encounter the deadlock. If MAX_LOOP_NB == 1, no deadlock. It consequently 
leads to think that it happens when the number of iterations reaches some 
threshold.

>From the [^threadDump.txt], it looks like some starvation over buffer 
>allocation, maybe backpressure has flaws on iterate, but I may be mistaking 
>since I don't know we'll Flink internals.


> Flink seems to deadlock due to buffer starvation when iterating
> ---
>
> Key: FLINK-8717
> URL: https://issues.apache.org/jira/browse/FLINK-8717
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0
> Environment: Windows 10 Pro 64-bit
> Core i7-6820HQ @ 2.7 GHz
> 16GB RAM
> Flink 1.4
> Scala client
> Scala 2.11.7
>  
>Reporter: Romain Revol
>Priority: Major
> Attachments: threadDump.txt
>
>
> We are encountering what looks like a deadlock of Flink in one of our jobs 
> with an "iterate" in it.
> I've reduced the job use case to the example in this gist : 
> [https://gist.github.com/rrevol/06ddfecd5f5ac7cbc67785b5d3a84dd4]
> Note that:
>  * varying the parallelism affects how quickly the deadlock occurs, but it 
> always occurs
>  * varying MAX_LOOP_NB does affect the deadlock: the higher it is, the 
> faster we encounter the deadlock. If MAX_LOOP_NB == 1, there is no deadlock. 
> This suggests that it happens once the number of iterations reaches some 
> threshold.
> From the [^threadDump.txt], it looks like starvation over buffer 
> allocation; maybe backpressure has flaws with iterate, but I may be mistaken 
> since I don't know Flink internals well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8717) Flink seems to deadlock due to buffer starvation when iterating

2018-02-20 Thread Romain Revol (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Romain Revol updated FLINK-8717:

Description: 
We are encountering what looks like a deadlock of Flink in one of our jobs with 
an "iterate" in it.

I've reduced the job use case to the example in this gist : 
[https://gist.github.com/rrevol/06ddfecd5f5ac7cbc67785b5d3a84dd4]

Note that:
 * varying the parallelism affects how quickly the deadlock occurs, but it 
always occurs
 * varying MAX_LOOP_NB does affect the deadlock: the higher it is, the faster 
we encounter the deadlock. If MAX_LOOP_NB == 1, there is no deadlock. This 
suggests that it happens once the number of iterations reaches some threshold.

From the [^threadDump.txt], it looks like starvation over buffer allocation; 
maybe backpressure has flaws with iterate, but I may be mistaken since I don't 
know Flink internals well.

  was:
We are encountering what looks like a deadlock of Flink in one of our jobs with 
an "iterate" in it. 

I've reduced the job use case to the example in this gist : 
[https://gist.github.com/rrevol/06ddfecd5f5ac7cbc67785b5d3a84dd4]

Nothe that :
 * varying the parallelism affects the rapidity of occurence of the deadlock, 
but it always occur
 * varying MAX_LOOP_NB does affect the deadlock : the higher it is, the faster 
we encounter the deadlock. If MAX_LOOP_NB == 1, no deadlock. It consequently 
leads to think that it happens when the number of iterations reaches some 
threshold.

>From the [^threadDump.txt], it looks like some starvation over buffer 
>allocation, but I may be mistaking since I don't know we'll Flink internals.


> Flink seems to deadlock due to buffer starvation when iterating
> ---
>
> Key: FLINK-8717
> URL: https://issues.apache.org/jira/browse/FLINK-8717
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0
> Environment: Windows 10 Pro 64-bit
> Core i7-6820HQ @ 2.7 GHz
> 16GB RAM
> Flink 1.4
> Scala client
> Scala 2.11.7
>  
>Reporter: Romain Revol
>Priority: Major
> Attachments: threadDump.txt
>
>
> We are encountering what looks like a deadlock of Flink in one of our jobs 
> with an "iterate" in it.
> I've reduced the job use case to the example in this gist : 
> [https://gist.github.com/rrevol/06ddfecd5f5ac7cbc67785b5d3a84dd4]
> Note that:
>  * varying the parallelism affects how quickly the deadlock occurs, but it 
> always occurs
>  * varying MAX_LOOP_NB does affect the deadlock: the higher it is, the 
> faster we encounter the deadlock. If MAX_LOOP_NB == 1, there is no deadlock. 
> This suggests that it happens once the number of iterations reaches some 
> threshold.
> From the [^threadDump.txt], it looks like starvation over buffer 
> allocation; maybe backpressure has flaws with iterate, but I may be mistaken 
> since I don't know Flink internals well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8717) Flink seems to deadlock due to buffer starvation when iterating

2018-02-20 Thread Romain Revol (JIRA)
Romain Revol created FLINK-8717:
---

 Summary: Flink seems to deadlock due to buffer starvation when 
iterating
 Key: FLINK-8717
 URL: https://issues.apache.org/jira/browse/FLINK-8717
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
 Environment: Windows 10 Pro 64-bit

Core i7-6820HQ @ 2.7 GHz

16GB RAM

Flink 1.4

Scala client

Scala 2.11.7

 
Reporter: Romain Revol
 Attachments: threadDump.txt

We are encountering what looks like a deadlock of Flink in one of our jobs with 
an "iterate" in it. 

I've reduced the job use case to the example in this gist : 
[https://gist.github.com/rrevol/06ddfecd5f5ac7cbc67785b5d3a84dd4]

Note that:
 * varying the parallelism affects how quickly the deadlock occurs, but it 
always occurs
 * varying MAX_LOOP_NB does affect the deadlock: the higher it is, the faster 
we encounter the deadlock. If MAX_LOOP_NB == 1, there is no deadlock. This 
suggests that it happens once the number of iterations reaches some threshold.

From the [^threadDump.txt], it looks like starvation over buffer allocation, 
but I may be mistaken since I don't know Flink internals well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
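
For readers who cannot open the gist: the job shape being described is presumably along the lines of the standard iterate()/closeWith() pattern sketched below, where records keep circulating through the feedback edge until a per-record counter (standing in for the MAX_LOOP_NB knob mentioned above) runs out. This is a generic, hypothetical sketch, not the reporter's code:

{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedIterationSketch {

    // Stands in for the MAX_LOOP_NB knob from the report.
    private static final long MAX_LOOP_NB = 100;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Every record starts with MAX_LOOP_NB remaining trips through the loop.
        DataStream<Long> input = env.generateSequence(0, 999)
            .map(new MapFunction<Long, Long>() {
                @Override
                public Long map(Long ignored) {
                    return MAX_LOOP_NB;
                }
            });

        IterativeStream<Long> iteration = input.iterate();

        DataStream<Long> decremented = iteration.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long remaining) {
                return remaining - 1;
            }
        });

        // Records with trips left are fed back into the loop ...
        iteration.closeWith(decremented.filter(value -> value > 0));
        // ... the rest leave the iteration.
        decremented.filter(value -> value <= 0).print();

        env.execute("bounded iteration sketch");
    }
}
{code}

With a shape like this, a larger MAX_LOOP_NB keeps more records in flight on the feedback edge at any time, which would be consistent with the report that the hang appears sooner as the bound grows.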


[jira] [Commented] (FLINK-8703) Migrate tests from LocalFlinkMiniCluster to MiniClusterResource

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370125#comment-16370125
 ] 

ASF GitHub Bot commented on FLINK-8703:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5535#discussion_r169346590
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
 ---
@@ -38,33 +35,23 @@
  * This experimental class is relocated from flink-streaming-contrib. 
Please see package-info.java
  * for more information.
  */
-public class CollectITCase extends TestLogger {
+public class CollectITCase extends AbstractTestBase {
 
--- End diff --

Thanks @zentol.


> Migrate tests from LocalFlinkMiniCluster to MiniClusterResource
> ---
>
> Key: FLINK-8703
> URL: https://issues.apache.org/jira/browse/FLINK-8703
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Aljoscha Krettek
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5535: [FLINK-8703][tests] Migrate tests from LocalFlinkM...

2018-02-20 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5535#discussion_r169346590
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
 ---
@@ -38,33 +35,23 @@
  * This experimental class is relocated from flink-streaming-contrib. 
Please see package-info.java
  * for more information.
  */
-public class CollectITCase extends TestLogger {
+public class CollectITCase extends AbstractTestBase {
 
--- End diff --

Thanks @zentol.


---


[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-20 Thread chris snow (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370123#comment-16370123
 ] 

chris snow commented on FLINK-8543:
---

[~aljoscha] - I’m using AvroKeyValueSinkWriter 
[https://github.com/ibm-cloud-streaming-retail-demo/flink-on-iae-messagehub-to-s3/blob/master/src/main/java/com/ibm/cloud/flink/StreamingJob.java#L186]

 

[~ste...@apache.org] - HDP 2.6.2: 
https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction

 

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOpen();
> this.backupStream.flush();
> }
> @Override
> public void close() throws IOException {
> if (this.closed.getAndSet(true)) {
> return;
> }
> this.backupStream.close();
> LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
> 

[jira] [Commented] (FLINK-8703) Migrate tests from LocalFlinkMiniCluster to MiniClusterResource

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370117#comment-16370117
 ] 

ASF GitHub Bot commented on FLINK-8703:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5535#discussion_r169341512
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
 ---
@@ -38,33 +35,23 @@
  * This experimental class is relocated from flink-streaming-contrib. 
Please see package-info.java
  * for more information.
  */
-public class CollectITCase extends TestLogger {
+public class CollectITCase extends AbstractTestBase {
 
--- End diff --

Please have a look at the javadocs for AbstractTestBase.

The logs should still be printed as `AbstractTestBase` extends 
`TestBaseUtils` which extends `TestLogger`.


> Migrate tests from LocalFlinkMiniCluster to MiniClusterResource
> ---
>
> Key: FLINK-8703
> URL: https://issues.apache.org/jira/browse/FLINK-8703
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Aljoscha Krettek
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5535: [FLINK-8703][tests] Migrate tests from LocalFlinkM...

2018-02-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5535#discussion_r169341512
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
 ---
@@ -38,33 +35,23 @@
  * This experimental class is relocated from flink-streaming-contrib. 
Please see package-info.java
  * for more information.
  */
-public class CollectITCase extends TestLogger {
+public class CollectITCase extends AbstractTestBase {
 
--- End diff --

Please have a look at the javadocs for AbstractTestBase.

The logs should still be printed as `AbstractTestBase` extends 
`TestBaseUtils` which extends `TestLogger`.


---
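
For reference, a migrated ITCase in this style is little more than the test body itself. A minimal sketch, assuming the test only needs the shared mini cluster that AbstractTestBase wires up (class and job names here are illustrative):

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.AbstractTestBase;

import org.junit.Test;

// Illustrative migration target: extend AbstractTestBase and use the environment
// it provides; logging still works because AbstractTestBase inherits TestLogger.
public class MyCollectITCase extends AbstractTestBase {

    @Test
    public void testPipelineRuns() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.fromElements(1L, 2L, 3L).print();

        env.execute("collect test sketch");
    }
}
{code}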


[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-20 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370094#comment-16370094
 ] 

Aljoscha Krettek commented on FLINK-8543:
-

[~snowch] Btw, what is the writer you're using in this case?

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOpen();
> this.backupStream.flush();
> }
> @Override
> public void close() throws IOException {
> if (this.closed.getAndSet(true)) {
> return;
> }
> this.backupStream.close();
> LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
> (Object)this.key);
> try {
> ObjectMetadata om = 
> this.fs.newObjectMetadata(this.backupFile.length());
> Upload upload = 
> this.fs.putObject(this.fs.newPutObjectRequest(this.key, om, this.backupFile));
> 

[jira] [Commented] (FLINK-8458) Add the switch for keeping both the old mode and the new credit-based mode

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370087#comment-16370087
 ] 

ASF GitHub Bot commented on FLINK-8458:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5317
  
@NicoK, I found most of the code in this PR is already merged into 
master by commit `0093bcbe771f296baf3857ef15fe7ec9b22bbc34` in your 
`FLINK-8425`. Maybe I only need to add the `config.md` changes in this PR.


> Add the switch for keeping both the old mode and the new credit-based mode
> --
>
> Key: FLINK-8458
> URL: https://issues.apache.org/jira/browse/FLINK-8458
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
> Fix For: 1.5.0
>
>
> After the whole feature of credit-based flow control is done, we should add a 
> config parameter to switch on/off the new credit-based mode. To do so, we can 
> roll back to the old network mode for any expected risks.
> The parameter is defined as 
> {{taskmanager.network.credit-based-flow-control.enabled}} and the default 
> value is true. This switch may be removed after next release.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
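
The switch described above would normally go into flink-conf.yaml; for completeness, a sketch of setting it programmatically on a Configuration object (the key name is taken from the issue text, so whether the merged code keeps exactly this key is up to the final PR, and the class name is hypothetical):

{code:java}
import org.apache.flink.configuration.Configuration;

public class CreditBasedSwitchExample {

    public static void main(String[] args) {
        Configuration config = new Configuration();

        // Key name as given in the issue text; default is true (credit-based mode on).
        // Setting it to false rolls back to the old network mode.
        config.setBoolean("taskmanager.network.credit-based-flow-control.enabled", false);

        System.out.println(config);
    }
}
{code}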


[GitHub] flink issue #5317: [FLINK-8458] Add the switch for keeping both the old mode...

2018-02-20 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5317
  
@NicoK, I found most of the code in this PR is already merged into 
master by commit `0093bcbe771f296baf3857ef15fe7ec9b22bbc34` in your 
`FLINK-8425`. Maybe I only need to add the `config.md` changes in this PR.


---


[jira] [Commented] (FLINK-8703) Migrate tests from LocalFlinkMiniCluster to MiniClusterResource

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370083#comment-16370083
 ] 

ASF GitHub Bot commented on FLINK-8703:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5535#discussion_r169328894
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
 ---
@@ -38,33 +35,23 @@
  * This experimental class is relocated from flink-streaming-contrib. 
Please see package-info.java
  * for more information.
  */
-public class CollectITCase extends TestLogger {
+public class CollectITCase extends AbstractTestBase {
 
--- End diff --

One difference I found: if we extend the AbstractTestBase class, no logs are 
printed to the console.


> Migrate tests from LocalFlinkMiniCluster to MiniClusterResource
> ---
>
> Key: FLINK-8703
> URL: https://issues.apache.org/jira/browse/FLINK-8703
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Aljoscha Krettek
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5535: [FLINK-8703][tests] Migrate tests from LocalFlinkM...

2018-02-20 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5535#discussion_r169328894
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
 ---
@@ -38,33 +35,23 @@
  * This experimental class is relocated from flink-streaming-contrib. 
Please see package-info.java
  * for more information.
  */
-public class CollectITCase extends TestLogger {
+public class CollectITCase extends AbstractTestBase {
 
--- End diff --

One difference I found: if we extend the AbstractTestBase class, no logs are 
printed to the console.


---


[jira] [Commented] (FLINK-8703) Migrate tests from LocalFlinkMiniCluster to MiniClusterResource

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370076#comment-16370076
 ] 

ASF GitHub Bot commented on FLINK-8703:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5535#discussion_r169327476
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
 ---
@@ -38,33 +35,23 @@
  * This experimental class is relocated from flink-streaming-contrib. 
Please see package-info.java
  * for more information.
  */
-public class CollectITCase extends TestLogger {
+public class CollectITCase extends AbstractTestBase {
 
--- End diff --

It seems that we do not use anything from the AbstractTestBase class. Why do we 
still extend this class?


> Migrate tests from LocalFlinkMiniCluster to MiniClusterResource
> ---
>
> Key: FLINK-8703
> URL: https://issues.apache.org/jira/browse/FLINK-8703
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Aljoscha Krettek
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5535: [FLINK-8703][tests] Migrate tests from LocalFlinkM...

2018-02-20 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5535#discussion_r169327476
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
 ---
@@ -38,33 +35,23 @@
  * This experimental class is relocated from flink-streaming-contrib. 
Please see package-info.java
  * for more information.
  */
-public class CollectITCase extends TestLogger {
+public class CollectITCase extends AbstractTestBase {
 
--- End diff --

It seems that we do not use anything from the AbstractTestBase class. Why do we 
still extend this class?


---


[jira] [Commented] (FLINK-8714) Suggest new users to use env.readTextFile method with 2 arguments (using the charset), not to rely on system charset (which varies across environments)

2018-02-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8714?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370075#comment-16370075
 ] 

ASF GitHub Bot commented on FLINK-8714:
---

GitHub user michalklempa opened a pull request:

https://github.com/apache/flink/pull/5536

[FLINK-8714][Documentation] Added either charsetName) or "utf-8" value in 
examples of readTextFile

## What is the purpose of the change
When a newcomer (like me) goes through the docs, there are several places 
where examples encourage reading the input data using the env.readTextFile() 
method.

This method variant does not take a second argument - character set (see 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html#readTextFile-java.lang.String-).
 This version relies (according to the Javadoc) on "The file will be read with 
the system's default character set."

This is fixed in the documentation by providing charsetName in the examples 
where the API is described, and "utf-8" as the second argument in the 
programming examples. This should help others remember to specify a charset 
programmatically if they want to avoid non-deterministic behavior that depends 
on the environment.

## Brief change log

## Verifying this change

This change is a trivial rework of documentation without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/michalklempa/flink 
FLINK-8714_readTextFile_charset_version

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5536.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5536


commit 221684da5b564b21c1e0cc99e823c18939c0ca91
Author: Michal Klempa 
Date:   2018-02-20T13:50:30Z

FLINK-8714 added either env.readTextFile(pathToFile, charsetName) where the 
API is described or readTextFile(path/to/file, utf-8)  where API is shown as 
example




> Suggest new users to use env.readTextFile method with 2 arguments (using the 
> charset), not to rely on system charset (which varies across environments)
> ---
>
> Key: FLINK-8714
> URL: https://issues.apache.org/jira/browse/FLINK-8714
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.4.0
>Reporter: Michal Klempa
>Priority: Trivial
>  Labels: easyfix, newbie
>
> When a newcomer (like me) goes through the docs, there are several places 
> where examples encourage reading the input data using the 
> {{env.readTextFile()}} method.
>  
> This method variant does not take a second argument - character set (see 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html#readTextFile-java.lang.String-).]
>  This version relies (according to the Javadoc) on "The file will be read 
> with the system's default character set."
>  
> This behavior is also the default in Java, as in the 
> {{java.util.String.getBytes()}} method, where not supplying the charset means 
> using the system locale or the one the JVM was started with (see 
> [https://stackoverflow.com/questions/64038/setting-java-locale-settings).] 
> There are two ways to set the locale prior to JVM start (-D arguments or the 
> LC_ALL variable).
>  
> Given that this is something a new Flink user may not know about, nor wants 
> to spend hours trying to track down as an environment-related bug (it works 
> on localhost, but in production the locale is different), I would kindly 
> suggest a change in documentation: let's migrate the examples to use the two-argument 
> 

[GitHub] flink pull request #5536: [FLINK-8714][Documentation] Added either charsetNa...

2018-02-20 Thread michalklempa
GitHub user michalklempa opened a pull request:

https://github.com/apache/flink/pull/5536

[FLINK-8714][Documentation] Added either charsetName) or "utf-8" value in 
examples of readTextFile

## What is the purpose of the change
When a newcomer (like me) goes through the docs, there are several places 
where examples encourage reading the input data using the env.readTextFile() 
method.

This method variant does not take a second argument - character set (see 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html#readTextFile-java.lang.String-).
 This version relies (according to the Javadoc) on "The file will be read with 
the system's default character set."

This is fixed in the documentation by providing charsetName in the examples 
where the API is described, and "utf-8" as the second argument in the 
programming examples. This should help others remember to specify a charset 
programmatically if they want to avoid non-deterministic behavior that depends 
on the environment.

## Brief change log

## Verifying this change

This change is a trivial rework of documentation without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/michalklempa/flink 
FLINK-8714_readTextFile_charset_version

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5536.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5536


commit 221684da5b564b21c1e0cc99e823c18939c0ca91
Author: Michal Klempa 
Date:   2018-02-20T13:50:30Z

FLINK-8714 added either env.readTextFile(pathToFile, charsetName) where the 
API is described or readTextFile(path/to/file, utf-8)  where API is shown as 
example




---


[jira] [Commented] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-20 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370073#comment-16370073
 ] 

Aljoscha Krettek commented on FLINK-8543:
-

[~ste...@apache.org] That would fix this issue but I think it would be better 
to fix this at the Flink level, and to figure out why {{flush()}} is called 
after {{close()}} in the first place.
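
Not the actual fix, but to make the failure mode concrete: a minimal, hypothetical 
guard (these class names do not exist in Flink or Hadoop) that would turn a late 
{{flush()}} into a no-op instead of tripping S3AOutputStream's checkOpen():

{code:java}
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical sketch only: wraps the underlying stream and ignores flush()
// once close() has been called, which is exactly the call sequence that makes
// S3AOutputStream.checkOpen() throw "Output Stream closed".
public class CloseTolerantOutputStream extends OutputStream {

    private final OutputStream delegate;
    private volatile boolean closed;

    public CloseTolerantOutputStream(OutputStream delegate) {
        this.delegate = delegate;
    }

    @Override
    public void write(int b) throws IOException {
        delegate.write(b);
    }

    @Override
    public void flush() throws IOException {
        if (closed) {
            return; // a flush() after close() is silently dropped here
        }
        delegate.flush();
    }

    @Override
    public void close() throws IOException {
        if (!closed) {
            closed = true;
            delegate.close();
        }
    }
}
{code}

Even with such a guard in place, the underlying question remains why the sink issues 
the flush() after the stream has already been closed.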

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOpen();
> this.backupStream.flush();
> }
> @Override
> public void close() throws IOException {
> if (this.closed.getAndSet(true)) {
> return;
> }
> this.backupStream.close();
> LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
> (Object)this.key);
> try {
> ObjectMetadata om = 
> this.fs.newObjectMetadata(this.backupFile.length());
> 

[jira] [Commented] (FLINK-8716) AvroSerializer does not use schema of snapshot

2018-02-20 Thread Arvid Heise (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370067#comment-16370067
 ] 

Arvid Heise commented on FLINK-8716:


Btw, the current behavior is that it will fail with an EOFException when a 
field is added. If it was intended that the AvroSerializer should not have soft 
migration, it should probably return the incompatible flag like it did before.
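
To make the schema-resolution point concrete, here is a standalone Avro sketch (not 
Flink code; the record schemas are made up): the schema the data was written with - 
in Flink's case the schema stored in the snapshot - has to be passed to the 
DatumReader as the writer schema, and the current schema as the reader schema, 
otherwise adding a field breaks deserialization:

{code:java}
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaResolutionSketch {

    public static void main(String[] args) throws Exception {
        // Schema the data was written with (what the snapshot would contain).
        Schema writerSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                        + "{\"name\":\"name\",\"type\":\"string\"}]}");

        // Current schema: one field added, with a default value.
        Schema readerSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                        + "{\"name\":\"name\",\"type\":\"string\"},"
                        + "{\"name\":\"age\",\"type\":\"int\",\"default\":0}]}");

        // Serialize a record with the old schema.
        GenericRecord record = new GenericData.Record(writerSchema);
        record.put("name", "flink");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(writerSchema).write(record, encoder);
        encoder.flush();

        // Deserialize: old (snapshot) schema as the writer schema, current schema
        // as the reader schema. Using only the current schema misreads the bytes.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord result = new GenericDatumReader<GenericRecord>(writerSchema, readerSchema)
                .read(null, decoder);

        // The added field is filled in from its default value.
        System.out.println(result);
    }
}
{code}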

> AvroSerializer does not use schema of snapshot
> --
>
> Key: FLINK-8716
> URL: https://issues.apache.org/jira/browse/FLINK-8716
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
>Reporter: Arvid Heise
>Priority: Major
>
> The new AvroSerializer stores the schema in the snapshot and uses it to 
> validate compatibility.
> However, it does not use the schema of the snapshot while reading the data. 
> This version will fail for any change of the data layout (so it supports more 
> or less only renaming currently).
>  
> [https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L265]
>  needs to use the schema from
>  
> [https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L188]
>  as the first parameter. Accordingly, a readSchema field needs to be set
>  in #ensureCompatibility and relayed in #duplicate. Note that the readSchema 
> is passed as the write schema parameter to the DatumReader, as it was the 
> schema that was used to write the data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8716) AvroSerializer does not use schema of snapshot

2018-02-20 Thread Arvid Heise (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370052#comment-16370052
 ] 

Arvid Heise commented on FLINK-8716:


In the email thread, [~till.rohrmann] also pointed out that 
cancel-with-snapshot always produces full checkpoints. So within a snapshot 
only one serializer version is used.

The question should be: what happens when we do an incremental checkpoint 
afterwards. Then we probably need two different serializer configurations for 
the same descriptor. That sounds doable (use descriptor name/checkpoint name as 
the key of the meta information instead of just the descriptor name) but may 
require larger refactoring.
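
Purely to illustrate that idea (all names here are hypothetical; nothing like this 
exists in Flink today): the meta information could be keyed by descriptor name plus 
checkpoint, so that two serializer configurations for the same descriptor can coexist:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch: serializer configurations keyed by (descriptor name,
// checkpoint id) instead of the descriptor name alone.
public class SerializerConfigStore<C> {

    private static final class Key {
        final String descriptorName;
        final long checkpointId;

        Key(String descriptorName, long checkpointId) {
            this.descriptorName = descriptorName;
            this.checkpointId = checkpointId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key other = (Key) o;
            return checkpointId == other.checkpointId
                    && descriptorName.equals(other.descriptorName);
        }

        @Override
        public int hashCode() {
            return Objects.hash(descriptorName, checkpointId);
        }
    }

    private final Map<Key, C> configs = new HashMap<>();

    public void put(String descriptorName, long checkpointId, C config) {
        configs.put(new Key(descriptorName, checkpointId), config);
    }

    public C get(String descriptorName, long checkpointId) {
        return configs.get(new Key(descriptorName, checkpointId));
    }
}
{code}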

> AvroSerializer does not use schema of snapshot
> --
>
> Key: FLINK-8716
> URL: https://issues.apache.org/jira/browse/FLINK-8716
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
>Reporter: Arvid Heise
>Priority: Major
>
> The new AvroSerializer stores the schema in the snapshot and uses it to 
> validate compatibility.
> However, it does not use the schema of the snapshot while reading the data. 
> This version will fail for any change of the data layout (so it supports more 
> or less only renaming currently).
>  
> [https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L265]
>  needs to use the schema from
>  
> [https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L188]
>  as the first parameter. Accordingly, a readSchema field needs to be set
>  in #ensureCompatibility and relayed in #duplicate. Note that the readSchema 
> is passed as the write schema parameter to the DatumReader, as it was the 
> schema that was used to write the data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8716) AvroSerializer does not use schema of snapshot

2018-02-20 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370051#comment-16370051
 ] 

Aljoscha Krettek commented on FLINK-8716:
-

The idea is that when a {{TypeSerializer}} signals that it requires migration, 
the backend would read all data with the old serialiser and re-encode it with 
the new serialiser. This way, we would always have data consistently encoded in 
the backend.

The problem is that this isn't implemented yet so I think we cannot change how 
the serialiser works currently.

> AvroSerializer does not use schema of snapshot
> --
>
> Key: FLINK-8716
> URL: https://issues.apache.org/jira/browse/FLINK-8716
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
>Reporter: Arvid Heise
>Priority: Major
>
> The new AvroSerializer stores the schema in the snapshot and uses it to 
> validate compatibility.
> However, it does not use the schema of the snapshot while reading the data. 
> This version will fail for any change of the data layout (so it supports more 
> or less only renaming currently).
>  
> [https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L265]
>  needs to use the schema from
>  
> [https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L188]
>  as the first parameter. Accordingly, a readSchema field needs to be set
>  in #ensureCompatibility and relayed in #duplicate. Note that the readSchema 
> is passed as the write schema parameter to the DatumReader, as it was the 
> schema that was used to write the data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

