[jira] [Commented] (FLINK-7704) Port JobPlanHandler to new REST endpoint

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245421#comment-16245421
 ] 

ASF GitHub Bot commented on FLINK-7704:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4978
  
LGTM. Thanks for your contribution @yew1eb. Merging.


> Port JobPlanHandler to new REST endpoint
> 
>
> Key: FLINK-7704
> URL: https://issues.apache.org/jira/browse/FLINK-7704
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Hai Zhou UTC+8
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobPlanHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4978: [FLINK-7704][hotfix][flip6] Fix JobPlanInfoTest package p...

2017-11-09 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4978
  
LGTM. Thanks for your contribution @yew1eb. Merging.


---


[jira] [Commented] (FLINK-8025) Let DashboardConfigHandler extend from AbstractRestHandler

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245428#comment-16245428
 ] 

ASF GitHub Bot commented on FLINK-8025:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4983
  
Thanks for your review @zentol. Merging.


> Let DashboardConfigHandler extend from AbstractRestHandler
> --
>
> Key: FLINK-8025
> URL: https://issues.apache.org/jira/browse/FLINK-8025
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{DashboardConfigHandler}} should directly extend {{AbstractRestHandler}} 
> to get rid of the {{LegacyRestHandler}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8026) Let ClusterConfigHandler extend from AbstractRestHandler

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245430#comment-16245430
 ] 

ASF GitHub Bot commented on FLINK-8026:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4984
  
Thanks for your review @zentol. Merging.


> Let ClusterConfigHandler extend from AbstractRestHandler
> 
>
> Key: FLINK-8026
> URL: https://issues.apache.org/jira/browse/FLINK-8026
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{ClusterConfigHandler}} should directly extend from the 
> {{AbstractRestHandler}} to get rid of the {{LegacyRestHandler}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4984: [FLINK-8026] Let ClusterConfigHandler directly extend Abs...

2017-11-09 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4984
  
Thanks for your review @zentol. Merging.


---


[jira] [Commented] (FLINK-7419) Shade jackson dependency in flink-avro

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245495#comment-16245495
 ] 

ASF GitHub Bot commented on FLINK-7419:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4981
  
Ok, sounds good.  



> Shade jackson dependency in flink-avro
> --
>
> Key: FLINK-7419
> URL: https://issues.apache.org/jira/browse/FLINK-7419
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Reporter: Stephan Ewen
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Avro uses {{org.codehaus.jackson}} which also exists in multiple 
> incompatible versions. We should shade it to 
> {{org.apache.flink.shaded.avro.org.codehaus.jackson}}.
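
For reference, the relocation described above corresponds to a standard maven-shade-plugin relocation entry; a minimal sketch (the enclosing plugin execution in the flink-avro pom is omitted):

{code:xml}
<!-- Sketch only: relocate Avro's org.codehaus.jackson as described above. -->
<relocation>
  <pattern>org.codehaus.jackson</pattern>
  <shadedPattern>org.apache.flink.shaded.avro.org.codehaus.jackson</shadedPattern>
</relocation>
{code}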



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4981: [FLINK-7419][build][avro] Relocate jackson in flink-dist

2017-11-09 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4981
  
Ok, sounds good. 👌 



---


[jira] [Commented] (FLINK-8008) PojoTypeInfo should sort fields based on a boolean

2017-11-09 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245509#comment-16245509
 ] 

Fabian Hueske commented on FLINK-8008:
--

That depends on the TableSink. 
{{CsvTableSink}} writes the fields in the order given by the table schema.
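
To illustrate with hypothetical field names (a minimal sketch, assuming the 1.3-era Table API): an explicit {{select}} fixes the column order for the sink, regardless of how {{PojoTypeInfo}} sorted the POJO fields.

{code:java}
// Hypothetical example: pojoDataSet contains POJOs with fields name/age/city.
Table table = tableEnv.fromDataSet(pojoDataSet)
    .select("name, age, city"); // explicit field order for the sink

table.writeToSink(new CsvTableSink("/tmp/out.csv", "|"));
{code}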

> PojoTypeInfo should sort fields based on a boolean
> ---
>
> Key: FLINK-8008
> URL: https://issues.apache.org/jira/browse/FLINK-8008
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API
>Affects Versions: 1.3.2
>Reporter: Muhammad Imran Tariq
>Priority: Minor
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Flink's PojoTypeInfo sorts the fields array that is passed into the 
> constructor. I want to create another constructor that takes a boolean 
> parameter controlling whether the fields are sorted or not.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7517) let NettyBufferPool extend PooledByteBufAllocator

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245348#comment-16245348
 ] 

ASF GitHub Bot commented on FLINK-7517:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4594
  
This improvement is indeed very clean and reduces many redundant `Override` 
methods by extending `PooledByteBufAllocator` directly. 

But I am still confused about one thing. Previously, we created the 
`PooledByteBufAllocator` with the `preferDirect` parameter set to true, and the 
created allocator was used in both the client and the server. So it seems there 
was no chance for the original allocator to hand out heap buffers in the Netty 
code. Or did I miss or misunderstand some key information?


> let NettyBufferPool extend PooledByteBufAllocator
> -
>
> Key: FLINK-7517
> URL: https://issues.apache.org/jira/browse/FLINK-7517
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> {{NettyBufferPool}} wraps {{PooledByteBufAllocator}} but, because of this, any 
> allocated buffer's {{alloc()}} method returns the wrapped 
> {{PooledByteBufAllocator}}, which allows heap buffers again. By extending the 
> {{PooledByteBufAllocator}}, we close this loophole and also fix the 
> invariant that a copy of a buffer should have the same allocator.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7003) "select * from" in Flink SQL should not flatten all fields in the table by default

2017-11-09 Thread Shuyi Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245361#comment-16245361
 ] 

Shuyi Chen edited comment on FLINK-7003 at 11/9/17 8:28 AM:


added  [pull request|https://github.com/apache/flink/pull/4989]


was (Author: suez1224):
added  [link pull request|https://github.com/apache/flink/pull/4989]

> "select * from" in Flink SQL should not flatten all fields in the table by 
> default
> --
>
> Key: FLINK-7003
> URL: https://issues.apache.org/jira/browse/FLINK-7003
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>
> Currently, CompositeRelDataType extends 
> RelRecordType(StructKind.PEEK_FIELDS, ...). In Calcite, 
> StructKind.PEEK_FIELDS allows us to peek fields for nested types. 
> However, when we use "select * from", Calcite will flatten all nested fields 
> that are marked as StructKind.PEEK_FIELDS in the table. 
> For example, if the table structure *T* is as follows:
> {code:java}
> VARCHAR K0,
> VARCHAR C1,
> RecordType:peek_no_flattening(INTEGER C0, INTEGER C1) F0,
> RecordType:peek_no_flattening(INTEGER C0, INTEGER C2) F1
> {code}
> The following query
> {code:java}
> Select * from T
> {code}
> should return (K0, C1, F0, F1) instead of (K0, C1, F0.C0, F0.C1, 
> F1.C0, F1.C1), which is the current behavior.
> After upgrading to Calcite 1.14, this issue can be fixed by changing the 
> {{StructKind}} of {{CompositeRelDataType}} to {{StructKind.PEEK_FIELDS_NO_EXPAND}}.
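
Schematically, the fix amounts to constructing the record type with the non-expanding struct kind; a minimal Java sketch (Flink's actual {{CompositeRelDataType}} is a Scala class):

{code:java}
import java.util.List;

import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.rel.type.StructKind;

// Sketch only: passing PEEK_FIELDS_NO_EXPAND instead of PEEK_FIELDS makes
// "select *" keep F0/F1 as nested records instead of flattening them.
class CompositeRelDataTypeSketch extends RelRecordType {
    CompositeRelDataTypeSketch(List<RelDataTypeField> fields) {
        super(StructKind.PEEK_FIELDS_NO_EXPAND, fields);
    }
}
{code}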



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7003) "select * from" in Flink SQL should not flatten all fields in the table by default

2017-11-09 Thread Shuyi Chen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245361#comment-16245361
 ] 

Shuyi Chen commented on FLINK-7003:
---

added  [link pull request|https://github.com/apache/flink/pull/4989]

> "select * from" in Flink SQL should not flatten all fields in the table by 
> default
> --
>
> Key: FLINK-7003
> URL: https://issues.apache.org/jira/browse/FLINK-7003
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>
> Currently, CompositeRelDataType extends 
> RelRecordType(StructKind.PEEK_FIELDS, ...). In Calcite, 
> StructKind.PEEK_FIELDS allows us to peek fields for nested types. 
> However, when we use "select * from", Calcite will flatten all nested fields 
> that are marked as StructKind.PEEK_FIELDS in the table. 
> For example, if the table structure *T* is as follows:
> {code:java}
> VARCHAR K0,
> VARCHAR C1,
> RecordType:peek_no_flattening(INTEGER C0, INTEGER C1) F0,
> RecordType:peek_no_flattening(INTEGER C0, INTEGER C2) F1
> {code}
> The following query
> {code:java}
> Select * from T
> {code}
> should return (K0, C1, F0, F1) instead of (K0, C1, F0.C0, F0.C1, 
> F1.C0, F1.C1), which is the current behavior.
> After upgrading to Calcite 1.14, this issue can be fixed by changing the 
> {{StructKind}} of {{CompositeRelDataType}} to {{StructKind.PEEK_FIELDS_NO_EXPAND}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245484#comment-16245484
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149926517
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -58,99 +59,144 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.Executor;
 
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.isOneOf;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TaskAsyncCallTest {
 
-   private static final int NUM_CALLS = 1000;
-   
+   private static int numCalls;
+
+   /** Triggered at the beginning of {@link 
CheckpointsInOrderInvokable#invoke()}. */
private static OneShotLatch awaitLatch;
+
+   /**
+* Triggered when {@link 
CheckpointsInOrderInvokable#triggerCheckpoint(CheckpointMetaData, 
CheckpointOptions)}
+* was called {@link #numCalls} times.
+*/
private static OneShotLatch triggerLatch;
 
+   private static final List classLoaders = new ArrayList<>();
+
@Before
public void createQueuesAndActors() {
+   numCalls = 1000;
+
awaitLatch = new OneShotLatch();
triggerLatch = new OneShotLatch();
+
+   classLoaders.clear();
}
 
 
// 

//  Tests 
// 

-   
+
@Test
-   public void testCheckpointCallsInOrder() {
-   try {
-   Task task = createTask();
+   public void testCheckpointCallsInOrder() throws Exception {
+   Task task = createTask(CheckpointsInOrderInvokable.class);
+   try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();
-   
+
awaitLatch.await();
-   
-   for (int i = 1; i <= NUM_CALLS; i++) {
+
+   for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L, 
CheckpointOptions.forCheckpoint());
}
-   
+
triggerLatch.await();
-   
+
assertFalse(task.isCanceledOrFailed());
 
ExecutionState currentState = task.getExecutionState();
-   if (currentState != ExecutionState.RUNNING && 
currentState != ExecutionState.FINISHED) {
-   fail("Task should be RUNNING or FINISHED, but 
is " + currentState);
-   }
-   
-   task.cancelExecution();
-   task.getExecutingThread().join();
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
+   assertThat(currentState, 
isOneOf(ExecutionState.RUNNING, ExecutionState.FINISHED));
}
}
 
@Test
-   public void testMixedAsyncCallsInOrder() {
-   try {
-   Task task = createTask();
+   public void testMixedAsyncCallsInOrder() throws Exception {
+   Task task = createTask(CheckpointsInOrderInvokable.class);
+   try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();
 
awaitLatch.await();
 
-   for (int i = 1; i <= NUM_CALLS; i++) {
+   for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L, 
CheckpointOptions.forCheckpoint());
task.notifyCheckpointComplete(i);
}
 
triggerLatch.await();
 

[GitHub] flink pull request #4980: [FLINK-8005] [runtime] Set user code class loader ...

2017-11-09 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149926517
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -58,99 +59,144 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.Executor;
 
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.isOneOf;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TaskAsyncCallTest {
 
-   private static final int NUM_CALLS = 1000;
-   
+   private static int numCalls;
+
+   /** Triggered at the beginning of {@link 
CheckpointsInOrderInvokable#invoke()}. */
private static OneShotLatch awaitLatch;
+
+   /**
+* Triggered when {@link 
CheckpointsInOrderInvokable#triggerCheckpoint(CheckpointMetaData, 
CheckpointOptions)}
+* was called {@link #numCalls} times.
+*/
private static OneShotLatch triggerLatch;
 
+   private static final List classLoaders = new ArrayList<>();
+
@Before
public void createQueuesAndActors() {
+   numCalls = 1000;
+
awaitLatch = new OneShotLatch();
triggerLatch = new OneShotLatch();
+
+   classLoaders.clear();
}
 
 
// 

//  Tests 
// 

-   
+
@Test
-   public void testCheckpointCallsInOrder() {
-   try {
-   Task task = createTask();
+   public void testCheckpointCallsInOrder() throws Exception {
+   Task task = createTask(CheckpointsInOrderInvokable.class);
+   try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();
-   
+
awaitLatch.await();
-   
-   for (int i = 1; i <= NUM_CALLS; i++) {
+
+   for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L, 
CheckpointOptions.forCheckpoint());
}
-   
+
triggerLatch.await();
-   
+
assertFalse(task.isCanceledOrFailed());
 
ExecutionState currentState = task.getExecutionState();
-   if (currentState != ExecutionState.RUNNING && 
currentState != ExecutionState.FINISHED) {
-   fail("Task should be RUNNING or FINISHED, but 
is " + currentState);
-   }
-   
-   task.cancelExecution();
-   task.getExecutingThread().join();
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
+   assertThat(currentState, 
isOneOf(ExecutionState.RUNNING, ExecutionState.FINISHED));
}
}
 
@Test
-   public void testMixedAsyncCallsInOrder() {
-   try {
-   Task task = createTask();
+   public void testMixedAsyncCallsInOrder() throws Exception {
+   Task task = createTask(CheckpointsInOrderInvokable.class);
+   try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();
 
awaitLatch.await();
 
-   for (int i = 1; i <= NUM_CALLS; i++) {
+   for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L, 
CheckpointOptions.forCheckpoint());
task.notifyCheckpointComplete(i);
}
 
triggerLatch.await();
 
assertFalse(task.isCanceledOrFailed());
+
ExecutionState currentState = task.getExecutionState();
-   if (currentState != ExecutionState.RUNNING && 
currentState != ExecutionState.FINISHED) {
-   

[jira] [Commented] (FLINK-8009) flink-dist pulls in flink-runtime's transitive avro/jackson dependency

2017-11-09 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245498#comment-16245498
 ] 

Aljoscha Krettek commented on FLINK-8009:
-

It's a weird situation, but if someone looks at the release notes of 1.5 and 
sees "XYZ fixed", they would think that it was a bug in 1.4, which in fact 
it wasn't. It's only strange because of the timing of cutting the release 
branch.

I don't want to split hairs, though, so I'm fine with either. 

> flink-dist pulls in flink-runtime's transitive avro/jackson dependency
> --
>
> Key: FLINK-8009
> URL: https://issues.apache.org/jira/browse/FLINK-8009
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The promotion of transitive dependencies in flink-runtime causes flink-dist 
> to contain _some_ transitive dependencies from flink-shaded-hadoop (most 
> notably, avro and codehaus.jackson).
> We will either have to add an exclusion for each dependency to flink-dist, 
> set flink-shaded-hadoop to provided in flink-runtime (hacky, but less 
> intrusive), or remove the promotion and explicitly depend on various akka 
> dependencies.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4983: [FLINK-8025] Let DashboardConfigHandler directly extend A...

2017-11-09 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4983
  
Thanks for your review @zentol. Merging.


---


[jira] [Commented] (FLINK-8024) Let ClusterOverviewHandler directly extend from AbstractRestHandler

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245426#comment-16245426
 ] 

ASF GitHub Bot commented on FLINK-8024:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4982
  
Thanks for your review @zentol. Merging.


> Let ClusterOverviewHandler directly extend from AbstractRestHandler
> ---
>
> Key: FLINK-8024
> URL: https://issues.apache.org/jira/browse/FLINK-8024
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to get rid of the {{LegacyRestHandler}} we should add a proper 
> implementation of {{ClusterOverviewHandler}} which extends from 
> {{AbstractRestHandler}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4982: [FLINK-8024] Let ClusterOverviewHandler directly extend f...

2017-11-09 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4982
  
Thanks for your review @zentol. Merging.


---


[jira] [Commented] (FLINK-8000) Sort REST handler URLs in RestServerEndpoint

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245425#comment-16245425
 ] 

ASF GitHub Bot commented on FLINK-8000:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4958
  
Will remove the comment and then merge the PR. Thanks for your review 
@zentol.


> Sort REST handler URLs in RestServerEndpoint
> 
>
> Key: FLINK-8000
> URL: https://issues.apache.org/jira/browse/FLINK-8000
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> In order to make the {{RestServerEndpoint}} more easily extendable, we should 
> automatically sort the returned list of REST handlers when calling 
> {{RestServerEndpoint#initializeHandlers}}. That way the order in which the 
> handlers are added to the list is independent of the actual registration 
> order. This is, for example, important for the static file server, which 
> always needs to be registered last.
> I propose to add a special {{String}} {{Comparator}} which considers the 
> character {{':'}} to be the character with the largest value. That way we 
> should always get the following sort order:
> - URLs without path parameters have precedence over similar URLs where parts 
> are replaced by path parameters (e.g. {{/jobs/overview}} before {{/jobs/:jobid}}, 
> and {{/jobs/:jobid/config}} before {{/jobs/:jobid/vertices/:vertexId}})
> - Prefixes are sorted before URLs containing the prefix (e.g. {{/jobs}} before 
> {{/jobs/overview}})
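
A minimal sketch of such a comparator (not Flink's actual implementation) that treats {{':'}} as the largest character and sorts prefixes first:

{code:java}
import java.util.Comparator;

// Sketch: ':' (the path-parameter marker) compares greater than every other
// character, so concrete segments sort before parameterized ones, and a
// common prefix sorts before any URL extending it.
public final class RestUrlComparatorSketch implements Comparator<String> {

    @Override
    public int compare(String a, String b) {
        int len = Math.min(a.length(), b.length());
        for (int i = 0; i < len; i++) {
            char ca = a.charAt(i);
            char cb = b.charAt(i);
            if (ca == cb) {
                continue;
            }
            if (ca == ':') {
                return 1;  // a has the path parameter, so it sorts later
            }
            if (cb == ':') {
                return -1; // b has the path parameter, so it sorts later
            }
            return Character.compare(ca, cb);
        }
        return Integer.compare(a.length(), b.length()); // prefix sorts first
    }
}
{code}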



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4958: [FLINK-8000] Sort Rest handler URLS in RestServerEndpoint

2017-11-09 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4958
  
Will remove the comment and then merge the PR. Thanks for your review 
@zentol.


---


[jira] [Commented] (FLINK-5633) ClassCastException: X cannot be cast to X when re-submitting a job.

2017-11-09 Thread Erik van Oosten (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245474#comment-16245474
 ] 

Erik van Oosten commented on FLINK-5633:


bq. Just curious, why are you creating a new reader for each record?

It's just a bit easier than caching a reader for each writer/reader schema 
combination.
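
For completeness, a minimal sketch of the caching alternative discussed here (hypothetical helper class, assuming Avro's {{GenericDatumReader}}):

{code:java}
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

// Sketch: cache one reader per (writer schema, reader schema) combination
// instead of creating a new reader for every record.
public final class DatumReaderCache {

    private final ConcurrentMap<Map.Entry<Schema, Schema>, GenericDatumReader<GenericRecord>> cache =
        new ConcurrentHashMap<>();

    public GenericDatumReader<GenericRecord> get(Schema writer, Schema reader) {
        return cache.computeIfAbsent(
            new SimpleImmutableEntry<>(writer, reader),
            k -> new GenericDatumReader<>(writer, reader));
    }
}
{code}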

> ClassCastException: X cannot be cast to X when re-submitting a job.
> ---
>
> Key: FLINK-5633
> URL: https://issues.apache.org/jira/browse/FLINK-5633
> Project: Flink
>  Issue Type: Bug
>  Components: Job-Submission, YARN
>Affects Versions: 1.1.4
>Reporter: Giuliano Caliari
>Priority: Minor
>
> I’m running a job on my local cluster and the first time I submit the job 
> everything works but whenever I cancel and re-submit the same job it fails 
> with:
> {quote}
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>   at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
>   at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634)
>   at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147)
>   at 
> au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22)
>   at 
> au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21)
>   at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>   at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.App$$anonfun$main$1.apply(App.scala:76)
>   at scala.collection.immutable.List.foreach(List.scala:381)
>   at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>   at scala.App$class.main(App.scala:76)
>   at 
> au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorApp.scala:21)
>   at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
>   at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> 

[GitHub] flink pull request #4594: [FLINK-7517][network] let NettyBufferPool extend P...

2017-11-09 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4594#discussion_r149889426
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
 ---
@@ -52,51 +48,61 @@
/** Configured chunk size for the arenas. */
private final int chunkSize;
 
+   /** We strictly prefer direct buffers and disallow heap allocations. */
+   private static final boolean PREFER_DIRECT = true;
+
+   /**
+* Arenas allocate chunks of pageSize << maxOrder bytes. With these 
defaults, this results in
+* chunks of 16 MB.
+*
+* @see #MAX_ORDER
+*/
+   private static final int PAGE_SIZE = 8192;
+
+   /**
+* Arenas allocate chunks of pageSize << maxOrder bytes. With these 
defaults, this results in
+* chunks of 16 MB.
+*
+* @see #PAGE_SIZE
+*/
+   private static final int MAX_ORDER = 11;
+
/**
 * Creates Netty's buffer pool with the specified number of direct 
arenas.
 *
 * @param numberOfArenas Number of arenas (recommended: 2 * number of 
task
 *   slots)
 */
public NettyBufferPool(int numberOfArenas) {
+   super(
+   PREFER_DIRECT,
+   // No heap arenas, please.
+   0,
+   // Number of direct arenas. Each arena allocates a 
chunk of 16 MB, i.e.
+   // we allocate numDirectArenas * 16 MB of direct 
memory. This can grow
+   // to multiple chunks per arena during runtime, but 
this should only
+   // happen with a large amount of connections per task 
manager. We
+   // control the memory allocations with low/high 
watermarks when writing
+   // to the TCP channels. Chunks are allocated lazily.
+   numberOfArenas,
+   PAGE_SIZE,
+   MAX_ORDER);
+
checkArgument(numberOfArenas >= 1, "Number of arenas");
--- End diff --

Yes, it would be nice to be able to do so, but since this is the 
constructor, it is not possible. I guess `super()` may itself fail with an 
invalid parameter - if not, we fail a bit afterwards.
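
As an aside, one common Java idiom does allow validating before the superclass constructor body runs: a static helper evaluated inside the `super(...)` argument list. A minimal sketch (assuming Flink's `checkArgument`):

```java
import io.netty.buffer.PooledByteBufAllocator;

import static org.apache.flink.util.Preconditions.checkArgument;

public class NettyBufferPool extends PooledByteBufAllocator {

    public NettyBufferPool(int numberOfArenas) {
        // validate(...) runs while the super() arguments are evaluated,
        // i.e. before PooledByteBufAllocator's constructor executes.
        super(true, 0, validate(numberOfArenas), 8192, 11);
    }

    private static int validate(int numberOfArenas) {
        checkArgument(numberOfArenas >= 1, "Number of arenas");
        return numberOfArenas;
    }
}
```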


---


[GitHub] flink issue #4594: [FLINK-7517][network] let NettyBufferPool extend PooledBy...

2017-11-09 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4594
  
This improvement is indeed very clean and reduces many redundant `Override` 
methods by extending `PooledByteBufAllocator` directly. 

But I am still confused about one thing. Previously, we created the 
`PooledByteBufAllocator` with the `preferDirect` parameter set to true, and the 
created allocator was used in both the client and the server. So it seems there 
was no chance for the original allocator to hand out heap buffers in the Netty 
code. Or did I miss or misunderstand some key information?


---


[jira] [Commented] (FLINK-7517) let NettyBufferPool extend PooledByteBufAllocator

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245347#comment-16245347
 ] 

ASF GitHub Bot commented on FLINK-7517:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4594#discussion_r149889426
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java
 ---
@@ -52,51 +48,61 @@
/** Configured chunk size for the arenas. */
private final int chunkSize;
 
+   /** We strictly prefer direct buffers and disallow heap allocations. */
+   private static final boolean PREFER_DIRECT = true;
+
+   /**
+* Arenas allocate chunks of pageSize << maxOrder bytes. With these 
defaults, this results in
+* chunks of 16 MB.
+*
+* @see #MAX_ORDER
+*/
+   private static final int PAGE_SIZE = 8192;
+
+   /**
+* Arenas allocate chunks of pageSize << maxOrder bytes. With these 
defaults, this results in
+* chunks of 16 MB.
+*
+* @see #PAGE_SIZE
+*/
+   private static final int MAX_ORDER = 11;
+
/**
 * Creates Netty's buffer pool with the specified number of direct 
arenas.
 *
 * @param numberOfArenas Number of arenas (recommended: 2 * number of 
task
 *   slots)
 */
public NettyBufferPool(int numberOfArenas) {
+   super(
+   PREFER_DIRECT,
+   // No heap arenas, please.
+   0,
+   // Number of direct arenas. Each arena allocates a 
chunk of 16 MB, i.e.
+   // we allocate numDirectArenas * 16 MB of direct 
memory. This can grow
+   // to multiple chunks per arena during runtime, but 
this should only
+   // happen with a large amount of connections per task 
manager. We
+   // control the memory allocations with low/high 
watermarks when writing
+   // to the TCP channels. Chunks are allocated lazily.
+   numberOfArenas,
+   PAGE_SIZE,
+   MAX_ORDER);
+
checkArgument(numberOfArenas >= 1, "Number of arenas");
--- End diff --

Yes, it would be nice to be able to do so, but since this is the 
constructor, it is not possible. I guess `super()` may itself fail with an 
invalid parameter - if not, we fail a bit afterwards.


> let NettyBufferPool extend PooledByteBufAllocator
> -
>
> Key: FLINK-7517
> URL: https://issues.apache.org/jira/browse/FLINK-7517
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> {{NettyBufferPool}} wraps {{PooledByteBufAllocator}} but, because of this, any 
> allocated buffer's {{alloc()}} method returns the wrapped 
> {{PooledByteBufAllocator}}, which allows heap buffers again. By extending the 
> {{PooledByteBufAllocator}}, we close this loophole and also fix the 
> invariant that a copy of a buffer should have the same allocator.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4989: [Flink 7003] [Table API & SQL] use PEEK_FIELDS_NO_...

2017-11-09 Thread suez1224
GitHub user suez1224 opened a pull request:

https://github.com/apache/flink/pull/4989

[Flink 7003] [Table API & SQL] use PEEK_FIELDS_NO_EXPAND for 
CompositeRelDataType

## What is the purpose of the change

This pull request changes CompositeRelDataType to use the newly added 
StructKind PEEK_FIELDS_NO_EXPAND to disable the flattening behavior when 
selecting star.


## Brief change log
  - Change CompositeRelDataType to use StructKind.PEEK_FIELDS_NO_EXPAND.
  - Added unit tests for both the stream and batch Table/SQL APIs.


## Verifying this change

This change adds unit tests for both the stream and batch Table/SQL APIs to 
verify the behavior.


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): NO
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: NO
  - The serializers: NO
  - The runtime per-record code paths (performance sensitive): NO
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: NO
  - The S3 file system connector: NO

## Documentation
No


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/suez1224/flink flink-7003

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4989.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4989


commit e676cfeb2ecc4ed07960813bf91c7bf2442fe1c2
Author: Shuyi Chen 
Date:   2017-11-09T08:05:20Z

use PEEK_FIELDS_NO_EXPAND for CompositeRelDataType




---


[jira] [Commented] (FLINK-7977) bump version of compatibility check for Flink 1.4

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245496#comment-16245496
 ] 

ASF GitHub Bot commented on FLINK-7977:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4945
  
@greghogan That makes sense, I didn't think about added APIs. 


> bump version of compatibility check for Flink 1.4
> -
>
> Key: FLINK-7977
> URL: https://issues.apache.org/jira/browse/FLINK-7977
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0
>
>
> Since Flink maintains backward compatibility check for 2 versions, Flink 1.4 
> should check compatibility with 1.2



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4945: [FLINK-7977][build] bump version of compatibility check f...

2017-11-09 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4945
  
@greghogan That makes sense, I didn't think about added APIs. 


---


[GitHub] flink pull request #4978: [FLINK-7704][hotfix][flip6] Fix JobPlanInfoTest pa...

2017-11-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4978


---


[jira] [Commented] (FLINK-8008) PojoTypeInfo should sort fields based on a boolean

2017-11-09 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245504#comment-16245504
 ] 

Aljoscha Krettek commented on FLINK-8008:
-

[~fhueske] Is there a way of specifying the order of fields when writing a 
Table as CSV?

I don't think that not sorting will solve this problem in general because the 
order of fields in {{PojoTypeInfo}} is arbitrary.

> PojoTypeInfo should sort fields based on a boolean
> ---
>
> Key: FLINK-8008
> URL: https://issues.apache.org/jira/browse/FLINK-8008
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API
>Affects Versions: 1.3.2
>Reporter: Muhammad Imran Tariq
>Priority: Minor
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Flink's PojoTypeInfo sorts the fields array that is passed into the 
> constructor. I want to create another constructor that takes a boolean 
> parameter controlling whether the fields are sorted or not.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7704) Port JobPlanHandler to new REST endpoint

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245423#comment-16245423
 ] 

ASF GitHub Bot commented on FLINK-7704:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4978


> Port JobPlanHandler to new REST endpoint
> 
>
> Key: FLINK-7704
> URL: https://issues.apache.org/jira/browse/FLINK-7704
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Hai Zhou UTC+8
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{JobPlanHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8036) Consider using gradle to build Flink

2017-11-09 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245390#comment-16245390
 ] 

Fabian Hueske commented on FLINK-8036:
--

I think this will be very hard. 

Flink's build management has grown a lot over time and is far from trivial, with 
the dependency, code style, header check, artifact generation, and especially 
shading configuration.
Re-implementing all of this in Gradle (or any other build system) would be a 
large effort.

However, IMO this might not even be the biggest issue. The Flink community (or 
at least a good number of people) knows Maven quite well by now. 
All that knowledge would be lost and would have to be regained for a different 
tool if we switched. 
The community would depend quite heavily on whoever ported the build until 
we gained enough experience with Gradle (or whatever tool we chose).

> Consider using gradle to build Flink
> 
>
> Key: FLINK-8036
> URL: https://issues.apache.org/jira/browse/FLINK-8036
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>
> Here is a summary from Lukasz on this thread 
> (http://search-hadoop.com/m/Beam/gfKHFVh4NM151XIu1?subj=Re+DISCUSS+Move+away+from+Apache+Maven+as+build+tool)
>  w.r.t. the performance boost from using gradle:
> Maven performs parallelization at the module level: an entire module needs
> to complete before any dependent modules can start, which means all
> the checks like findbugs, checkstyle, and tests need to finish. Gradle has
> task-level parallelism between subprojects, which means that as soon as the
> compile and shade steps are done for a project, dependent subprojects
> can typically start. This means that we get increased parallelism due to
> not needing to wait for findbugs, checkstyle, and tests to run. I typically see
> ~20 tasks (at peak) running on my desktop in parallel.
> Flink should consider using gradle - on Linux with an SSD, a clean build takes 
> an hour.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8024) Let ClusterOverviewHandler directly extend from AbstractRestHandler

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245442#comment-16245442
 ] 

ASF GitHub Bot commented on FLINK-8024:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4982


> Let ClusterOverviewHandler directly extend from AbstractRestHandler
> ---
>
> Key: FLINK-8024
> URL: https://issues.apache.org/jira/browse/FLINK-8024
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to get rid of the {{LegacyRestHandler}} we should add a proper 
> implementation of {{ClusterOverviewHandler}} which extends from 
> {{AbstractRestHandler}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8000) Sort REST handler URLs in RestServerEndpoint

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245441#comment-16245441
 ] 

ASF GitHub Bot commented on FLINK-8000:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4958


> Sort REST handler URLs in RestServerEndpoint
> 
>
> Key: FLINK-8000
> URL: https://issues.apache.org/jira/browse/FLINK-8000
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to make the {{RestServerEndpoint}} more easily extendable, we should 
> automatically sort the returned list of REST handlers when calling 
> {{RestServerEndpoint#initializeHandlers}}. That way the order in which the 
> handlers are added to the list is independent of the actual registration 
> order. This is, for example, important for the static file server, which 
> always needs to be registered last.
> I propose to add a special {{String}} {{Comparator}} which considers the 
> character {{':'}} to be the character with the largest value. That way we 
> should always get the following sort order:
> - URLs without path parameters have precedence over similar URLs where parts 
> are replaced by path parameters (e.g. {{/jobs/overview}} before {{/jobs/:jobid}}, 
> and {{/jobs/:jobid/config}} before {{/jobs/:jobid/vertices/:vertexId}})
> - Prefixes are sorted before URLs containing the prefix (e.g. {{/jobs}} before 
> {{/jobs/overview}})



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4984: [FLINK-8026] Let ClusterConfigHandler directly ext...

2017-11-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4984


---


[jira] [Closed] (FLINK-8026) Let ClusterConfigHandler extend from AbstractRestHandler

2017-11-09 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8026.

Resolution: Fixed

Fixed via 541fe43663d2a24b1ae66bc2b5228c49dfd43e7b

> Let ClusterConfigHandler extend from AbstractRestHandler
> 
>
> Key: FLINK-8026
> URL: https://issues.apache.org/jira/browse/FLINK-8026
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{ClusterConfigHandler}} should directly extend from the 
> {{AbstractRestHandler}} to get rid of the {{LegacyRestHandler}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8026) Let ClusterConfigHandler extend from AbstractRestHandler

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245444#comment-16245444
 ] 

ASF GitHub Bot commented on FLINK-8026:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4984


> Let ClusterConfigHandler extend from AbstractRestHandler
> 
>
> Key: FLINK-8026
> URL: https://issues.apache.org/jira/browse/FLINK-8026
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{ClusterConfigHandler}} should directly extend from the 
> {{AbstractRestHandler}} to get rid of the {{LegacyRestHandler}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8025) Let DashboardConfigHandler extend from AbstractRestHandler

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245443#comment-16245443
 ] 

ASF GitHub Bot commented on FLINK-8025:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4983


> Let DashboardConfigHandler extend from AbstractRestHandler
> --
>
> Key: FLINK-8025
> URL: https://issues.apache.org/jira/browse/FLINK-8025
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{DashboardConfigHandler}} should directly extend {{AbstractRestHandler}} 
> to get rid of the {{LegacyRestHandler}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4983: [FLINK-8025] Let DashboardConfigHandler directly e...

2017-11-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4983


---


[GitHub] flink pull request #4958: [FLINK-8000] Sort Rest handler URLS in RestServerE...

2017-11-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4958


---


[jira] [Closed] (FLINK-8025) Let DashboardConfigHandler extend from AbstractRestHandler

2017-11-09 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8025.

Resolution: Fixed

Fixed via fa967dfc0cddec0a5d0c7d76e564a8307ec3fb35

> Let DashboardConfigHandler extend from AbstractRestHandler
> --
>
> Key: FLINK-8025
> URL: https://issues.apache.org/jira/browse/FLINK-8025
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{DashboardConfigHandler}} should directly extend {{AbstractRestHandler}} 
> to get rid of the {{LegacyRestHandler}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4982: [FLINK-8024] Let ClusterOverviewHandler directly e...

2017-11-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4982


---


[jira] [Commented] (FLINK-7980) Bump joda-time to 2.9.9

2017-11-09 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245492#comment-16245492
 ] 

Aljoscha Krettek commented on FLINK-7980:
-

Yep, I think getting rid of it makes sense.

> Bump joda-time to 2.9.9
> ---
>
> Key: FLINK-7980
> URL: https://issues.apache.org/jira/browse/FLINK-7980
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
> Fix For: 1.5.0
>
>
> joda-time is at version 2.5 (Oct 2014); this issue bumps it to 2.9.9 (the latest version). 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8008) PojoTypeInfo should sort fields based on a boolean

2017-11-09 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245513#comment-16245513
 ] 

Fabian Hueske commented on FLINK-8008:
--

Except for nested fields, {{CsvTableSink}} simply calls {{toString()}} on each 
field.

> PojoTypeInfo should sort fields based on a boolean
> ---
>
> Key: FLINK-8008
> URL: https://issues.apache.org/jira/browse/FLINK-8008
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API
>Affects Versions: 1.3.2
>Reporter: Muhammad Imran Tariq
>Priority: Minor
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> Flink's PojoTypeInfo sorts the fields array that is passed into the 
> constructor. I want to create another constructor that takes a boolean 
> parameter controlling whether the fields are sorted or not.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-8000) Sort REST handler URLs in RestServerEndpoint

2017-11-09 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8000.

   Resolution: Fixed
Fix Version/s: 1.5.0

Fixed via 34fdf569d7defb4393849fb9ecb2763b14532cc6

> Sort REST handler URLs in RestServerEndpoint
> 
>
> Key: FLINK-8000
> URL: https://issues.apache.org/jira/browse/FLINK-8000
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to make the {{RestServerEndpoint}} more easily extendable, we should 
> automatically sort the returned list of REST handlers when calling 
> {{RestServerEndpoint#initializeHandlers}}. That way the order in which the 
> handlers are added to the list is independent of the actual registration 
> order. This is, for example, important for the static file server, which 
> always needs to be registered last.
> I propose to add a special {{String}} {{Comparator}} which considers the 
> character {{':'}} to be the character with the largest value. That way we 
> should always get the following sort order:
> - URLs without path parameters have precedence over similar URLs where parts 
> are replaced by path parameters (e.g. {{/jobs/overview}} before {{/jobs/:jobid}}, 
> and {{/jobs/:jobid/config}} before {{/jobs/:jobid/vertices/:vertexId}})
> - Prefixes are sorted before URLs containing the prefix (e.g. {{/jobs}} before 
> {{/jobs/overview}})



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-8024) Let ClusterOverviewHandler directly extend from AbstractRestHandler

2017-11-09 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8024.

Resolution: Fixed

Fixed via 34fdf569d7defb4393849fb9ecb2763b14532cc6

> Let ClusterOverviewHandler directly extend from AbstractRestHandler
> ---
>
> Key: FLINK-8024
> URL: https://issues.apache.org/jira/browse/FLINK-8024
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to get rid of the {{LegacyRestHandler}} we should add a proper 
> implementation of {{ClusterOverviewHandler}} which extends from 
> {{AbstractRestHandler}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4976: [FLINK-8017] Fix High availability cluster-id key in docu...

2017-11-09 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4976
  
Thanks for spotting and fixing this! 👍 

I'm merging.


---


[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245630#comment-16245630
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/4980
  
There is only one thread dispatching the calls:
```
executor = Executors.newSingleThreadExecutor(
new 
DispatcherThreadFactory(TASK_THREADS_GROUP, "Async calls on " + 
taskNameWithSubtask));
this.asyncCallDispatcher = executor;
```

The tasks cannot overtake each other. I could make the test stricter and 
additionally wait on `triggerLatch` in case somebody decides to use multiple 
threads.


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756)
>   at 
> 

[GitHub] flink issue #4980: [FLINK-8005] [runtime] Set user code class loader before ...

2017-11-09 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/4980
  
There is only one thread dispatching the calls:
```
executor = Executors.newSingleThreadExecutor(
	new DispatcherThreadFactory(TASK_THREADS_GROUP, "Async calls on " + taskNameWithSubtask));
this.asyncCallDispatcher = executor;
```

The tasks cannot overtake each other. I could make the test more strict and 
wait additionally on `triggerLatch` in case somebody decides to have multiple 
threads.
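
As a minimal, self-contained illustration of that guarantee (plain `java.util.concurrent`, not Flink's dispatcher; class and variable names are made up):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleThreadOrderingSketch {
    public static void main(String[] args) throws Exception {
        // a single-threaded executor runs submitted tasks strictly in
        // submission order, so tasks cannot overtake each other
        ExecutorService executor = Executors.newSingleThreadExecutor();
        StringBuilder order = new StringBuilder();
        for (int i = 0; i < 5; i++) {
            final int id = i;
            executor.submit(() -> order.append(id));
        }
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(order); // always prints 01234
    }
}
```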


---


[jira] [Commented] (FLINK-7765) Enable dependency convergence

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245631#comment-16245631
 ] 

ASF GitHub Bot commented on FLINK-7765:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4777
  
I'll merge, yes, what are the follow-up issues you created?


> Enable dependency convergence
> -
>
> Key: FLINK-7765
> URL: https://issues.apache.org/jira/browse/FLINK-7765
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> For motivation check https://issues.apache.org/jira/browse/FLINK-7739
> The subtasks of this task depend on one another: to enable convergence in 
> `flink-runtime`, it has to be enabled for `flink-shaded-hadoop` first.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8040) Test instability ResourceGuard#testCloseBlockIfAcquired

2017-11-09 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-8040:

Description: 
Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with 
{noformat}
java.io.IOException: Resource guard was already closed.

at 
org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72)
at 
org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
{noformat}

*How to reproduce*
Run the test with a high number of iterations.
To further provoke a failure, add {{Thread.sleep(100)}} before the following 
line
{code}
ResourceGuard.Lease lease_2 = resourceGuard.acquireResource();
{code}

This will more likely result in {{closed}} being {{true}} at the time the 2nd 
lease is acquired, and an exception is thrown:
{code}
if (closed) {
throw new IOException("Resource guard was already closed.");
}
{code}

  was:
Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with 
{noformat}
java.io.IOException: Resource guard was already closed.

at 
org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72)
at 
org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
{noformat}

*How to reproduce*
Run the test with a high number of iterations.
To further provoke a failure, add {{Thread.sleep(100)}} before the following 
line
{code}
ResourceGuard.Lease lease_2 = resourceGuard.acquireResource();
{code}

This will more likely result in {{closed}} being {{true}} at the time the 2nd 
lease is acquired, and an exception is thrown:
{code}
if (closed) {
throw new IOException("Resource guard was already closed.");
}
{code}


> Test instability ResourceGuard#testCloseBlockIfAcquired
> ---
>
> Key: FLINK-8040
> URL: https://issues.apache.org/jira/browse/FLINK-8040
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Gary Yao
>  Labels: test-stability
> Fix For: 1.4.0, 1.5.0
>
>
> Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with 
> {noformat}
> java.io.IOException: Resource guard was already closed.
>   at 
> org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72)
>   at 
> org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> 

[jira] [Created] (FLINK-8040) Test instability ResourceGuard#testCloseBlockIfAcquired

2017-11-09 Thread Gary Yao (JIRA)
Gary Yao created FLINK-8040:
---

 Summary: Test instability ResourceGuard#testCloseBlockIfAcquired
 Key: FLINK-8040
 URL: https://issues.apache.org/jira/browse/FLINK-8040
 Project: Flink
  Issue Type: Bug
  Components: Core, Tests
Affects Versions: 1.4.0, 1.5.0
Reporter: Gary Yao
 Fix For: 1.4.0, 1.5.0


Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with 
{noformat}
java.io.IOException: Resource guard was already closed.

at 
org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72)
at 
org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
{noformat}

*How to reproduce*
Run the test with a high number of iterations.
To further provoke the failure, add {{Thread.sleep(100)}} before the following 
line
{code}
ResourceGuard.Lease lease_2 = resourceGuard.acquireResource();
{code}

This will more likely result in {{closed}} being {{true}} at the time the 2nd 
lease is acquired, and an exception is thrown:
{code}
if (closed) {
throw new IOException("Resource guard was already closed.");
}
{code}
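
For readers trying to reproduce this, here is a hedged, self-contained sketch of the race (not the actual test; it assumes only the {{acquireResource()}}/{{Lease}}/{{close()}} API visible in the stack trace and code above):
{code}
import org.apache.flink.util.ResourceGuard;

import java.io.IOException;

public class ResourceGuardRaceSketch {
    public static void main(String[] args) throws Exception {
        final ResourceGuard guard = new ResourceGuard();
        final ResourceGuard.Lease lease1 = guard.acquireResource();

        // close() blocks until all leases are released, so run it concurrently
        Thread closer = new Thread(guard::close);
        closer.start();

        Thread.sleep(100); // provoke the race: the 'closed' flag may now be set

        try {
            // races with close(); once 'closed' is true this throws
            // java.io.IOException: Resource guard was already closed.
            ResourceGuard.Lease lease2 = guard.acquireResource();
            lease2.close();
        } catch (IOException e) {
            System.out.println("lost the race: " + e.getMessage());
        } finally {
            lease1.close(); // releasing the last lease lets close() return
        }
        closer.join();
    }
}
{code}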



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4989: [Flink 7003] [Table API & SQL] use PEEK_FIELDS_NO_EXPAND ...

2017-11-09 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4989
  
Thanks for the PR @suez1224.
I will merge it.

Thanks, Fabian


---


[jira] [Commented] (FLINK-7765) Enable dependency convergence

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245753#comment-16245753
 ] 

ASF GitHub Bot commented on FLINK-7765:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4777
  
Thanks!

Could you please close if GH doesn't auto-close?


> Enable dependency convergence
> -
>
> Key: FLINK-7765
> URL: https://issues.apache.org/jira/browse/FLINK-7765
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> For motivation check https://issues.apache.org/jira/browse/FLINK-7739
> The subtasks of this task depend on one another: to enable convergence in 
> `flink-runtime`, it has to be enabled for `flink-shaded-hadoop` first.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7886) Enable dependency convergence for flink-core

2017-11-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-7886.
---
Resolution: Fixed

Implemented on master in
bdbca37b01254aef9ddd6943d34dab1838c2a9f1

> Enable dependency convergence for flink-core
> 
>
> Key: FLINK-7886
> URL: https://issues.apache.org/jira/browse/FLINK-7886
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.3.2
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149983029
  
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.



+
+	<profiles>
+		<profile>
+			<id>include_hadoop_aws</id>
+			<activation>
+				<property>
+					<name>include_hadoop_aws</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-aws</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.avro</groupId>
+							<artifactId>avro</artifactId>
+						</exclusion>
--- End diff --

Shouldn't we override the avro version instead of excluding it?


---


[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245769#comment-16245769
 ] 

ASF GitHub Bot commented on FLINK-4228:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149983025
  
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.



+
+	<profiles>
+		<profile>
+			<id>include_hadoop_aws</id>
+			<activation>
+				<property>
+					<name>include_hadoop_aws</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-aws</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.avro</groupId>
+							<artifactId>avro</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-annotations</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-core</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-databind</artifactId>
+						</exclusion>
--- End diff --

It might be necessary to add these dependencies then explicitly here.


> YARN artifact upload does not work with S3AFileSystem
> -
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The issue now is exclusive to running on YARN with s3a:// as your configured 
> FileSystem. If so, the Flink session will fail on staging itself because it 
> tries to copy the flink/lib directory to S3 and the S3aFileSystem does not 
> support recursive copy.
> h2. Old Issue
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> 

[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245768#comment-16245768
 ] 

ASF GitHub Bot commented on FLINK-4228:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149983029
  
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.



+
+	<profiles>
+		<profile>
+			<id>include_hadoop_aws</id>
+			<activation>
+				<property>
+					<name>include_hadoop_aws</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-aws</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.avro</groupId>
+							<artifactId>avro</artifactId>
+						</exclusion>
--- End diff --

Shouldn't we override the avro version instead of excluding it?


> YARN artifact upload does not work with S3AFileSystem
> -
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The issue now is exclusive to running on YARN with s3a:// as your configured 
> FileSystem. If so, the Flink session will fail on staging itself because it 
> tries to copy the flink/lib directory to S3 and the S3aFileSystem does not 
> support recursive copy.
> h2. Old Issue
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.(FileInputStream.java:138)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
>   ... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be 
> due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created 
> automatically. We might need to manually create folders and copy only actual 
> files for {{S3AFileSystem}}. More investigation is required.
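
A hedged sketch of the file-by-file upload the description hints at (illustrative only, not the actual fix from the PR; it assumes the standard Hadoop {{FileSystem}} API):
{code}
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.File;
import java.io.IOException;

public final class FileByFileUploadSketch {

    /** Uploads a local directory by creating target folders explicitly and
     *  copying regular files one by one, avoiding recursive directory copy. */
    public static void uploadDirectory(File localDir, FileSystem targetFs, Path targetDir) throws IOException {
        targetFs.mkdirs(targetDir); // create the target "folder" explicitly
        File[] children = localDir.listFiles();
        if (children == null) {
            return;
        }
        for (File child : children) {
            Path target = new Path(targetDir, child.getName());
            if (child.isDirectory()) {
                uploadDirectory(child, targetFs, target); // recurse manually
            } else {
                targetFs.copyFromLocalFile(new Path(child.getAbsolutePath()), target);
            }
        }
    }
}
{code}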



--
This message was sent by Atlassian JIRA

[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149983025
  
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.



+
+	<profiles>
+		<profile>
+			<id>include_hadoop_aws</id>
+			<activation>
+				<property>
+					<name>include_hadoop_aws</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-aws</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.avro</groupId>
+							<artifactId>avro</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-annotations</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-core</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-databind</artifactId>
+						</exclusion>
--- End diff --

It might be necessary to add these dependencies then explicitly here.


---


[jira] [Assigned] (FLINK-8040) Test instability ResourceGuard#testCloseBlockIfAcquired

2017-11-09 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reassigned FLINK-8040:
---

Assignee: Stefan Richter

> Test instability ResourceGuard#testCloseBlockIfAcquired
> ---
>
> Key: FLINK-8040
> URL: https://issues.apache.org/jira/browse/FLINK-8040
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Gary Yao
>Assignee: Stefan Richter
>  Labels: test-stability
> Fix For: 1.4.0, 1.5.0
>
>
> Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with 
> {noformat}
> java.io.IOException: Resource guard was already closed.
>   at 
> org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72)
>   at 
> org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> {noformat}
> *How to reproduce*
> Run the test with a high number of iterations.
> To further provoke a failure, add {{Thread.sleep(100)}} before the following 
> line
> {code}
> ResourceGuard.Lease lease_2 = resourceGuard.acquireResource();
> {code}
> This will more likely result in {{closed}} being {{true}} at the time the 2nd 
> lease is acquired and an exception is thrown:
> {code}
>   if (closed) {
>   throw new IOException("Resource guard was already closed.");
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4980: [FLINK-8005] [runtime] Set user code class loader ...

2017-11-09 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149985097
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -277,8 +287,12 @@ public void invoke() throws Exception {
}
}
 
-   triggerLatch.trigger();
if (error != null) {
+   // exit method prematurely due to error but 
make sure that the tests can finish
+   triggerLatch.trigger();
--- End diff --

I think it doesn't matter because the latch already checks for the flag.
```
public void await() throws InterruptedException {
synchronized (lock) {
while (!triggered) {
lock.wait();
}
}
}
```
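
For context, the full one-shot latch pattern looks roughly like this (illustrative class, not Flink's actual `OneShotLatch`); because `await()` loops on the flag, calling `await()` after `trigger()` returns immediately:

```java
public final class OneShotLatchSketch {

    private final Object lock = new Object();
    private boolean triggered;

    /** Fires the latch; all current and future await() calls return. */
    public void trigger() {
        synchronized (lock) {
            triggered = true;
            lock.notifyAll();
        }
    }

    /** Blocks until trigger() has been called at least once. */
    public void await() throws InterruptedException {
        synchronized (lock) {
            while (!triggered) {
                lock.wait();
            }
        }
    }
}
```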


---


[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245593#comment-16245593
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149954606
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatcherThreadFactory.java
 ---
@@ -29,21 +31,41 @@
private final ThreadGroup group;

private final String threadName;
+
+   private final ClassLoader classLoader;

/**
 * Creates a new thread factory.
-* 
+*
 * @param group The group that the threads will be associated with.
 * @param threadName The name for the threads.
 */
public DispatcherThreadFactory(ThreadGroup group, String threadName) {
+   this(group, threadName, null);
+   }
+
+   /**
+* Creates a new thread factory.
+*
+* @param group The group that the threads will be associated with.
+* @param threadName The name for the threads.
+* @param classLoader The {@link ClassLoader} to be set as context 
class loader.
+*/
+   public DispatcherThreadFactory(
+   ThreadGroup group,
--- End diff --

Indented.
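
For readers following along, the pattern the PR applies can be sketched as follows (class and parameter names are illustrative, not the exact Flink code): a `ThreadFactory` that installs a context class loader on every thread it creates, so lookups like Kafka's `Utils.getContextOrKafkaClassLoader()` can resolve user-code classes:

```java
import java.util.concurrent.ThreadFactory;

public final class ContextClassLoaderThreadFactory implements ThreadFactory {

    private final ThreadGroup group;
    private final String threadName;
    private final ClassLoader classLoader; // may be null: keep the JVM default

    public ContextClassLoaderThreadFactory(ThreadGroup group, String threadName, ClassLoader classLoader) {
        this.group = group;
        this.threadName = threadName;
        this.classLoader = classLoader;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r, threadName);
        if (classLoader != null) {
            // threads created here resolve classes against user code
            t.setContextClassLoader(classLoader);
        }
        return t;
    }
}
```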


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> 

[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245592#comment-16245592
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149954576
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -58,99 +59,144 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.Executor;
 
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.isOneOf;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TaskAsyncCallTest {
 
-   private static final int NUM_CALLS = 1000;
-   
+   private static int numCalls;
+
+   /** Triggered at the beginning of {@link 
CheckpointsInOrderInvokable#invoke()}. */
private static OneShotLatch awaitLatch;
+
+   /**
+* Triggered when {@link 
CheckpointsInOrderInvokable#triggerCheckpoint(CheckpointMetaData, 
CheckpointOptions)}
+* was called {@link #numCalls} times.
+*/
private static OneShotLatch triggerLatch;
 
+	private static final List<ClassLoader> classLoaders = new ArrayList<>();
+
@Before
public void createQueuesAndActors() {
+   numCalls = 1000;
+
awaitLatch = new OneShotLatch();
triggerLatch = new OneShotLatch();
+
+   classLoaders.clear();
}
 
 
// 

//  Tests 
// 

-   
+
@Test
-   public void testCheckpointCallsInOrder() {
-   try {
-   Task task = createTask();
+   public void testCheckpointCallsInOrder() throws Exception {
+   Task task = createTask(CheckpointsInOrderInvokable.class);
+   try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();
-   
+
awaitLatch.await();
-   
-   for (int i = 1; i <= NUM_CALLS; i++) {
+
+   for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L, 
CheckpointOptions.forCheckpoint());
}
-   
+
triggerLatch.await();
-   
+
assertFalse(task.isCanceledOrFailed());
 
ExecutionState currentState = task.getExecutionState();
-   if (currentState != ExecutionState.RUNNING && 
currentState != ExecutionState.FINISHED) {
-   fail("Task should be RUNNING or FINISHED, but 
is " + currentState);
-   }
-   
-   task.cancelExecution();
-   task.getExecutingThread().join();
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
+   assertThat(currentState, 
isOneOf(ExecutionState.RUNNING, ExecutionState.FINISHED));
}
}
 
@Test
-   public void testMixedAsyncCallsInOrder() {
-   try {
-   Task task = createTask();
+   public void testMixedAsyncCallsInOrder() throws Exception {
+   Task task = createTask(CheckpointsInOrderInvokable.class);
+   try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();
 
awaitLatch.await();
 
-   for (int i = 1; i <= NUM_CALLS; i++) {
+   for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L, 
CheckpointOptions.forCheckpoint());
task.notifyCheckpointComplete(i);
}
 
triggerLatch.await();
 
assertFalse(task.isCanceledOrFailed());

[GitHub] flink pull request #4980: [FLINK-8005] [runtime] Set user code class loader ...

2017-11-09 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149954849
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -254,12 +300,10 @@ else if (this.error == null) {
 
@Override
public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics 
checkpointMetrics) throws Exception {
-   throw new UnsupportedOperationException("Should not be 
called");
--- End diff --

Not sure but I decided to add it again.


---


[GitHub] flink issue #4777: [FLINK-7765] Enable dependency convergence

2017-11-09 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4777
  
I'll merge, yes, what are the follow-up issues you created?


---


[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245765#comment-16245765
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149982812
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -277,8 +287,12 @@ public void invoke() throws Exception {
}
}
 
-   triggerLatch.trigger();
if (error != null) {
+   // exit method prematurely due to error but 
make sure that the tests can finish
+   triggerLatch.trigger();
--- End diff --

For all latches, it should also have: `if (!latch.isTriggered()) { latch.await(); }`


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
>   at 
> 

[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149980302
  
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.



+
+	<profiles>
+		<profile>
+			<id>include_hadoop_aws</id>
+			<activation>
+				<property>
+					<name>include_hadoop_aws</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-aws</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.avro</groupId>
+							<artifactId>avro</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-annotations</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-core</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-databind</artifactId>
+						</exclusion>
--- End diff --

Can't we enforce jackson 2.6 via dependency management? I think this would 
be cleaner than excluding the dependencies here and assume that 
`aws-java-sdk-s3` pulls in the missing dependencies.
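
A hedged sketch of what that could look like in the parent pom (the version number is illustrative):

```xml
<dependencyManagement>
	<dependencies>
		<!-- pin the Jackson version once instead of excluding it per dependency -->
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-core</artifactId>
			<version>2.6.7</version>
		</dependency>
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-databind</artifactId>
			<version>2.6.7</version>
		</dependency>
		<dependency>
			<groupId>com.fasterxml.jackson.core</groupId>
			<artifactId>jackson-annotations</artifactId>
			<version>2.6.7</version>
		</dependency>
	</dependencies>
</dependencyManagement>
```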


---


[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245781#comment-16245781
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149985097
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -277,8 +287,12 @@ public void invoke() throws Exception {
}
}
 
-   triggerLatch.trigger();
if (error != null) {
+   // exit method prematurely due to error but 
make sure that the tests can finish
+   triggerLatch.trigger();
--- End diff --

I think it doesn't matter because the latch already checks for the flag.
```
public void await() throws InterruptedException {
synchronized (lock) {
while (!triggered) {
lock.wait();
}
}
}
```


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> 

[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245767#comment-16245767
 ] 

ASF GitHub Bot commented on FLINK-4228:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149980302
  
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.



+
+	<profiles>
+		<profile>
+			<id>include_hadoop_aws</id>
+			<activation>
+				<property>
+					<name>include_hadoop_aws</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-aws</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.avro</groupId>
+							<artifactId>avro</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-annotations</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-core</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-databind</artifactId>
+						</exclusion>
--- End diff --

Can't we enforce jackson 2.6 via dependency management? I think this would 
be cleaner than excluding the dependencies here and assume that 
`aws-java-sdk-s3` pulls in the missing dependencies.


> YARN artifact upload does not work with S3AFileSystem
> -
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The issue now is exclusive to running on YARN with s3a:// as your configured 
> FileSystem. If so, the Flink session will fail on staging itself because it 
> tries to copy the flink/lib directory to S3 and the S3aFileSystem does not 
> support recursive copy.
> h2. Old Issue
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at 

[GitHub] flink issue #4980: [FLINK-8005] [runtime] Set user code class loader before ...

2017-11-09 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4980
  
Yes, but I think this is making an assumption about the internal 
implementation. If someone changes that, the test could break or no longer 
test the right thing.


---


[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245681#comment-16245681
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4980
  
Yes, but I think this is making an assumption about the internal 
implementation. If someone changes that, the test could break or no longer 
test the right thing.


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>   at 
> 

[GitHub] flink issue #4777: [FLINK-7765] Enable dependency convergence

2017-11-09 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4777
  
Thanks!

Could you please close if GH doesn't auto-close?


---


[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245776#comment-16245776
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149984491
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -277,8 +287,12 @@ public void invoke() throws Exception {
}
}
 
-   triggerLatch.trigger();
if (error != null) {
+   // exit method prematurely due to error but 
make sure that the tests can finish
+   triggerLatch.trigger();
--- End diff --

Sorry, I was just looking at it in the IDE and missed the lines. This line 
should come before every call to `await` on the `latch`.


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>   at 
> 

[jira] [Commented] (FLINK-7979) Use Log.*(Object, Throwable) overload to log exceptions

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245575#comment-16245575
 ] 

ASF GitHub Bot commented on FLINK-7979:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4948
  
Thanks for understanding!


> Use Log.*(Object, Throwable) overload to log exceptions
> ---
>
> Key: FLINK-7979
> URL: https://issues.apache.org/jira/browse/FLINK-7979
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Critical
> Fix For: 1.5.0
>
>
> I found some code that logs an exception by converting it to a string or 
> calling {{.getMessage()}}.
> I think the better way is to use the Logger method overloads which take a 
> {{Throwable}} as a parameter.
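
A minimal sketch of the two styles (illustrative SLF4J usage; the class name is made up):
{code}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingSketch {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

    void handle(Exception e) {
        // discouraged: only the message is logged, the stack trace is lost
        LOG.error("Operation failed: " + e.getMessage());

        // preferred: the Throwable overload preserves the full stack trace
        LOG.error("Operation failed", e);
    }
}
{code}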



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4948: [FLINK-7979][minor] Use Log.*(Object, Throwable) overload...

2017-11-09 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4948
  
Thanks for understanding!


---


[GitHub] flink issue #4777: [FLINK-7765] Enable dependency convergence

2017-11-09 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4777
  
Thanks!


---


[jira] [Comment Edited] (FLINK-6022) Don't serialise Schema when serialising Avro GenericRecord

2017-11-09 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245577#comment-16245577
 ] 

Stephan Ewen edited comment on FLINK-6022 at 11/9/17 12:48 PM:
---

We are not serializing the schema in the Avro Serializer. If the Avro 
Serializer is chosen, this is fixed.

I am wondering if the case is if one uses explicitly a "generic record" from 
Avro as the exchange data type. That is not a good idea in the first place in 
my opinion. In that case, isn't it possible that each generic record is 
different and thus you always need a schema anyways?

I would honestly close this, because I assume the intention was around using 
Avro's specific record mechanism and the "generic" mechanism (where we use the 
ReflectDatumReader/Writer). Both should work well now.


was (Author: stephanewen):
We are not serializing the schema in the Avro Serializer. If the Avro 
Serializer is chosen, this is fixed.

I am wondering if the case is if one uses explicitly a "generic record" from 
Avro as the exchange data type. That is not a good idea in the first place in 
my opinion. In that case, isn't it possible that each generic record is 
different and thus you always need a schema anyways.

> Don't serialise Schema when serialising Avro GenericRecord
> --
>
> Key: FLINK-6022
> URL: https://issues.apache.org/jira/browse/FLINK-6022
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Robert Metzger
>Assignee: Stephan Ewen
> Fix For: 1.5.0
>
>
> Currently, Flink is serializing the schema for each Avro GenericRecord in the 
> stream.
> This leads to a lot of overhead over the wire/disk + high serialization costs.
> Therefore, I'm proposing to improve the support for GenericRecord in Flink by 
> shipping the schema to each serializer  through the AvroTypeInformation.
> Then, we can only support GenericRecords with the same type per stream, but 
> the performance will be much better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
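
To make the overhead concrete, a sketch against the plain Avro API (not Flink's serializer itself): once the schema is shipped out of band, each record serializes to bytes that contain no schema at all.

{code}
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class GenericRecordBytes {

    /** Serializes a record against a schema that was shipped once, out of band. */
    public static byte[] serialize(Schema schema, GenericRecord record) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
        encoder.flush();
        // The payload holds only the record data; the schema never travels with it.
        return out.toByteArray();
    }
}
{code}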


[jira] [Closed] (FLINK-8017) High availability cluster-id option incorrect in documentation

2017-11-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-8017.
---
Resolution: Fixed

Fixed on release-1.4 in
02a19a14fad1ef928038f4971bdcacf4d0642d88

Fixed on master in
624df0120a962fe93d31544d7b13637121196197

> High availability cluster-id option incorrect in documentation
> --
>
> Key: FLINK-8017
> URL: https://issues.apache.org/jira/browse/FLINK-8017
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.3.2
>Reporter: Dan Kelley
>Priority: Minor
> Fix For: 1.4.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The property key in HighAvailabilityOptions.java is 
> high-availability.cluster-id; however, the documentation states that the key is 
> high-availability.zookeeper.path.cluster-id



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
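
For reference, the corrected key as it would appear in flink-conf.yaml (the quorum address and cluster id are placeholders):

{code}
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host:2181
high-availability.cluster-id: /my-flink-cluster
{code}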


[jira] [Commented] (FLINK-8017) High availability cluster-id option incorrect in documentation

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245603#comment-16245603
 ] 

ASF GitHub Bot commented on FLINK-8017:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4976
  
Could you please close this if it doesn't auto-close?


> High availability cluster-id option incorrect in documentation
> --
>
> Key: FLINK-8017
> URL: https://issues.apache.org/jira/browse/FLINK-8017
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.3.2
>Reporter: Dan Kelley
>Priority: Minor
> Fix For: 1.4.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The property key in HighAvailabilityOptions.java is 
> high-availability.cluster-id; however, the documentation states that the key is 
> high-availability.zookeeper.path.cluster-id



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8017) High availability cluster-id option incorrect in documentation

2017-11-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8017:

Fix Version/s: 1.4.0

> High availability cluster-id option incorrect in documentation
> --
>
> Key: FLINK-8017
> URL: https://issues.apache.org/jira/browse/FLINK-8017
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.3.2
>Reporter: Dan Kelley
>Priority: Minor
> Fix For: 1.4.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The property key in HighAvailabilityOptions.java is 
> high-availability.cluster-id; however, the documentation states that the key is 
> high-availability.zookeeper.path.cluster-id



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4976: [FLINK-8017] Fix High availability cluster-id key in docu...

2017-11-09 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4976
  
Could you please close this if it doesn't auto-close?


---


[jira] [Updated] (FLINK-8040) Test instability ResourceGuard#testCloseBlockIfAcquired

2017-11-09 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-8040:

Description: 
Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with 
{noformat}
java.io.IOException: Resource guard was already closed.

at 
org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72)
at 
org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
{noformat}

*How to reproduce*
Run the test with a high number of iterations.
To further provoke a failure, add {{Thread.sleep(100)}} before the following 
line
{code}
ResourceGuard.Lease lease_2 = resourceGuard.acquireResource();
{code}

This will more likely result in {{closed}} being {{true}} at the time the 2nd 
lease is acquired, and an exception is thrown:
{code}
if (closed) {
throw new IOException("Resource guard was already closed.");
}
{code}

  was:
Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with 
{noformat}
java.io.IOException: Resource guard was already closed.

at 
org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72)
at 
org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
{noformat}

*How to reproduce*
Run the test with a high number of iterations.
To further provoke the failure, add {{Thread.sleep(100)}} before the following 
line
{code}
ResourceGuard.Lease lease_2 = resourceGuard.acquireResource();
{code}

This will more likely result in {{closed}} being {{true}} at the time the 2nd 
lease is acquired, and an exception is thrown:
{code}
if (closed) {
throw new IOException("Resource guard was already closed.");
}
{code}


> Test instability ResourceGuard#testCloseBlockIfAcquired
> ---
>
> Key: FLINK-8040
> URL: https://issues.apache.org/jira/browse/FLINK-8040
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Gary Yao
>  Labels: test-stability
> Fix For: 1.4.0, 1.5.0
>
>
> Test {{ResourceGuard#testCloseBlockIfAcquired()}} fails sporadically with 
> {noformat}
> java.io.IOException: Resource guard was already closed.
>   at 
> org.apache.flink.util.ResourceGuard.acquireResource(ResourceGuard.java:72)
>   at 
> org.apache.flink.util.ResourceGuardTest.testCloseBlockIfAcquired(ResourceGuardTest.java:74)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> 
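
The race can be summarized in code roughly as follows. ResourceGuard, Lease and acquireResource() come from the excerpts above; the blocking behavior of close() is inferred from the test name, so this is a sketch only:

{code}
import org.apache.flink.util.ResourceGuard;

class ResourceGuardRaceSketch {

    static void provokeRace() throws Exception {
        final ResourceGuard resourceGuard = new ResourceGuard();
        ResourceGuard.Lease lease_1 = resourceGuard.acquireResource();

        // close() blocks while a lease is held, so it runs in a second thread.
        Thread closer = new Thread(() -> {
            try {
                resourceGuard.close();
            } catch (Exception ignored) {
            }
        });
        closer.start();

        // Release the only lease; the concurrent close() may now complete at any moment.
        lease_1.close();

        // The race: if the closer thread marks the guard closed before this call,
        // acquireResource() throws IOException("Resource guard was already closed.").
        ResourceGuard.Lease lease_2 = resourceGuard.acquireResource();
        lease_2.close();
    }
}
{code}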

[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245773#comment-16245773
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149983878
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -277,8 +287,12 @@ public void invoke() throws Exception {
}
}
 
-   triggerLatch.trigger();
if (error != null) {
+   // exit method prematurely due to error but 
make sure that the tests can finish
+   triggerLatch.trigger();
--- End diff --

Why is that? I think at this point the latch might not get triggered at all.


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
>   at 
> 
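
The ConfigDef extract quoted above boils down to this lookup; the class is only found when the calling thread's context class loader can see the user code jar (plain-Java sketch, method name made up):

{code}
static Class<?> resolveSerializer() throws ClassNotFoundException {
    // Kafka resolves CLASS-typed config values against the context class loader
    // (falling back to its own loader via Utils.getContextOrKafkaClassLoader()).
    ClassLoader cl = Thread.currentThread().getContextClassLoader();

    // Fails with "could not be found" when cl is the AppClassLoader and the
    // serializer class lives only in the user code jar.
    return Class.forName(
        "org.apache.kafka.common.serialization.ByteArraySerializer", true, cl);
}
{code}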

[GitHub] flink pull request #4980: [FLINK-8005] [runtime] Set user code class loader ...

2017-11-09 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149983878
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -277,8 +287,12 @@ public void invoke() throws Exception {
}
}
 
-   triggerLatch.trigger();
if (error != null) {
+   // exit method prematurely due to error but 
make sure that the tests can finish
+   triggerLatch.trigger();
--- End diff --

Why is that? I think at this point the latch might not get triggered at all.


---


[jira] [Commented] (FLINK-7765) Enable dependency convergence

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245810#comment-16245810
 ] 

ASF GitHub Bot commented on FLINK-7765:
---

Github user pnowojski closed the pull request at:

https://github.com/apache/flink/pull/4777


> Enable dependency convergence
> -
>
> Key: FLINK-7765
> URL: https://issues.apache.org/jira/browse/FLINK-7765
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> For motivation check https://issues.apache.org/jira/browse/FLINK-7739
> SubTasks of this task depend on one another - to enable convergence in 
> `flink-runtime` it has to be enabled for `flink-shaded-hadoop` first.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
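
For context, dependency convergence is checked with the standard maven-enforcer-plugin rule; a minimal pom.xml sketch (the plugin version is a placeholder):

{code}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-enforcer-plugin</artifactId>
  <version>3.0.0-M1</version>
  <executions>
    <execution>
      <id>dependency-convergence</id>
      <goals>
        <goal>enforce</goal>
      </goals>
      <configuration>
        <rules>
          <dependencyConvergence/>
        </rules>
      </configuration>
    </execution>
  </executions>
</plugin>
{code}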


[jira] [Commented] (FLINK-7765) Enable dependency convergence

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245808#comment-16245808
 ] 

ASF GitHub Bot commented on FLINK-7765:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4777
  
Thanks!


> Enable dependency convergence
> -
>
> Key: FLINK-7765
> URL: https://issues.apache.org/jira/browse/FLINK-7765
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> For motivation check https://issues.apache.org/jira/browse/FLINK-7739
> SubTasks of this task depend on one another - to enable convergence in 
> `flink-runtime` it has to be enabled for `flink-shaded-hadoop` first.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4980: [FLINK-8005] [runtime] Set user code class loader ...

2017-11-09 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149947069
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -254,12 +300,10 @@ else if (this.error == null) {
 
@Override
public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics 
checkpointMetrics) throws Exception {
-   throw new UnsupportedOperationException("Should not be 
called");
}
 
@Override
public void abortCheckpointOnBarrier(long checkpointId, 
Throwable cause) {
-   throw new UnsupportedOperationException("Should not be 
called");
--- End diff --

Why was this removed?


---


[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245569#comment-16245569
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149947094
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -254,12 +300,10 @@ else if (this.error == null) {
 
@Override
public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics 
checkpointMetrics) throws Exception {
-   throw new UnsupportedOperationException("Should not be 
called");
--- End diff --

Why was this removed?


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
>   at 
> 

[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245570#comment-16245570
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149946930
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatcherThreadFactory.java
 ---
@@ -29,21 +31,41 @@
private final ThreadGroup group;

private final String threadName;
+
+   private final ClassLoader classLoader;

/**
 * Creates a new thread factory.
-* 
+*
 * @param group The group that the threads will be associated with.
 * @param threadName The name for the threads.
 */
public DispatcherThreadFactory(ThreadGroup group, String threadName) {
+   this(group, threadName, null);
+   }
+
+   /**
+* Creates a new thread factory.
+*
+* @param group The group that the threads will be associated with.
+* @param threadName The name for the threads.
+* @param classLoader The {@link ClassLoader} to be set as context 
class loader.
+*/
+   public DispatcherThreadFactory(
+   ThreadGroup group,
--- End diff --

This is a code style preference rather than an issue, but I would suggest 
to indent the arguments by a tab to separate them from the body of the method.


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> 
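
Given the constructor added in the DispatcherThreadFactory diff above, the factory presumably applies the loader to each thread it creates. A self-contained sketch of that idea (only the (group, threadName, classLoader) constructor shape is taken from the patch; the rest is illustrative):

{code}
import java.util.concurrent.ThreadFactory;

final class ContextClassLoaderThreadFactory implements ThreadFactory {

    private final ThreadGroup group;
    private final String threadName;
    private final ClassLoader classLoader;

    ContextClassLoaderThreadFactory(ThreadGroup group, String threadName, ClassLoader classLoader) {
        this.group = group;
        this.threadName = threadName;
        this.classLoader = classLoader;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r, threadName);
        if (classLoader != null) {
            // user-code classes become resolvable through the thread's context class loader
            t.setContextClassLoader(classLoader);
        }
        return t;
    }
}
{code}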

[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245572#comment-16245572
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149947069
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -254,12 +300,10 @@ else if (this.error == null) {
 
@Override
public void triggerCheckpointOnBarrier(CheckpointMetaData 
checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics 
checkpointMetrics) throws Exception {
-   throw new UnsupportedOperationException("Should not be 
called");
}
 
@Override
public void abortCheckpointOnBarrier(long checkpointId, 
Throwable cause) {
-   throw new UnsupportedOperationException("Should not be 
called");
--- End diff --

Why was this removed?


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>   at 
> 

[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245571#comment-16245571
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149949202
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -58,99 +59,144 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.Executor;
 
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.isOneOf;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TaskAsyncCallTest {
 
-   private static final int NUM_CALLS = 1000;
-   
+   private static int numCalls;
+
+   /** Triggered at the beginning of {@link 
CheckpointsInOrderInvokable#invoke()}. */
private static OneShotLatch awaitLatch;
+
+   /**
+* Triggered when {@link 
CheckpointsInOrderInvokable#triggerCheckpoint(CheckpointMetaData, 
CheckpointOptions)}
+* was called {@link #numCalls} times.
+*/
private static OneShotLatch triggerLatch;
 
+   private static final List<ClassLoader> classLoaders = new ArrayList<>();
+
@Before
public void createQueuesAndActors() {
+   numCalls = 1000;
+
awaitLatch = new OneShotLatch();
triggerLatch = new OneShotLatch();
+
+   classLoaders.clear();
}
 
 
// 

//  Tests 
// 

-   
+
@Test
-   public void testCheckpointCallsInOrder() {
-   try {
-   Task task = createTask();
+   public void testCheckpointCallsInOrder() throws Exception {
+   Task task = createTask(CheckpointsInOrderInvokable.class);
+   try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();
-   
+
awaitLatch.await();
-   
-   for (int i = 1; i <= NUM_CALLS; i++) {
+
+   for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L, 
CheckpointOptions.forCheckpoint());
}
-   
+
triggerLatch.await();
-   
+
assertFalse(task.isCanceledOrFailed());
 
ExecutionState currentState = task.getExecutionState();
-   if (currentState != ExecutionState.RUNNING && 
currentState != ExecutionState.FINISHED) {
-   fail("Task should be RUNNING or FINISHED, but 
is " + currentState);
-   }
-   
-   task.cancelExecution();
-   task.getExecutingThread().join();
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
+   assertThat(currentState, 
isOneOf(ExecutionState.RUNNING, ExecutionState.FINISHED));
}
}
 
@Test
-   public void testMixedAsyncCallsInOrder() {
-   try {
-   Task task = createTask();
+   public void testMixedAsyncCallsInOrder() throws Exception {
+   Task task = createTask(CheckpointsInOrderInvokable.class);
+   try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();
 
awaitLatch.await();
 
-   for (int i = 1; i <= NUM_CALLS; i++) {
+   for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L, 
CheckpointOptions.forCheckpoint());
task.notifyCheckpointComplete(i);
}
 
triggerLatch.await();
 

[jira] [Commented] (FLINK-6022) Don't serialise Schema when serialising Avro GenericRecord

2017-11-09 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245577#comment-16245577
 ] 

Stephan Ewen commented on FLINK-6022:
-

We are not serializing the schema in the Avro Serializer. If the Avro 
Serializer is chosen, this is fixed.

I am wondering if the case is if one uses explicitly a "generic record" from 
Avro as the exchange data type. That is not a good idea in the first place in 
my opinion. In that case, isn't it possible that each generic record is 
different and thus you always need a schema anyways.

> Don't serialise Schema when serialising Avro GenericRecord
> --
>
> Key: FLINK-6022
> URL: https://issues.apache.org/jira/browse/FLINK-6022
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Robert Metzger
>Assignee: Stephan Ewen
> Fix For: 1.5.0
>
>
> Currently, Flink is serializing the schema for each Avro GenericRecord in the 
> stream.
> This leads to a lot of overhead over the wire/disk + high serialization costs.
> Therefore, I'm proposing to improve the support for GenericRecord in Flink by 
> shipping the schema to each serializer  through the AvroTypeInformation.
> Then, we can only support GenericRecords with the same type per stream, but 
> the performance will be much better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4777: [FLINK-7765] Enable dependency convergence

2017-11-09 Thread pnowojski
Github user pnowojski closed the pull request at:

https://github.com/apache/flink/pull/4777


---


[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245825#comment-16245825
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149993559
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -277,8 +287,12 @@ public void invoke() throws Exception {
}
}
 
-   triggerLatch.trigger();
if (error != null) {
+   // exit method prematurely due to error but 
make sure that the tests can finish
+   triggerLatch.trigger();
--- End diff --

yes, a latch that was already triggered will simply return immediately, no 
need for an additional check



> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>   at 
> 

[GitHub] flink pull request #4980: [FLINK-8005] [runtime] Set user code class loader ...

2017-11-09 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149993559
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -277,8 +287,12 @@ public void invoke() throws Exception {
}
}
 
-   triggerLatch.trigger();
if (error != null) {
+   // exit method prematurely due to error but 
make sure that the tests can finish
+   triggerLatch.trigger();
--- End diff --

yes, a latch that was already triggered will simply return immediately, no 
need for an additional check



---
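
The behavior being relied on here can be sketched with a one-count CountDownLatch; this mirrors the stated semantics of OneShotLatch, not its actual implementation:

{code}
import java.util.concurrent.CountDownLatch;

final class OneShotLatchSketch {

    private final CountDownLatch latch = new CountDownLatch(1);

    /** Idempotent: triggering an already-triggered latch is a no-op. */
    void trigger() {
        latch.countDown();
    }

    /** Blocks until triggered; returns immediately if already triggered. */
    void await() throws InterruptedException {
        latch.await();
    }
}
{code}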


[GitHub] flink pull request #4980: [FLINK-8005] [runtime] Set user code class loader ...

2017-11-09 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149954576
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 ---
@@ -58,99 +59,144 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.Executor;
 
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.isOneOf;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TaskAsyncCallTest {
 
-   private static final int NUM_CALLS = 1000;
-   
+   private static int numCalls;
+
+   /** Triggered at the beginning of {@link 
CheckpointsInOrderInvokable#invoke()}. */
private static OneShotLatch awaitLatch;
+
+   /**
+* Triggered when {@link 
CheckpointsInOrderInvokable#triggerCheckpoint(CheckpointMetaData, 
CheckpointOptions)}
+* was called {@link #numCalls} times.
+*/
private static OneShotLatch triggerLatch;
 
+   private static final List<ClassLoader> classLoaders = new ArrayList<>();
+
@Before
public void createQueuesAndActors() {
+   numCalls = 1000;
+
awaitLatch = new OneShotLatch();
triggerLatch = new OneShotLatch();
+
+   classLoaders.clear();
}
 
 
// 

//  Tests 
// 

-   
+
@Test
-   public void testCheckpointCallsInOrder() {
-   try {
-   Task task = createTask();
+   public void testCheckpointCallsInOrder() throws Exception {
+   Task task = createTask(CheckpointsInOrderInvokable.class);
+   try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();
-   
+
awaitLatch.await();
-   
-   for (int i = 1; i <= NUM_CALLS; i++) {
+
+   for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L, 
CheckpointOptions.forCheckpoint());
}
-   
+
triggerLatch.await();
-   
+
assertFalse(task.isCanceledOrFailed());
 
ExecutionState currentState = task.getExecutionState();
-   if (currentState != ExecutionState.RUNNING && 
currentState != ExecutionState.FINISHED) {
-   fail("Task should be RUNNING or FINISHED, but 
is " + currentState);
-   }
-   
-   task.cancelExecution();
-   task.getExecutingThread().join();
-   }
-   catch (Exception e) {
-   e.printStackTrace();
-   fail(e.getMessage());
+   assertThat(currentState, 
isOneOf(ExecutionState.RUNNING, ExecutionState.FINISHED));
}
}
 
@Test
-   public void testMixedAsyncCallsInOrder() {
-   try {
-   Task task = createTask();
+   public void testMixedAsyncCallsInOrder() throws Exception {
+   Task task = createTask(CheckpointsInOrderInvokable.class);
+   try (TaskCleaner ignored = new TaskCleaner(task)) {
task.startTaskThread();
 
awaitLatch.await();
 
-   for (int i = 1; i <= NUM_CALLS; i++) {
+   for (int i = 1; i <= numCalls; i++) {
task.triggerCheckpointBarrier(i, 156865867234L, 
CheckpointOptions.forCheckpoint());
task.notifyCheckpointComplete(i);
}
 
triggerLatch.await();
 
assertFalse(task.isCanceledOrFailed());
+
ExecutionState currentState = task.getExecutionState();
-   if (currentState != ExecutionState.RUNNING && 
currentState != ExecutionState.FINISHED) {
-   
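
The try-with-resources cleaner in the new test code is presumably along these lines, reconstructed from the cancelExecution()/join() calls that the old code performed inline (a sketch, not necessarily the exact class in the PR):

{code}
private static class TaskCleaner implements AutoCloseable {

    private final Task task;

    private TaskCleaner(Task task) {
        this.task = task;
    }

    @Override
    public void close() throws Exception {
        // what each test previously did by hand: cancel and wait for the task thread
        task.cancelExecution();
        task.getExecutingThread().join();
    }
}
{code}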

[GitHub] flink pull request #4980: [FLINK-8005] [runtime] Set user code class loader ...

2017-11-09 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4980#discussion_r149954606
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/DispatcherThreadFactory.java
 ---
@@ -29,21 +31,41 @@
private final ThreadGroup group;

private final String threadName;
+
+   private final ClassLoader classLoader;

/**
 * Creates a new thread factory.
-* 
+*
 * @param group The group that the threads will be associated with.
 * @param threadName The name for the threads.
 */
public DispatcherThreadFactory(ThreadGroup group, String threadName) {
+   this(group, threadName, null);
+   }
+
+   /**
+* Creates a new thread factory.
+*
+* @param group The group that the threads will be associated with.
+* @param threadName The name for the threads.
+* @param classLoader The {@link ClassLoader} to be set as context 
class loader.
+*/
+   public DispatcherThreadFactory(
+   ThreadGroup group,
--- End diff --

Indented.


---


[jira] [Assigned] (FLINK-8021) End-to-end tests may not shutdown cluster on failure

2017-11-09 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8021?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-8021:
---

Assignee: Aljoscha Krettek

> End-to-end tests may not shutdown cluster on failure
> 
>
> Key: FLINK-8021
> URL: https://issues.apache.org/jira/browse/FLINK-8021
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Aljoscha Krettek
>
> In this job https://travis-ci.org/zentol/flink/jobs/298656917 the kafka E2E 
> test failed straight away due to a missing class. The subsequent tests failed 
> since they cannot allocate the JM port.
> It is thus likely that the E2E tests do not shutdown the cluster in all 
> failure cases.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245600#comment-16245600
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/4980
  
I addressed the comments. Let's wait for Travis and let me know if 
something else needs to be changed. 

@aljoscha  @kl0u 


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>   at 

[jira] [Commented] (FLINK-8017) High availability cluster-id option incorrect in documentation

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245599#comment-16245599
 ] 

ASF GitHub Bot commented on FLINK-8017:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4976
  
Thanks for spotting and fixing this!  

I'm merging.


> High availability cluster-id option incorrect in documentation
> --
>
> Key: FLINK-8017
> URL: https://issues.apache.org/jira/browse/FLINK-8017
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.3.2
>Reporter: Dan Kelley
>Priority: Minor
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> The property key in HighAvailabilityOptions.java is 
> high-availability.cluster-id; however, the documentation states that the key is 
> high-availability.zookeeper.path.cluster-id



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4980: [FLINK-8005] [runtime] Set user code class loader before ...

2017-11-09 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/4980
  
I addressed the comments. Let's wait for Travis and let me know if 
something else needs to be changed. 

@aljoscha  @kl0u 


---
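
The PR title above describes the fix as setting the user code class loader before calling into user code. The underlying idiom, sketched as a standalone helper (class and method names are made up, not Flink API):

{code}
import java.util.concurrent.Callable;

final class ContextClassLoaderUtil {

    /** Runs an action with the given context class loader, restoring the previous one afterwards. */
    static <T> T runWithContextClassLoader(ClassLoader cl, Callable<T> action) throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(cl);
        try {
            return action.call();
        } finally {
            current.setContextClassLoader(previous);
        }
    }
}
{code}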


[jira] [Commented] (FLINK-8005) Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues

2017-11-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16245623#comment-16245623
 ] 

ASF GitHub Bot commented on FLINK-8005:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4980
  
I think waiting on the stop latch might not be enough (in 100 % of cases) 
because the other two calls are also asynchronous.


> Snapshotting FlinkKafkaProducer011 fails due to ClassLoader issues
> --
>
> Key: FLINK-8005
> URL: https://issues.apache.org/jira/browse/FLINK-8005
> Project: Flink
>  Issue Type: Bug
>  Components: Core, Kafka Connector, State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>
> *Problem Description*
> Classes in the user code jar cannot be loaded by the snapshot thread’s 
> context class loader ({{AppClassLoader}}).
> For example, when creating instances of {{KafkaProducer}}, Strings are 
> resolved to class objects by Kafka.
> Find below an extract from {{ConfigDef.java}}:
> {code}
> case CLASS:
> if (value instanceof Class)
> return value;
> else if (value instanceof String)
> return Class.forName(trimmed, true, 
> Utils.getContextOrKafkaClassLoader());
> else
> throw new ConfigException(name, value, "Expected a Class instance or 
> class name.");
> {code}
> *Exception/Stacktrace*
> {noformat}
> Caused by: java.lang.Exception: Could not complete snapshot 1 for operator 
> Source: Collection Source -> Sink: kafka-sink-1510048188383 (1/1).
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1077)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1026)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:659)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:595)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:526)
>   ... 7 more
> Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
> org.apache.kafka.common.serialization.ByteArraySerializer for configuration 
> key.serializer: Class 
> org.apache.kafka.common.serialization.ByteArraySerializer could not be found.
>   at 
> org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715)
>   at 
> org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460)
>   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
>   at 
> org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:75)
>   at 
> org.apache.kafka.clients.producer.ProducerConfig.<init>(ProducerConfig.java:360)
>   at 
> org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:114)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:913)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initTransactionalProducer(FlinkKafkaProducer011.java:904)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createOrGetProducerFromPool(FlinkKafkaProducer011.java:637)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:613)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:94)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:359)
>   at 
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:294)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.snapshotState(FlinkKafkaProducer011.java:756)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
>   at 
> 
