[GitHub] flink issue #4851: [FLINK-7623][tests] Add tests to make sure operator is ne...

2017-10-19 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4851
  
Also from our offline discussion, I would suggest that this behaves as
follows: if an operator with a given ID participated in a checkpoint, it
should be marked as restored. All cases should be derived from this
definition. I believe this is slightly different from the current
implementation. Both make sense, so I think we should agree on one.
@pnowojski @aljoscha which is the better definition for you?
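
A minimal sketch of the definition proposed here, assuming a hypothetical `RestoreDecision` helper that is not part of Flink: an operator counts as restored exactly when its ID appears among the operator IDs recorded in the checkpoint. Operator IDs are modelled as plain strings purely for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not Flink API: it only models the proposed rule.
public final class RestoreDecision {
    private RestoreDecision() {}

    // An operator is "restored" iff its ID participated in the checkpoint.
    public static boolean isRestored(String operatorId, Set<String> checkpointedOperatorIds) {
        return checkpointedOperatorIds.contains(operatorId);
    }

    public static void main(String[] args) {
        Set<String> checkpointed = new HashSet<>(Arrays.asList("source-1", "flatMap-1"));
        System.out.println(isRestored("flatMap-1", checkpointed)); // prints true
        System.out.println(isRestored("source-2", checkpointed));  // prints false
    }
}
```

Under this rule the answer depends only on the operator's own ID, so every other case (chained operators, renamed operators, newly added operators) follows from a single lookup.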


---


[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212183#comment-16212183
 ] 


> Detecting whether an operator is restored doesn't work with chained state
> -
>
> Key: FLINK-7623
> URL: https://issues.apache.org/jira/browse/FLINK-7623
> Project: Flink
> Issue Type: Bug
> Components: DataStream API, State Backends, Checkpointing
> Affects Versions: 1.4.0, 1.3.2
> Reporter: Aljoscha Krettek
> Assignee: Piotr Nowojski
> Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
> Attachments: StreamingJob.java
>
>
> Originally reported on the ML: 
> https://lists.apache.org/thread.html/22a2cf83de3107aa81a03a921325a191c29df8aa8676798fcd497199@%3Cuser.flink.apache.org%3E
> If we have a chain of operators where several of them have operator state, 
> detection of the {{context.isRestored()}} flag (of {{CheckpointedFunction}}) 
> does not work correctly. This is best illustrated by the following minimal 
> example, where both the source and the flatMap have state:
> {code}
> final StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> env
>   .addSource(new MaSource()).uid("source-1")
>   .flatMap(new MaFlatMap()).uid("flatMap-1");
> env.execute("testing");
> {code}
> If I take a savepoint with these UIDs, then change "source-1" to "source-2" 
> and restore from the savepoint, {{context.isRestored()}} still reports 
> {{true}} for the source.
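
The outcome one would expect in this scenario can be sketched without Flink. In the sketch below a savepoint is modelled as a map from operator uid to saved state, which is a deliberate simplification; the class and method names are hypothetical and do not correspond to Flink classes. After renaming `source-1` to `source-2`, only `flatMap-1` should see restored state.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the scenario, not Flink code: a savepoint is a map
// from operator uid to that operator's saved state.
public final class SavepointScenario {
    private SavepointScenario() {}

    // For each uid of the new job, report whether the savepoint holds state for it.
    public static Map<String, Boolean> expectedRestoreFlags(
            Map<String, Long> savepoint, String... newJobUids) {
        Map<String, Boolean> flags = new LinkedHashMap<>();
        for (String uid : newJobUids) {
            flags.put(uid, savepoint.containsKey(uid));
        }
        return flags;
    }

    public static void main(String[] args) {
        Map<String, Long> savepoint = new HashMap<>();
        savepoint.put("source-1", 7L);  // state written by the source
        savepoint.put("flatMap-1", 3L); // state written by the flatMap

        // The source uid was changed before restoring.
        System.out.println(expectedRestoreFlags(savepoint, "source-2", "flatMap-1"));
        // prints {source-2=false, flatMap-1=true}
    }
}
```

The report above describes the opposite result for the source: it still sees `true` even though no state exists under its new uid.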



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4851: [FLINK-7623][tests] Add tests to make sure operato...

2017-10-19 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145879846
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.OperatorInstanceID;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests ensuring correct behaviour of {@link org.apache.flink.runtime.state.ManagedInitializationContext#isRestored}
+ * method.
+ */
+public class RestoreStreamTaskTest extends TestLogger {
+    @Test
+    public void testRestore() throws Exception {
+        AcknowledgeStreamMockEnvironment environment1 = processRecords(
+            new OperatorID(42L, 42L),
+            new CounterOperator(),
+            new OperatorID(44L, 44L),
+            new CounterOperator(),
+            Optional.empty());
+
+        assertEquals(2, environment1.getCheckpointStateHandles().getSubtaskStateMappings().size());
+
+        TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles();
+
+        try {
+            AcknowledgeStreamMockEnvironment environment2 = processRecords(
+                new OperatorID(42L, 42L),
+                new RestoreCounterOperator(),
+                new OperatorID(44L, 44L),
+                new RestoreCounterOperator(),
+                Optional.of(stateHandles));
+
+            assertEquals(2, RestoreCounterOperator.RESTORE_COUNTER.get());
+        }
+        finally {
+            RestoreCounterOperator.RESTORE_COUNTER.getAndSet(0);
+        }
+    }
+
+    @Test
+    public void testRestoreWithNewId() throws Exception {
+        AcknowledgeStreamMockEnvironment environment1 = processRecords(
+            new OperatorID(42L, 42L),
+            new CounterOperator(),
+            new OperatorID(44L, 44L),
+            new CounterOperator(),
+            Optional.empty());
+
+        assertEquals(2, environment1.getCheckpointStateHandles().getSubtaskStateMappings().size());
+
+        TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles();
+
+        try {
+
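
The test above relies on a static counter that operators bump when they see restored state, and it resets that counter in `finally` so later tests start from zero. A standalone sketch of that pattern follows; `RestoreCounter` and its methods are hypothetical stand-ins and are not the classes from the pull request.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the counting pattern; not the PR's RestoreCounterOperator.
public final class RestoreCounter {
    // Shared by all operator instances, hence static and atomic.
    public static final AtomicLong RESTORE_COUNTER = new AtomicLong();

    private RestoreCounter() {}

    // Simulates initializing one operator; counts it only if it was restored.
    public static void initializeOperator(boolean restored) {
        if (restored) {
            RESTORE_COUNTER.incrementAndGet();
        }
    }

    // Runs one scenario and returns the observed count. The counter is always
    // reset afterwards so that its value does not leak into the next scenario.
    public static long countRestored(boolean... restoredFlags) {
        try {
            for (boolean restored : restoredFlags) {
                initializeOperator(restored);
            }
            return RESTORE_COUNTER.get();
        } finally {
            RESTORE_COUNTER.set(0);
        }
    }

    public static void main(String[] args) {
        System.out.println(countRestored(true, true));  // prints 2
        System.out.println(countRestored(false, true)); // prints 1
    }
}
```

Resetting in `finally` matters because the counter is static: without it, a failing assertion in one test would leave a stale count behind for the next.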

[GitHub] flink pull request #4851: [FLINK-7623][tests] Add tests to make sure operato...

2017-10-19 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145879772
  

[GitHub] flink pull request #4851: [FLINK-7623][tests] Add tests to make sure operato...

2017-10-19 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145878649
  

[GitHub] flink pull request #4851: [FLINK-7623][tests] Add tests to make sure operato...

2017-10-19 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145878457
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.OperatorInstanceID;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests ensuring correct behaviour of {@link org.apache.flink.runtime.state.ManagedInitializationContext#isRestored}
+ * method.
+ */
+public class RestoreStreamTaskTest extends TestLogger {
+   @Test
+   public void testRestore() throws Exception {
+       AcknowledgeStreamMockEnvironment environment1 = processRecords(
+           new OperatorID(42L, 42L),
+           new CounterOperator(),
+           new OperatorID(44L, 44L),
+           new CounterOperator(),
+           Optional.empty());
+
+       assertEquals(2, environment1.getCheckpointStateHandles().getSubtaskStateMappings().size());
+
+       TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles();
+
+       try {
+           AcknowledgeStreamMockEnvironment environment2 = processRecords(
+               new OperatorID(42L, 42L),
+               new RestoreCounterOperator(),
+               new OperatorID(44L, 44L),
+               new RestoreCounterOperator(),
+               Optional.of(stateHandles));
+
+           assertEquals(2, RestoreCounterOperator.RESTORE_COUNTER.get());
+       }
+       finally {
+           RestoreCounterOperator.RESTORE_COUNTER.getAndSet(0);
+       }
+   }
+
+   @Test
+   public void testRestoreWithNewId() throws Exception {
+       AcknowledgeStreamMockEnvironment environment1 = processRecords(
+           new OperatorID(42L, 42L),
+           new CounterOperator(),
+           new OperatorID(44L, 44L),
+           new CounterOperator(),
+           Optional.empty());
+
+       assertEquals(2, environment1.getCheckpointStateHandles().getSubtaskStateMappings().size());
+
+       TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles();
+
+       try {
+
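
For readers following the quoted test: the check it exercises can be sketched in isolation. This is only a minimal sketch, assuming one possible definition (an operator counts as restored when its ID is present in the snapshot it is started from). RestoredFlagSketch, countRestored and the String IDs are hypothetical names for illustration and are not part of Flink or of this pull request.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class RestoredFlagSketch {

    /**
     * Counts how many of the given operators would see "restored == true".
     * Assumption: the snapshot is a plain map from operator ID to some state,
     * and an operator is restored iff a snapshot exists and contains its ID.
     */
    static int countRestored(Optional<Map<String, Long>> snapshot, String... operatorIds) {
        int restored = 0;
        for (String id : operatorIds) {
            boolean isRestored = snapshot.isPresent() && snapshot.get().containsKey(id);
            if (isRestored) {
                restored++;
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        // Hypothetical snapshot taken by two chained operators with IDs "42" and "44".
        Map<String, Long> snapshot = new HashMap<>();
        snapshot.put("42", 1L);
        snapshot.put("44", 1L);

        // Fresh start: no snapshot, so nothing is restored.
        System.out.println(countRestored(Optional.empty(), "42", "44"));
        // Same IDs as in the snapshot: both operators are restored.
        System.out.println(countRestored(Optional.of(snapshot), "42", "44"));
        // One operator got a new ID: under this definition only "44" is restored.
        System.out.println(countRestored(Optional.of(snapshot), "4242", "44"));
    }
}
```

Under the stated assumption the three calls print 0, 2 and 1. A different definition of "restored" would change the last case, so the sketch should not be read as the behaviour the pull request settles on.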

[GitHub] flink pull request #4851: [FLINK-7623][tests] Add tests to make sure operato...

2017-10-19 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145878181
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.OperatorInstanceID;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests ensuring correct behaviour of {@link org.apache.flink.runtime.state.ManagedInitializationContext#isRestored}
+ * method.
+ */
+public class RestoreStreamTaskTest extends TestLogger {
+   @Test
+   public void testRestore() throws Exception {
+       AcknowledgeStreamMockEnvironment environment1 = processRecords(
+           new OperatorID(42L, 42L),
+           new CounterOperator(),
+           new OperatorID(44L, 44L),
+           new CounterOperator(),
+           Optional.empty());
+
+       assertEquals(2, environment1.getCheckpointStateHandles().getSubtaskStateMappings().size());
+
+       TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles();
+
+       try {
+           AcknowledgeStreamMockEnvironment environment2 = processRecords(
+               new OperatorID(42L, 42L),
+               new RestoreCounterOperator(),
+               new OperatorID(44L, 44L),
+               new RestoreCounterOperator(),
+               Optional.of(stateHandles));
+
+           assertEquals(2, RestoreCounterOperator.RESTORE_COUNTER.get());
+       }
+       finally {
+           RestoreCounterOperator.RESTORE_COUNTER.getAndSet(0);
+       }
+   }
+
+   @Test
+   public void testRestoreWithNewId() throws Exception {
+       AcknowledgeStreamMockEnvironment environment1 = processRecords(
+           new OperatorID(42L, 42L),
+           new CounterOperator(),
+           new OperatorID(44L, 44L),
+           new CounterOperator(),
+           Optional.empty());
+
+       assertEquals(2, environment1.getCheckpointStateHandles().getSubtaskStateMappings().size());
+
+       TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles();
+
+       try {
+

[GitHub] flink pull request #4851: [FLINK-7623][tests] Add tests to make sure operato...

2017-10-19 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145877990
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.OperatorInstanceID;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests ensuring correct behaviour of {@link org.apache.flink.runtime.state.ManagedInitializationContext#isRestored}
+ * method.
+ */
+public class RestoreStreamTaskTest extends TestLogger {
+   @Test
+   public void testRestore() throws Exception {
+       AcknowledgeStreamMockEnvironment environment1 = processRecords(
+           new OperatorID(42L, 42L),
+           new CounterOperator(),
+           new OperatorID(44L, 44L),
+           new CounterOperator(),
+           Optional.empty());
+
+       assertEquals(2, environment1.getCheckpointStateHandles().getSubtaskStateMappings().size());
+
+       TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles();
+
+       try {
+           AcknowledgeStreamMockEnvironment environment2 = processRecords(
+               new OperatorID(42L, 42L),
+               new RestoreCounterOperator(),
+               new OperatorID(44L, 44L),
+               new RestoreCounterOperator(),
+               Optional.of(stateHandles));
+
+           assertEquals(2, RestoreCounterOperator.RESTORE_COUNTER.get());
+       }
+       finally {
+           RestoreCounterOperator.RESTORE_COUNTER.getAndSet(0);
+       }
+   }
+
+   @Test
+   public void testRestoreWithNewId() throws Exception {
+       AcknowledgeStreamMockEnvironment environment1 = processRecords(
+           new OperatorID(42L, 42L),
+           new CounterOperator(),
+           new OperatorID(44L, 44L),
+           new CounterOperator(),
+           Optional.empty());
+
+       assertEquals(2, environment1.getCheckpointStateHandles().getSubtaskStateMappings().size());
+
+       TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles();
+
+       try {
+

[jira] [Commented] (FLINK-4866) Make Trigger.clear() Abstract to Enforce Implementation

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-4866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212152#comment-16212152 ]

ASF GitHub Bot commented on FLINK-4866:
---

Github user vim-wj closed the pull request at:

https://github.com/apache/flink/pull/4866


> Make Trigger.clear() Abstract to Enforce Implementation
> ---
>
> Key: FLINK-4866
> URL: https://issues.apache.org/jira/browse/FLINK-4866
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Jark Wu
> Fix For: 1.2.0
>
>
> If the method is not abstract, implementors of custom triggers will not 
> realise that it could be necessary, and they will likely not clean up their 
> state/timers properly.
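
To illustrate the point of the issue description above, here is a minimal sketch of why an abstract clean-up method forces implementors to think about their timers. SketchTrigger and EndOfWindowTrigger are hypothetical stand-ins written for this note; they are not Flink's Trigger API, and the timer list is an assumption replacing the real trigger context.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical trigger base class; NOT Flink's actual Trigger. */
abstract class SketchTrigger {
    /** Called for each element; may register a timer in the given list. */
    abstract void onElement(long timestamp, List<Long> timers);

    /**
     * Abstract on purpose: a subclass that forgets clean-up no longer compiles,
     * instead of silently inheriting a no-op and leaking timers.
     */
    abstract void clear(List<Long> timers);
}

/** Registers a single timer at the end of the window and removes it in clear(). */
class EndOfWindowTrigger extends SketchTrigger {
    private final long windowEnd;

    EndOfWindowTrigger(long windowEnd) {
        this.windowEnd = windowEnd;
    }

    @Override
    void onElement(long timestamp, List<Long> timers) {
        if (!timers.contains(windowEnd)) {
            timers.add(windowEnd);
        }
    }

    @Override
    void clear(List<Long> timers) {
        // Long.valueOf selects remove(Object), not remove(int index).
        timers.remove(Long.valueOf(windowEnd));
    }
}

public class TriggerClearSketch {
    public static void main(String[] args) {
        List<Long> timers = new ArrayList<>();
        SketchTrigger trigger = new EndOfWindowTrigger(1000L);

        trigger.onElement(10L, timers);
        trigger.onElement(20L, timers);
        System.out.println("timers after elements: " + timers);

        trigger.clear(timers);
        System.out.println("timers after clear: " + timers);
    }
}
```

Deleting the clear override from EndOfWindowTrigger turns the omission into a compile error, which is the enforcement the issue title asks for.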



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4866: [FLINK-4866]SourceFunction needn't extends Seriali...

2017-10-19 Thread vim-wj
Github user vim-wj closed the pull request at:

https://github.com/apache/flink/pull/4866


---


[GitHub] flink pull request #4851: [FLINK-7623][tests] Add tests to make sure operato...

2017-10-19 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145877847
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/RestoreStreamTaskTest.java ---
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.OperatorInstanceID;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests ensuring correct behaviour of {@link org.apache.flink.runtime.state.ManagedInitializationContext#isRestored}
+ * method.
+ */
+public class RestoreStreamTaskTest extends TestLogger {
+   @Test
+   public void testRestore() throws Exception {
+       AcknowledgeStreamMockEnvironment environment1 = processRecords(
+           new OperatorID(42L, 42L),
+           new CounterOperator(),
+           new OperatorID(44L, 44L),
+           new CounterOperator(),
+           Optional.empty());
+
+       assertEquals(2, environment1.getCheckpointStateHandles().getSubtaskStateMappings().size());
+
+       TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles();
+
+       try {
+           AcknowledgeStreamMockEnvironment environment2 = processRecords(
+               new OperatorID(42L, 42L),
+               new RestoreCounterOperator(),
+               new OperatorID(44L, 44L),
+               new RestoreCounterOperator(),
+               Optional.of(stateHandles));
+
+           assertEquals(2, RestoreCounterOperator.RESTORE_COUNTER.get());
+       }
+       finally {
+           RestoreCounterOperator.RESTORE_COUNTER.getAndSet(0);
+       }
+   }
+
+   @Test
+   public void testRestoreWithNewId() throws Exception {
+       AcknowledgeStreamMockEnvironment environment1 = processRecords(
+           new OperatorID(42L, 42L),
+           new CounterOperator(),
+           new OperatorID(44L, 44L),
+           new CounterOperator(),
+           Optional.empty());
+
+       assertEquals(2, environment1.getCheckpointStateHandles().getSubtaskStateMappings().size());
+
+       TaskStateSnapshot stateHandles = environment1.getCheckpointStateHandles();
+
+       try {
+

[GitHub] flink pull request #4851: [FLINK-7623][tests] Add tests to make sure operato...

2017-10-19 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145877337
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AcknowledgeStreamMockEnvironment.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+
+/**
+ * Stream environment that allows to wait for checkpoint acknowledgement.
+ */
+class AcknowledgeStreamMockEnvironment extends StreamMockEnvironment {
--- End diff --

I did a similar refactoring in one of my pending PRs, but that is fine because 
that PR will probably not make it into 1.4. I would still suggest searching for 
subclasses of `StreamMockEnvironment`: there are more cases (some of them 
anonymous classes) that could be replaced by a proper dummy like this one.
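
To illustrate the suggestion, here is a minimal sketch of a named, reusable test dummy that replaces ad-hoc anonymous subclasses. It is self-contained and does not use Flink classes: `MockEnvironmentBase`, `AcknowledgingEnvironment`, and their methods are hypothetical stand-ins, not the actual `StreamMockEnvironment` API or the class added in this PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class NamedTestDummy {

    /** Hypothetical stand-in for a mock environment base class. */
    static class MockEnvironmentBase {
        void acknowledgeCheckpoint(long checkpointId) {
            // The base mock ignores acknowledgements.
        }
    }

    /** Named subclass that lets a test wait until a checkpoint was acknowledged. */
    static class AcknowledgingEnvironment extends MockEnvironmentBase {
        private final CountDownLatch acknowledged = new CountDownLatch(1);
        private volatile long checkpointId = -1L;

        @Override
        void acknowledgeCheckpoint(long checkpointId) {
            this.checkpointId = checkpointId;
            acknowledged.countDown();
        }

        /** Returns true if an acknowledgement arrived within the timeout. */
        boolean awaitAcknowledgement(long timeoutMillis) throws InterruptedException {
            return acknowledged.await(timeoutMillis, TimeUnit.MILLISECONDS);
        }

        long getCheckpointId() {
            return checkpointId;
        }
    }

    public static void main(String[] args) throws Exception {
        AcknowledgingEnvironment env = new AcknowledgingEnvironment();
        // Simulate the task acknowledging checkpoint 7 from another thread.
        Thread task = new Thread(() -> env.acknowledgeCheckpoint(7L));
        task.start();
        System.out.println(env.awaitAcknowledgement(5_000L));
        System.out.println(env.getCheckpointId());
        task.join();
    }
}
```

Because the dummy has a name, every test that needs to wait for an acknowledgement can share it instead of redefining the same override inline.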


---


[jira] [Commented] (FLINK-7623) Detecting whether an operator is restored doesn't work with chained state

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212145#comment-16212145
 ] 

ASF GitHub Bot commented on FLINK-7623:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4851#discussion_r145877337
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/AcknowledgeStreamMockEnvironment.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+
+/**
+ * Stream environment that allows to wait for checkpoint acknowledgement.
+ */
+class AcknowledgeStreamMockEnvironment extends StreamMockEnvironment {
--- End diff --

I did a similar refactoring in one of my pending PRs, but that is fine because 
that PR will probably not make it into 1.4. I would still suggest searching for 
subclasses of `StreamMockEnvironment`: there are more cases (some of them 
anonymous classes) that could be replaced by a proper dummy like this one.


> Detecting whether an operator is restored doesn't work with chained state
> -
>
> Key: FLINK-7623
> URL: https://issues.apache.org/jira/browse/FLINK-7623
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
> Attachments: StreamingJob.java
>
>
> Originally reported on the ML: 
> https://lists.apache.org/thread.html/22a2cf83de3107aa81a03a921325a191c29df8aa8676798fcd497199@%3Cuser.flink.apache.org%3E
> If we have a chain of operators where multiple of them have operator state, 
> detection of the {{context.isRestored()}} flag (of {{CheckpointedFunction}}) 
> does not work correctly. It's best exemplified using this minimal example 
> where both the source and the flatMap have state:
> {code}
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env
>   .addSource(new MaSource()).uid("source-1")
>   .flatMap(new MaFlatMap()).uid("flatMap-1");
> env.execute("testing");
> {code}
> If I do a savepoint with these UIDs, then change "source-1" to "source-2" and 
> restore from the savepoint {{context.isRestored()}} still reports {{true}} 
> for the source.
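
The behaviour the report expects can be sketched in plain Java, independent of Flink. This is only an illustration under the assumption that restored state is looked up by operator UID; `RestoreCheck`, `isRestored`, and the string-keyed map are hypothetical names, not Flink API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model: an operator counts as restored only if the snapshot holds state under its own UID. */
final class RestoreCheck {

    /** Returns true only when the snapshot contains an entry for exactly this operator UID. */
    static boolean isRestored(Map<String, byte[]> snapshot, String operatorUid) {
        return snapshot.containsKey(operatorUid);
    }

    public static void main(String[] args) {
        // Savepoint taken while the chain used the UIDs "source-1" and "flatMap-1".
        Map<String, byte[]> savepoint = new HashMap<>();
        savepoint.put("source-1", new byte[] {1});
        savepoint.put("flatMap-1", new byte[] {2});

        // The flatMap kept its UID, so it is restored.
        System.out.println(isRestored(savepoint, "flatMap-1")); // true
        // The source was renamed to "source-2", so it should not be reported as restored.
        System.out.println(isRestored(savepoint, "source-2")); // false
    }
}
```

The point of the sketch is that the decision is made per operator, not per chain: a chain containing one operator with matching state must not mark its renamed neighbour as restored.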



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()

2017-10-19 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-5372.
-
Resolution: Fixed

Merged in dbf4c86

> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --
>
> Key: FLINK-5372
> URL: https://issues.apache.org/jira/browse/FLINK-5372
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Stefan Richter
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> The test is currently {{@Ignored}}. We have to change 
> {{AsyncCheckpointOperator}} to make sure that we can run fully 
> asynchronously. Then, the test will still fail because the canceling 
> behaviour was changed in the meantime.
> {code}
> public static class AsyncCheckpointOperator
>         extends AbstractStreamOperator
>         implements OneInputStreamOperator {
>     @Override
>     public void open() throws Exception {
>         super.open();
>         // also get the state in open, this way we are sure that it was created before
>         // we trigger the test checkpoint
>         ValueState state = getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello"));
>     }
>     @Override
>     public void processElement(StreamRecord element) throws Exception {
>         // we also don't care
>         ValueState state = getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello"));
>         state.update(element.getValue());
>     }
>     @Override
>     public void snapshotState(StateSnapshotContext context) throws Exception {
>         // do nothing so that we don't block
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4855: [FLINK-5372] [tests] Fix RocksDBAsyncSnapshotTest....

2017-10-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4855


---


[jira] [Commented] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212141#comment-16212141
 ] 

ASF GitHub Bot commented on FLINK-5372:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4855


> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --
>
> Key: FLINK-5372
> URL: https://issues.apache.org/jira/browse/FLINK-5372
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Stefan Richter
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> The test is currently {{@Ignored}}. We have to change 
> {{AsyncCheckpointOperator}} to make sure that we can run fully 
> asynchronously. Then, the test will still fail because the canceling 
> behaviour was changed in the meantime.
> {code}
> public static class AsyncCheckpointOperator
>         extends AbstractStreamOperator
>         implements OneInputStreamOperator {
>     @Override
>     public void open() throws Exception {
>         super.open();
>         // also get the state in open, this way we are sure that it was created before
>         // we trigger the test checkpoint
>         ValueState state = getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello"));
>     }
>     @Override
>     public void processElement(StreamRecord element) throws Exception {
>         // we also don't care
>         ValueState state = getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello"));
>         state.update(element.getValue());
>     }
>     @Override
>     public void snapshotState(StateSnapshotContext context) throws Exception {
>         // do nothing so that we don't block
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212136#comment-16212136
 ] 

ASF GitHub Bot commented on FLINK-5372:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4855
  
Thanks for the review! Will merge this.


> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --
>
> Key: FLINK-5372
> URL: https://issues.apache.org/jira/browse/FLINK-5372
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Stefan Richter
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.4.0
>
>
> The test is currently {{@Ignored}}. We have to change 
> {{AsyncCheckpointOperator}} to make sure that we can run fully 
> asynchronously. Then, the test will still fail because the canceling 
> behaviour was changed in the meantime.
> {code}
> public static class AsyncCheckpointOperator
>         extends AbstractStreamOperator
>         implements OneInputStreamOperator {
>     @Override
>     public void open() throws Exception {
>         super.open();
>         // also get the state in open, this way we are sure that it was created before
>         // we trigger the test checkpoint
>         ValueState state = getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello"));
>     }
>     @Override
>     public void processElement(StreamRecord element) throws Exception {
>         // we also don't care
>         ValueState state = getPartitionedState(
>                 VoidNamespace.INSTANCE,
>                 VoidNamespaceSerializer.INSTANCE,
>                 new ValueStateDescriptor<>("count", StringSerializer.INSTANCE, "hello"));
>         state.update(element.getValue());
>     }
>     @Override
>     public void snapshotState(StateSnapshotContext context) throws Exception {
>         // do nothing so that we don't block
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4855: [FLINK-5372] [tests] Fix RocksDBAsyncSnapshotTest.testCan...

2017-10-19 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4855
  
Thanks for the review! Will merge this.


---


[GitHub] flink pull request #4866: Function extends java.io.Serializable, so SourceFu...

2017-10-19 Thread vim-wj
GitHub user vim-wj reopened a pull request:

https://github.com/apache/flink/pull/4866

Function extends java.io.Serializable, so SourceFunction need not extend 
Serializable again

Function extends java.io.Serializable, so SourceFunction need not extend 
Serializable again

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vim-wj/flink flink-ex

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4866.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4866


commit 4eb7a9164a369cca2fe83d1235e501b07a952a88
Author: vim-wj <381025...@qq.com>
Date:   2017-10-20T03:00:07Z

SourceFunction




---


[GitHub] flink pull request #4866: SourceFunction

2017-10-19 Thread vim-wj
Github user vim-wj closed the pull request at:

https://github.com/apache/flink/pull/4866


---


[GitHub] flink issue #4866: SourceFunction

2017-10-19 Thread vim-wj
Github user vim-wj commented on the issue:

https://github.com/apache/flink/pull/4866
  
Function extends java.io.Serializable, so SourceFunction need not extend 
Serializable again
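
The claim follows from how interface inheritance works in Java: a sub-interface inherits every super-interface of its parent. A minimal sketch, using hypothetical stand-in interfaces (`BaseFunction` and `Source`) rather than Flink's own `Function` and `SourceFunction`:

```java
import java.io.Serializable;

final class SerializableInheritance {

    /** Stand-in for a base interface that already extends Serializable. */
    interface BaseFunction extends Serializable {}

    /** Stand-in for a sub-interface: it does not mention Serializable itself. */
    interface Source extends BaseFunction {}

    /** Returns true if every instance of the given type is also Serializable. */
    static boolean isSerializableType(Class<?> type) {
        return Serializable.class.isAssignableFrom(type);
    }

    public static void main(String[] args) {
        // Serializable is inherited transitively through BaseFunction.
        System.out.println(isSerializableType(Source.class)); // true
    }
}
```

Declaring `Serializable` again on the sub-interface is therefore redundant for the compiler, although a project may still keep it for readability.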


---


[GitHub] flink pull request #4866: SourceFunction

2017-10-19 Thread vim-wj
GitHub user vim-wj opened a pull request:

https://github.com/apache/flink/pull/4866

SourceFunction


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vim-wj/flink flink-ex

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4866.patch

To close this pull request, make a commit to your master/trunk branch

[GitHub] flink pull request #4865: [hotfix] Fix typos that double "the" word

2017-10-19 Thread yew1eb
GitHub user yew1eb opened a pull request:

https://github.com/apache/flink/pull/4865

 [hotfix] Fix typos that double "the" word



Remove the doubled word "the" in documents and comments.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yew1eb/flink typos

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4865.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4865


commit 6975f8623e3d9c52bab7a36cd6bdad8704a1daf2
Author: yew1eb 
Date:   2017-10-20T02:45:11Z

Remove double the word




---


[jira] [Updated] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-10-19 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7878:

Labels: flip-6  (was: )

> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Currently, Flink only lets users define how much CPU and memory an operator 
> uses, but the resources in a cluster are varied. For example, an 
> image-processing application may need a GPU, and others may need an FPGA. 
> CPU and memory alone are not enough, and the number of resource types keeps 
> growing, so we need to make ResourceSpec extensible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-10-19 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu reassigned FLINK-7878:
---

Assignee: shuai.xu

> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>
> Currently, Flink only lets users define how much CPU and memory an operator 
> uses, but the resources in a cluster are varied. For example, an 
> image-processing application may need a GPU, and others may need an FPGA. 
> CPU and memory alone are not enough, and the number of resource types keeps 
> growing, so we need to make ResourceSpec extensible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-10-19 Thread shuai.xu (JIRA)
shuai.xu created FLINK-7878:
---

 Summary: Extend the resource type user can define in ResourceSpec
 Key: FLINK-7878
 URL: https://issues.apache.org/jira/browse/FLINK-7878
 Project: Flink
  Issue Type: Bug
  Components: DataSet API, DataStream API
Reporter: shuai.xu


Currently, Flink only lets users define how much CPU and memory an operator 
uses, but the resources in a cluster are varied. For example, an 
image-processing application may need a GPU, and others may need an FPGA. 
CPU and memory alone are not enough, and the number of resource types keeps 
growing, so we need to make ResourceSpec extensible.
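
One possible shape for such an extension, shown only as a sketch: keep the fixed CPU and memory fields and add an open-ended map of named resources. `ExtensibleResourceSpec`, its fields, and the resource names are hypothetical and do not reflect Flink's actual `ResourceSpec` class.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a resource specification with user-defined resource types. */
final class ExtensibleResourceSpec {
    private final double cpuCores;
    private final int heapMemoryMb;
    private final Map<String, Double> extendedResources;

    ExtensibleResourceSpec(double cpuCores, int heapMemoryMb, Map<String, Double> extendedResources) {
        this.cpuCores = cpuCores;
        this.heapMemoryMb = heapMemoryMb;
        // Defensive copy so that later changes to the caller's map do not leak in.
        this.extendedResources = Collections.unmodifiableMap(new HashMap<>(extendedResources));
    }

    double getCpuCores() {
        return cpuCores;
    }

    int getHeapMemoryMb() {
        return heapMemoryMb;
    }

    /** Returns the requested amount of a named resource, or 0.0 if it was not requested. */
    double getExtendedResource(String name) {
        return extendedResources.getOrDefault(name, 0.0);
    }

    public static void main(String[] args) {
        Map<String, Double> extras = new HashMap<>();
        extras.put("GPU", 1.0);
        ExtensibleResourceSpec spec = new ExtensibleResourceSpec(2.0, 1024, extras);
        System.out.println(spec.getExtendedResource("GPU"));
        System.out.println(spec.getExtendedResource("FPGA"));
    }
}
```

A map keyed by resource name means new resource types need no change to the class itself, at the cost of losing compile-time checks on the names.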



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7669) org.apache.flink.api.common.ExecutionConfig cannot be cast to org.apache.flink.api.common.ExecutionConfig

2017-10-19 Thread Keren Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16212025#comment-16212025
 ] 

Keren Zhu commented on FLINK-7669:
--

I encountered exactly the same problem a few days ago. I later found that if 
I exclude both flink-core and flink-shaded-curator-recipes from the packaged 
build (so that the modules therein now depend on the Flink runtime rather than 
on the fat jar), everything works fine. My guess is that a different version of 
Curator leads to a different memory layout of ExecutionConfig and triggers a 
deserialization error when the JobManager tries to load it.

> org.apache.flink.api.common.ExecutionConfig cannot be cast to 
> org.apache.flink.api.common.ExecutionConfig
> -
>
> Key: FLINK-7669
> URL: https://issues.apache.org/jira/browse/FLINK-7669
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.4.0
> Environment: - OS: macOS Sierra 
> - Oracle JDK 1.8
> - Scala 2.11.11
> - sbt 0.13.16
> - Build from trunk code at commit hash 
> {{42cc3a2a9c41dda7cf338db36b45131db9150674}}
> -- started a local flink node 
>Reporter: Raymond Tay
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The latest code pulled from trunk threw errors at runtime when I ran a job 
> against it, but when I ran the JAR against the stable version {{1.3.2}} it 
> was OK. Here is the stack trace. 
> An exception is being thrown:
> {noformat}
> Cluster configuration: Standalone cluster with JobManager at 
> localhost/127.0.0.1:6123
> Using address localhost:6123 to connect to JobManager.
> JobManager web interface address http://localhost:8081
> Starting execution of program
> Submitting job with JobID: 05dd8e60c6fda3b96fc22ef6cf389a23. Waiting for job 
> completion.
> Connected to JobManager at 
> Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-234825544] with leader 
> session id ----.
> 
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Failed to submit job 05dd8e60c6fda3b96fc22ef6cf389a23 
> (Flink Streaming Job)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:479)
>   at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:443)
>   at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1501)
>   at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:629)
>   at 
> org.example.streams.split.SimpleSplitStreams$.main(04splitstreams.scala:53)
>   at 
> org.example.streams.split.SimpleSplitStreams.main(04splitstreams.scala)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:383)
>   at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:840)
>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:285)
>   at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1088)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1135)
>   at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1132)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:44)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1132)
> Caused by: 

[jira] [Updated] (FLINK-7865) Remove predicate restrictions on TableFunction left outer join

2017-10-19 Thread Xingcan Cui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui updated FLINK-7865:
---
Description: 
To cover up the improper translation of lateral table left outer join 
(CALCITE-2004), we have temporarily forbidden the predicates (except {{true}} 
literal) in Table API (FLINK-7853) and SQL (FLINK-7854). Once the issue has 
been fixed in Calcite, we should remove the restrictions. The tasks may include 
removing Table API/SQL condition check, removing validation tests, enabling 
integration tests, updating the documents, etc.

See [this thread on Calcite dev 
list|https://lists.apache.org/thread.html/16caeb8b1649c4da85f9915ea723c6c5b3ced0b96914cadc24ee4e15@%3Cdev.calcite.apache.org%3E]
 for more information.

  was:
To cover up the improper translation of lateral table left outer join 
(CALCITE-2004), we have temporarily forbidden the predicates (except {{true}} 
literal) in Table API (FLINK-7853) and SQL (FLINK-7854). Once the issue has 
been fixed in Calcite, we should remove the restrictions. The tasks may include 
removing Table API/SQL condition check, remove validation tests, enable 
integration tests, update the documents, etc.

See [this thread on Calcite dev 
list|https://lists.apache.org/thread.html/16caeb8b1649c4da85f9915ea723c6c5b3ced0b96914cadc24ee4e15@%3Cdev.calcite.apache.org%3E]
 for more information.


> Remove predicate restrictions on TableFunction left outer join
> --
>
> Key: FLINK-7865
> URL: https://issues.apache.org/jira/browse/FLINK-7865
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Xingcan Cui
>
> To cover up the improper translation of lateral table left outer join 
> (CALCITE-2004), we have temporarily forbidden the predicates (except {{true}} 
> literal) in Table API (FLINK-7853) and SQL (FLINK-7854). Once the issue has 
> been fixed in Calcite, we should remove the restrictions. The tasks may 
> include removing Table API/SQL condition check, removing validation tests, 
> enabling integration tests, updating the documents, etc.
> See [this thread on Calcite dev 
> list|https://lists.apache.org/thread.html/16caeb8b1649c4da85f9915ea723c6c5b3ced0b96914cadc24ee4e15@%3Cdev.calcite.apache.org%3E]
>  for more information.
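
For reference, the semantics that a left outer join against a table function with a predicate is expected to have can be sketched in plain Java. This is an illustration only, not Flink or Calcite code; `LateralLeftOuterJoin` and `join` are hypothetical names, and rows are rendered as strings for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.Function;

final class LateralLeftOuterJoin {

    /**
     * Joins each left row with the rows the table function produces for it.
     * Left rows without any right row passing the predicate are kept, padded with null.
     */
    static <L, R> List<String> join(
            List<L> left, Function<L, List<R>> tableFunction, BiPredicate<L, R> predicate) {
        List<String> out = new ArrayList<>();
        for (L l : left) {
            boolean matched = false;
            for (R r : tableFunction.apply(l)) {
                if (predicate.test(l, r)) {
                    out.add(l + "," + r);
                    matched = true;
                }
            }
            if (!matched) {
                // Outer-join padding: the left row survives even though nothing matched.
                out.add(l + ",null");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Function<String, List<String>> split = s -> Arrays.asList(s.split(" "));
        BiPredicate<String, String> notZ = (l, r) -> !r.equals("z");
        for (String row : join(Arrays.asList("x y", "z"), split, notZ)) {
            System.out.println(row);
        }
    }
}
```

The predicate is evaluated as part of the join, so a left row whose generated rows are all filtered out still appears once with a null right side rather than disappearing from the result.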



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4794: [build][minor] Add missing licenses

2017-10-19 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4794
  
We already have a script, it is the RAT plugin: 
https://github.com/apache/flink/blob/master/pom.xml#L957

You only need to make sure that these files are not excluded from the 
check...


---


[jira] [Commented] (FLINK-7737) On HCFS systems, FSDataOutputStream does not issue hsync only hflush which leads to data loss

2017-10-19 Thread Ryan Hobbs (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211807#comment-16211807
 ] 

Ryan Hobbs commented on FLINK-7737:
---

I believe the SYNC_BLOCK flag is sufficient.  My understanding (I could be 
incorrect) is that if the `SYNC_BLOCK` flag is passed in during create, then 
_hflush()_ will also perform _hsync()_.  
I believe https://github.com/apache/flink/pull/4781 may be an alternate 
solution to #2 cited above by Stephen.  We are looking into that now.

> On HCFS systems, FSDataOutputStream does not issue hsync only hflush which 
> leads to data loss
> -
>
> Key: FLINK-7737
> URL: https://issues.apache.org/jira/browse/FLINK-7737
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.3.2
> Environment: Dev
>Reporter: Ryan Hobbs
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>
> During several tests where we simulated failure conditions, we have observed 
> that on HCFS systems where the data stream is of type FSDataOutputStream, 
> Flink will issue hflush() and not hsync() which results in data loss.
> In the class *StreamWriterBase.java* the code below will execute hsync if the 
> output stream is of type *HdfsDataOutputStream* but not for streams of type 
> *FSDataOutputStream*.  Is this by design?
> {code}
> protected void hflushOrSync(FSDataOutputStream os) throws IOException {
>     try {
>         // At this point the refHflushOrSync cannot be null,
>         // since register method would have thrown if it was.
>         this.refHflushOrSync.invoke(os);
>         if (os instanceof HdfsDataOutputStream) {
>             ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>         }
>     } catch (InvocationTargetException e) {
>         String msg = "Error while trying to hflushOrSync!";
>         LOG.error(msg + " " + e.getCause());
>         Throwable cause = e.getCause();
>         if (cause != null && cause instanceof IOException) {
>             throw (IOException) cause;
>         }
>         throw new RuntimeException(msg, e);
>     } catch (Exception e) {
>         String msg = "Error while trying to hflushOrSync!";
>         LOG.error(msg + " " + e);
>         throw new RuntimeException(msg, e);
>     }
> }
> {code}
> Could a potential fix me to perform a sync even on streams of type 
> *FSDataOutputStream*?
> {code}
>  if (os instanceof HdfsDataOutputStream) {
> ((HdfsDataOutputStream) 
> os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
> } else if (os instanceof FSDataOutputStream) {
> os.hsync();
> }
> {code}
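
The report hinges on the difference between flushing and syncing: a flush only hands buffered bytes on to the next layer, while a sync asks that layer to persist them. The sketch below is a plain-JDK analogy and not Hadoop code. It assumes `OutputStream.flush()` as a stand-in for `hflush()` and `FileChannel.force(true)` as a stand-in for `hsync()`; the class name `FlushVsSync` and the method `write` are made up for illustration.

```java
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class FlushVsSync {

    /** Writes data and, if requested, forces it to the storage device before returning. */
    static void write(Path file, byte[] data, boolean sync) throws IOException {
        try (FileOutputStream fos = new FileOutputStream(file.toFile());
             OutputStream out = new BufferedOutputStream(fos)) {
            out.write(data);
            // flush(): pushes the buffered bytes out of this process (analogy for hflush()).
            out.flush();
            if (sync) {
                // force(true): asks the OS to persist data and metadata (analogy for hsync()).
                fos.getChannel().force(true);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("flush-vs-sync", ".txt");
        write(file, "record-1\n".getBytes(StandardCharsets.UTF_8), true);
        System.out.println(new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim());
        Files.delete(file);
    }
}
```

Without the `force` call the data is readable by other processes but may still be lost on a machine crash, which mirrors the failure mode described in this issue.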



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7621) Fix Inconsistency of CaseSensitive Configuration

2017-10-19 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-7621:
-
Issue Type: Improvement  (was: Bug)

> Fix Inconsistency of CaseSensitive Configuration
> 
>
> Key: FLINK-7621
> URL: https://issues.apache.org/jira/browse/FLINK-7621
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> The default case-sensitivity configuration of Calcite is {{LEX.java}}, which is 
> different from that of the Table API.





[jira] [Updated] (FLINK-7618) Add BINARY supported in FlinkTypeFactory

2017-10-19 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-7618:
-
Issue Type: New Feature  (was: Bug)

> Add BINARY supported in FlinkTypeFactory
> 
>
> Key: FLINK-7618
> URL: https://issues.apache.org/jira/browse/FLINK-7618
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>
> We will get the following exception when we deal with the BINARY type.
> {code}
> org.apache.flink.table.api.TableException: Type is not supported: BINARY
>   at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
>   at org.apache.flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:377)
>   at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:741)
>   at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:67)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:104)
>   at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:754)
>   at org.apache.flink.table.codegen.CodeGenerator$$anonfun$16.apply(CodeGenerator.scala:744)
> {code}





[jira] [Closed] (FLINK-7730) TableFunction LEFT OUTER joins with ON predicates are broken

2017-10-19 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-7730.

   Resolution: Fixed
Fix Version/s: 1.3.0
   1.4.0

Fixed by rejecting queries with predicates in left outer joins against a table 
function (see sub-issues)

> TableFunction LEFT OUTER joins with ON predicates are broken
> 
>
> Key: FLINK-7730
> URL: https://issues.apache.org/jira/browse/FLINK-7730
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>Priority: Critical
> Fix For: 1.4.0, 1.3.0
>
>
> TableFunction left outer joins with predicates in the ON clause are broken. 
> Apparently, there are no tests for this and it has never worked. I observed 
> issues on several layers:
> - Table Function does not correctly validate equality predicates: 
> {{leftOuterJoin(func1('c) as 'd,  'a.cast(Types.STRING) === 'd)}} is rejected 
> because the predicate is not considered an equality predicate (the cast 
> needs to be pushed down).
> - Plans cannot be correctly translated: {{leftOuterJoin(func1('c) as 'd,  'c 
> === 'd)}} gives an optimizer exception.
> - SQL queries get translated but produce incorrect results. For example, 
> {{SELECT a, b, c, d FROM MyTable LEFT OUTER JOIN LATERAL TABLE(tfunc(c)) AS 
> T(d) ON d = c}} returns an empty result if the condition {{d = c}} never 
> returns true. However, the outer side should be preserved and padded with 
> nulls.
> So there seem to be many issues with table function outer joins. In particular, 
> the wrong results produced by SQL queries need to be fixed quickly.
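
The expected semantics of a left outer join against a table function can be sketched in plain Java. This is an illustration of the SQL semantics only, not Flink or Calcite code; the class `LateralLeftOuterJoin`, the method `join`, and the `split` function are hypothetical names chosen for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.Function;

public class LateralLeftOuterJoin {

    /** Joins each left row with the rows produced by tableFunction, keeping unmatched left rows. */
    static List<String[]> join(
            List<String> left,
            Function<String, List<String>> tableFunction,
            BiPredicate<String, String> onPredicate) {
        List<String[]> result = new ArrayList<>();
        for (String c : left) {
            boolean matched = false;
            for (String d : tableFunction.apply(c)) {
                if (onPredicate.test(c, d)) {
                    result.add(new String[] {c, d});
                    matched = true;
                }
            }
            if (!matched) {
                // The outer side is preserved and padded with null.
                result.add(new String[] {c, null});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical table function: splits a string on '#'.
        Function<String, List<String>> split = c -> Arrays.asList(c.split("#"));
        // "ON d = c" never holds for this input, so every left row is padded with null.
        for (String[] row : join(Arrays.asList("a#b", "c#d"), split, (c, d) -> d.equals(c))) {
            System.out.println(row[0] + ", " + row[1]);
        }
    }
}
```

An implementation that returns an empty result for this input behaves like an inner join, which is the incorrect SQL behaviour reported above.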





[jira] [Comment Edited] (FLINK-7737) On HCFS systems, FSDataOutputStream does not issue hsync only hflush which leads to data loss

2017-10-19 Thread Piotr Nowojski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211609#comment-16211609
 ] 

Piotr Nowojski edited comment on FLINK-7737 at 10/19/17 7:38 PM:
-

I have started looking into this. Are you sure that setting {{SYNC_BLOCK}} is 
good enough? According to what I was able to find, it only requests a sync of the 
data on the last packet of a given block and seems to have nothing to do with 
{{hflush()}}. Unless I'm missing that we start a new data block on 
each flush?

Regardless of that, I think that we cannot always {{hsync()}} the data, for 
performance reasons (different semantics on different systems), so I would 
lean toward option 4 proposed by [~StephanEwen]. However, the abstraction 
with {{FileSystemFactory}} is a little bit confusing to me, because it 
allows plugging in a {{FileSystem}} defined in the {{flink-core}} module, but 
{{BucketingSink}} and {{StreamWriterBase}} use the {{FileSystem}} and 
{{FSDataOutputStream}} defined in {{hadoop-commons}}. 
[~ryanehobbs] am I missing something, or is this PR 
https://github.com/apache/flink/pull/4781 kind of irrelevant to this issue?


was (Author: pnowojski):
I have started looking into this. Are you sure that setting {{SYNC_BLOCK}} is 
good enough? According to what I was able to find, it only tells to sync the 
data on the last packet for given block and seems to have nothing to do with 
{{hflush()}}. Unless I'm missing the point that we start a new data block per 
each flush?

Regardless of that, I think that we cannot always {{hsync()}} the data before 
of performance reasons (different semantics on different systems), so I would 
lean toward the 4. option proposed by [~StephanEwen]. However the abstraction 
with {{FileSystemFactory}} is a little bit confusing to me, because it 
works/allows to plugin {{FileSystem}} defined in {{flink-core}} module, but 
{{BucketingSink}} and {{StreamWriterBase}} are using {{FileSystem}} and 
{{FSDataOutputStream}} defined in {{hadoop-commons}}, which confuses me... 
[~ryanehobbs] am I missing something, or is this PR 
https://github.com/apache/flink/pull/4781 kind of irrelevant to this issue?



[jira] [Commented] (FLINK-7737) On HCFS systems, FSDataOutputStream does not issue hsync only hflush which leads to data loss

2017-10-19 Thread Piotr Nowojski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211609#comment-16211609
 ] 

Piotr Nowojski commented on FLINK-7737:
---

I have started looking into this. Are you sure that setting {{SYNC_BLOCK}} is 
good enough? According to what I was able to find, it only requests a sync of the 
data on the last packet of a given block and seems to have nothing to do with 
{{hflush()}}. Unless I'm missing that we start a new data block on 
each flush?

Regardless of that, I think that we cannot always {{hsync()}} the data, for 
performance reasons (different semantics on different systems), so I would 
lean toward option 4 proposed by [~StephanEwen]. However, the abstraction 
with {{FileSystemFactory}} is a little bit confusing to me, because it 
allows plugging in a {{FileSystem}} defined in the {{flink-core}} module, but 
{{BucketingSink}} and {{StreamWriterBase}} use the {{FileSystem}} and 
{{FSDataOutputStream}} defined in {{hadoop-commons}}. 
[~ryanehobbs] am I missing something, or is this PR 
https://github.com/apache/flink/pull/4781 kind of irrelevant to this issue?



[jira] [Updated] (FLINK-7865) Remove predicate restrictions on TableFunction left outer join

2017-10-19 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-7865:
-
Issue Type: New Feature  (was: Sub-task)
Parent: (was: FLINK-7730)

> Remove predicate restrictions on TableFunction left outer join
> --
>
> Key: FLINK-7865
> URL: https://issues.apache.org/jira/browse/FLINK-7865
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Xingcan Cui
>
> To cover up the improper translation of lateral table left outer joins 
> (CALCITE-2004), we have temporarily forbidden predicates (except the {{true}} 
> literal) in the Table API (FLINK-7853) and SQL (FLINK-7854). Once the issue has 
> been fixed in Calcite, we should remove the restrictions. The tasks may 
> include removing the Table API/SQL condition checks, removing the validation tests, 
> enabling the integration tests, updating the documentation, etc.
> See [this thread on Calcite dev 
> list|https://lists.apache.org/thread.html/16caeb8b1649c4da85f9915ea723c6c5b3ced0b96914cadc24ee4e15@%3Cdev.calcite.apache.org%3E]
>  for more information.





[jira] [Commented] (FLINK-5498) Add support for left/right outer joins with non-equality predicates (and 1+ equality predicates)

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211592#comment-16211592
 ] 

ASF GitHub Bot commented on FLINK-5498:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3379
  
Hi @lincoln-lil, I fixed FLINK-5498 with PR #4858 as a by-product of fixing 
FLINK-7755.
The implementation of #4858 is memory safe (based on the proposal we 
discussed in FLINK-5498).

Could you please close this PR? 
I'd also appreciate a review for #4858 if you have some time.

Thank you, Fabian


> Add support for left/right outer joins with non-equality predicates (and 1+ 
> equality predicates)
> 
>
> Key: FLINK-5498
> URL: https://issues.apache.org/jira/browse/FLINK-5498
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.3.0
>Reporter: lincoln.lee
>Assignee: Fabian Hueske
>Priority: Minor
>
> I found the expected result of a unit test case to be incorrect compared to that of 
> an RDBMS, 
> see 
> flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
> {code:title=JoinITCase.scala}
> def testRightJoinWithNotOnlyEquiJoin(): Unit = {
>  ...
>  val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
>  val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
>  val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
>  
>  val expected = "Hello world,BCD\n"
>  val results = joinT.toDataSet[Row].collect()
>  TestBaseUtils.compareResultAsText(results.asJava, expected)
> }
> {code}
> Then I took some time to learn about the ‘outer join’ in relational 
> databases. The correct result of the above case should be (tested in SQL Server and 
> MySQL, the results are the same):
> {code}
> > select c, g from tuple3 right outer join tuple5 on a=f and b<h;
> c            g
> NULL         Hallo
> NULL         Hallo Welt
> NULL         Hallo Welt wie
> NULL         Hallo Welt wie gehts?
> NULL         ABC
> Hello world  BCD
> NULL         CDE
> NULL         DEF
> NULL         EFG
> NULL         FGH
> NULL         GHI
> NULL         HIJ
> NULL         IJK
> NULL         JKL
> NULL         KLM
> {code}
> the join condition {{rightOuterJoin('a === 'd && 'b < 'h)}} is not equivalent 
> to {{rightOuterJoin('a === 'd).where('b < 'h)}}.  
> The problem is rooted in the code-generated {{JoinFunction}} (see 
> {{DataSetJoin.translateToPlan()}}, line 188). If the join condition does not 
> match, we must emit the outer row padded with nulls instead of returning from 
> the function without emitting anything.
> The code-generated {{JoinFunction}} does also include equality predicates. 
> These should be removed before generating the code, e.g., in 
> {{DataSetJoinRule}} when generating the {{DataSetJoin}} with help of 
> {{JoinInfo.getRemaining()}}.
> More details: https://goo.gl/ngekca
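
The difference between a predicate in the join condition and the same predicate applied afterwards can be sketched in plain Java. This is an illustration of the semantics described above, not the code-generated {{JoinFunction}}; the classes `Left` and `Right`, the two methods, and the sample rows are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OuterJoinPredicate {

    static final class Left {
        final int a; final int b; final String c;
        Left(int a, int b, String c) { this.a = a; this.b = b; this.c = c; }
    }

    static final class Right {
        final int d; final int h; final String g;
        Right(int d, int h, String g) { this.d = d; this.h = h; this.g = g; }
    }

    /** Models rightOuterJoin('a === 'd && 'b < 'h).select('c, 'g): unmatched right rows get a null 'c. */
    static List<String> joinWithFullCondition(List<Left> left, List<Right> right) {
        List<String> out = new ArrayList<>();
        for (Right r : right) {
            boolean matched = false;
            for (Left l : left) {
                if (l.a == r.d && l.b < r.h) {
                    out.add(l.c + "," + r.g);
                    matched = true;
                }
            }
            if (!matched) {
                // No left row satisfies the whole condition: emit the outer row padded with null.
                out.add("null," + r.g);
            }
        }
        return out;
    }

    /** Models rightOuterJoin('a === 'd).where('b < 'h).select('c, 'g): the filter runs after padding. */
    static List<String> joinThenFilter(List<Left> left, List<Right> right) {
        List<String> out = new ArrayList<>();
        for (Right r : right) {
            for (Left l : left) {
                if (l.a == r.d && l.b < r.h) {
                    out.add(l.c + "," + r.g);
                }
            }
            // A padded row has a null 'b, so 'b < 'h is not true and the filter drops it.
        }
        return out;
    }

    public static void main(String[] args) {
        List<Left> left = Arrays.asList(new Left(1, 1, "Hi"), new Left(2, 2, "Hello"));
        List<Right> right = Arrays.asList(
            new Right(1, 1, "A"), new Right(2, 3, "B"), new Right(3, 5, "C"));
        System.out.println(joinWithFullCondition(left, right)); // [null,A, Hello,B, null,C]
        System.out.println(joinThenFilter(left, right));        // [Hello,B]
    }
}
```

The second variant matches the single-row result that the unit test expected, while the first matches what the relational databases return.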





[GitHub] flink issue #3379: [FLINK-5498] [table] Add support for left/right outer joi...

2017-10-19 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3379
  
Hi @lincoln-lil, I fixed FLINK-5498 with PR #4858 as a by-product of fixing 
FLINK-7755.
The implementation of #4858 is memory safe (based on the proposal we 
discussed in FLINK-5498).

Could you please close this PR? 
I'd also appreciate a review for #4858 if you have some time.

Thank you, Fabian


---


[jira] [Closed] (FLINK-7854) Reject lateral table outer joins with predicates in SQL

2017-10-19 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-7854.

   Resolution: Fixed
Fix Version/s: 1.3.3

Fixed for 1.3.3 with 1d10cee91a12734de09b8c9665437309242e2dd5
Fixed for 1.4.0 with 479be9d888f9a338cac908d50eb753843f2aad4b

> Reject lateral table outer joins with predicates in SQL
> ---
>
> Key: FLINK-7854
> URL: https://issues.apache.org/jira/browse/FLINK-7854
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> Due to CALCITE-2004, lateral table outer joins cannot be executed correctly. 
> We should cover this up by rejecting join predicates temporarily, until the 
> issue is fixed in Calcite.





[jira] [Assigned] (FLINK-7737) On HCFS systems, FSDataOutputStream does not issue hsync only hflush which leads to data loss

2017-10-19 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-7737:
-

Assignee: Piotr Nowojski



[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211517#comment-16211517
 ] 

ASF GitHub Bot commented on FLINK-6225:
---

Github user PangZhi commented on the issue:

https://github.com/apache/flink/pull/3748
  
@zentol thx, fixed.


> Support Row Stream for CassandraSink
> 
>
> Key: FLINK-6225
> URL: https://issues.apache.org/jira/browse/FLINK-6225
> Project: Flink
>  Issue Type: New Feature
>  Components: Cassandra Connector
>Affects Versions: 1.3.0
>Reporter: Jing Fan
>Assignee: Haohui Mai
> Fix For: 1.4.0
>
>
> Currently in CassandraSink, specifying query is not supported for row-stream. 
> The solution should be similar to CassandraTupleSink.





[GitHub] flink issue #3748: [FLINK-6225] [Cassandra Connector] add CassandraTableSink

2017-10-19 Thread PangZhi
Github user PangZhi commented on the issue:

https://github.com/apache/flink/pull/3748
  
@zentol thx, fixed.


---


[GitHub] flink pull request #4846: [FLINK-7854] [table] Reject lateral table outer jo...

2017-10-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4846


---


[jira] [Commented] (FLINK-7854) Reject lateral table outer joins with predicates in SQL

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211369#comment-16211369
 ] 

ASF GitHub Bot commented on FLINK-7854:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4846


> Reject lateral table outer joins with predicates in SQL
> ---
>
> Key: FLINK-7854
> URL: https://issues.apache.org/jira/browse/FLINK-7854
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Due to CALCITE-2004, lateral table outer joins cannot be executed correctly. 
> We should cover this up by rejecting join predicates temporarily, until the 
> issue is fixed in Calcite.





[jira] [Created] (FLINK-7877) Fix compilation against the Hadoop 3 beta1 release

2017-10-19 Thread Ted Yu (JIRA)
Ted Yu created FLINK-7877:
-

 Summary: Fix compilation against the Hadoop 3 beta1 release
 Key: FLINK-7877
 URL: https://issues.apache.org/jira/browse/FLINK-7877
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu


When compiling against Hadoop 3.0.0-beta1, I got:
{code}
[ERROR] /mnt/disk2/a/flink/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java:[224,16] org.apache.flink.yarn.UtilsTest.TestingContainer is not abstract and does not override abstract method setExecutionType(org.apache.hadoop.yarn.api.records.ExecutionType) in org.apache.hadoop.yarn.api.records.Container
{code}
There may be other Hadoop API(s) that need adjustment.





[jira] [Commented] (FLINK-7864) Support side-outputs in CoProcessFunction

2017-10-19 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211343#comment-16211343
 ] 

Bowen Li commented on FLINK-7864:
-

With the latest master branch, I found I can no longer run {{SideOutputITCase.java}} 
from either the console or IntelliJ. Here's the error stack trace from 
IntelliJ:

{code:java}
/Library/Java/JavaVirtualMachines/jdk1.8.0_111.jdk/Contents/Home/bin/java -ea 
-Dlog.level=WARN -DforkNumber=01 -Dlog4j.configuration=log4j-test.properties 
-Xms256m -Xmx2048m -Dmvn.forkNumber=1 -XX:+UseSerialGC 
-Didea.launcher.port=7532 "-Didea.launcher.bin.path=/Applications/IntelliJ 
IDEA.app/Contents/bin" -classpath 
/private/var/folders/84/d9dx_03d7j13r7r51z7l37dcgp/T/classpath.jar 
-Dfile.encoding=UTF-8 com.intellij.rt.execution.application.AppMain 
com.intellij.rt.execution.junit.JUnitStarter -ideVersion5 
org.apache.flink.test.streaming.runtime.SideOutputITCase
Uncaught error from thread [flink-akka.actor.default-dispatcher-4] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
java.lang.NoSuchMethodError: akka.pattern.AskableActorRef$.$qmark$default$3$extension(Lakka/actor/ActorRef;Ljava/lang/Object;)Lakka/actor/ActorRef;
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:394)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Process finished with exit code 255
{code}


[~aljoscha] Have you run into it?

> Support side-outputs in CoProcessFunction
> -
>
> Key: FLINK-7864
> URL: https://issues.apache.org/jira/browse/FLINK-7864
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Bowen Li
> Fix For: 1.4.0
>
>
> We forgot to add support for side-outputs when we added that to 
> {{ProcessFunction}}. Should be as easy as adding it to the {{Context}} and 
> wiring it in.





[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211302#comment-16211302
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r145752342
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -269,4 +315,49 @@ private void decodeBufferOrEvent(RemoteInputChannel 
inputChannel, NettyMessage.B
bufferOrEvent.releaseBuffer();
}
}
+
+   private void writeAndFlushNextMessageIfPossible(Channel channel) {
+   if (channelError.get() != null || !channel.isWritable()) {
+   return;
+   }
+
+   while (true) {
+   RemoteInputChannel inputChannel = inputChannelsWithCredit.poll();
+
+   // The input channel may be null because of the write callbacks that are executed
+   // after each write, and it is also no need to notify credit for released channel.
+   if (inputChannel == null || inputChannel.isReleased()) {
+   return;
+   }
+
+   AddCredit msg = new AddCredit(
+   inputChannel.getPartitionId(),
+   inputChannel.getAndResetCredit(),
+   inputChannel.getInputChannelId());
+
+   // Write and flush and wait until this is done before
+   // trying to continue with the next input channel.
+   channel.writeAndFlush(msg).addListener(writeListener);
+
+   return;
--- End diff --

I do not think it is a bug. Consider the similar logic in the current 
`PartitionRequestQueue`: if the channel becomes writable via 
`channelWritabilityChanged`, it will call `writeAndFlushNextMessageIfPossible` 
again.

`notifyCreditAvailable` only enqueues the `InputChannel` in the 
pipeline. If the channel is not writable at that moment, the `InputChannel` 
stays in the queue. Once the channel becomes writable, the 
`InputChannel` is polled from the queue and its credit is sent to the producer.
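
The argument above can be sketched as a small, self-contained model. This is not the `CreditBasedClientHandler` from the PR and it does not use Netty; the class `CreditAnnouncer`, its fields, and the use of plain strings for input channels are assumptions made for illustration. Writes are modelled as completing immediately, so the loop stands in for the write listener that re-triggers sending in the real handler.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class CreditAnnouncer {

    private final Queue<String> channelsWithCredit = new ArrayDeque<>();
    private final List<String> sent = new ArrayList<>();
    private boolean writable;

    /** Called when an input channel's unannounced credit goes up from zero. */
    void notifyCreditAvailable(String inputChannel) {
        channelsWithCredit.add(inputChannel);
        writeNextIfPossible();
    }

    /** Called when the writability of the network channel changes. */
    void channelWritabilityChanged(boolean nowWritable) {
        writable = nowWritable;
        writeNextIfPossible();
    }

    /** Returns the input channels whose credit has been announced so far, in order. */
    List<String> sent() {
        return sent;
    }

    private void writeNextIfPossible() {
        // While the network channel is not writable, queued input channels simply stay queued.
        while (writable) {
            String next = channelsWithCredit.poll();
            if (next == null) {
                return;
            }
            sent.add(next);
        }
    }

    public static void main(String[] args) {
        CreditAnnouncer announcer = new CreditAnnouncer();
        announcer.notifyCreditAvailable("channel-1"); // not writable yet: nothing is sent
        System.out.println(announcer.sent());         // []
        announcer.channelWritabilityChanged(true);    // the queued channel is announced now
        System.out.println(announcer.sent());         // [channel-1]
    }
}
```

The point of the model is that no credit announcement is lost when the early return is taken: the entry remains in the queue until the next writability change.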


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is part of the work on credit-based network flow control.
> The related work items are:
> * We define a new message called {{AddCredit}} to announce the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.





[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-10-19 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r145752342
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -269,4 +315,49 @@ private void decodeBufferOrEvent(RemoteInputChannel 
inputChannel, NettyMessage.B
bufferOrEvent.releaseBuffer();
}
}
+
+   private void writeAndFlushNextMessageIfPossible(Channel channel) {
+   if (channelError.get() != null || !channel.isWritable()) {
+   return;
+   }
+
+   while (true) {
+   RemoteInputChannel inputChannel = inputChannelsWithCredit.poll();
+
+   // The input channel may be null because of the write callbacks that are executed
+   // after each write, and it is also no need to notify credit for released channel.
+   if (inputChannel == null || inputChannel.isReleased()) {
+   return;
+   }
+
+   AddCredit msg = new AddCredit(
+   inputChannel.getPartitionId(),
+   inputChannel.getAndResetCredit(),
+   inputChannel.getInputChannelId());
+
+   // Write and flush and wait until this is done before
+   // trying to continue with the next input channel.
+   channel.writeAndFlush(msg).addListener(writeListener);
+
+   return;
--- End diff --

I don't think this is a bug. The logic is similar to the current 
`PartitionRequestQueue`: if the channel becomes writable via 
`channelWritabilityChanged`, it calls `writeAndFlushNextMessageIfPossible` 
again.

`notifyCreditAvailable` just enqueues the `InputChannel` in the 
pipeline. If the channel is not currently writable, the `InputChannel` 
stays in the queue. Once the channel becomes writable, the 
`InputChannel` is polled from the queue to send its credit to the producer.
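
The argument above can be illustrated with a small stand-alone model. It is only a sketch under simplifying assumptions: channels are represented by strings, writes complete synchronously, and the class and method names are hypothetical rather than the handler's or Netty's actual API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simplified, hypothetical model of the handler; not Flink's or Netty's API.
class CreditSenderSketch {
    private final Queue<String> channelsWithCredit = new ArrayDeque<>();
    private final List<String> sent = new ArrayList<>();
    private boolean writable; // starts out not writable

    /** Enqueues a channel and tries to send; the channel waits if not writable. */
    void notifyCreditAvailable(String channelId) {
        channelsWithCredit.add(channelId);
        writeNextIfPossible();
    }

    /** Called whenever the connection's writability changes. */
    void writabilityChanged(boolean nowWritable) {
        writable = nowWritable;
        writeNextIfPossible();
    }

    private void writeNextIfPossible() {
        // The loop stands in for the write listener re-invoking this
        // method after each completed write.
        while (writable && !channelsWithCredit.isEmpty()) {
            sent.add(channelsWithCredit.poll());
        }
    }

    List<String> sent() {
        return sent;
    }

    int queued() {
        return channelsWithCredit.size();
    }

    public static void main(String[] args) {
        CreditSenderSketch sender = new CreditSenderSketch();
        sender.notifyCreditAvailable("a");
        sender.notifyCreditAvailable("b");
        System.out.println("queued while not writable: " + sender.queued());
        sender.writabilityChanged(true);
        System.out.println("sent after becoming writable: " + sender.sent());
    }
}
```

Nothing is lost while the connection is not writable: the channels stay queued, and the writability callback drains them later.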


---


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211297#comment-16211297
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r145750939
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -37,18 +43,29 @@
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.ArrayDeque;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 
+/**
+ * Channel handler to read {@link BufferResponse} and {@link 
ErrorResponse} messages from the
+ * producer, to write and flush {@link AddCredit} message for the producer.
+ */
 class CreditBasedClientHandler extends ChannelInboundHandlerAdapter {
 
private static final Logger LOG = 
LoggerFactory.getLogger(CreditBasedClientHandler.class);
 
+   /** Channels, which already requested partitions from the producers. */
--- End diff --

In this PR I added the new `inputChannelsWithCredit` queue with a comment. 
To contrast it with the existing `inputChannels`, I also added a comment for 
that field. As you said, this could also have been done in the previous commit.


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is part of the work on credit-based network flow control.
> The related work items are:
> * We define a new message called {{AddCredit}} to announce incremental
> credit during data shuffle.
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}}
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.






[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211293#comment-16211293
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r145750093
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -37,18 +43,29 @@
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.ArrayDeque;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 
+/**
+ * Channel handler to read {@link BufferResponse} and {@link 
ErrorResponse} messages from the
+ * producer, to write and flush {@link AddCredit} message for the producer.
+ */
--- End diff --

The write and flush logic is added in this PR, so I also updated these 
comments in this PR to cover all messages. As you said, this could also have 
been done in the previous commit.


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is part of the work on credit-based network flow control.
> The related work items are:
> * We define a new message called {{AddCredit}} to announce incremental
> credit during data shuffle.
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}}
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.






[jira] [Commented] (FLINK-7697) Add metrics for Elasticsearch Sink

2017-10-19 Thread Sendoh (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211289#comment-16211289
 ] 

Sendoh commented on FLINK-7697:
---

It would be very nice to include the following information, so that we know 
the latency between the Elasticsearch sink and Elasticsearch:

private final LongMinimum firstOccurredAtStore = new LongMinimum();
private final LongMaximum lastOccurredAtStore = new LongMaximum();
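
As a rough illustration of what the two fields above would track, here is a plain-Java sketch that keeps the earliest and latest store timestamps. It deliberately avoids the accumulator classes named in the comment; `OccurrenceTrackerSketch` and `record` are hypothetical names, and the timestamps are arbitrary example values.

```java
// Hypothetical stand-in for a pair of min/max accumulators; not Flink's API.
class OccurrenceTrackerSketch {
    private long firstOccurredAtStore = Long.MAX_VALUE;
    private long lastOccurredAtStore = Long.MIN_VALUE;

    /** Records the timestamp (in milliseconds) at which a document was stored. */
    void record(long storedAtMillis) {
        firstOccurredAtStore = Math.min(firstOccurredAtStore, storedAtMillis);
        lastOccurredAtStore = Math.max(lastOccurredAtStore, storedAtMillis);
    }

    long first() {
        return firstOccurredAtStore;
    }

    long last() {
        return lastOccurredAtStore;
    }

    public static void main(String[] args) {
        OccurrenceTrackerSketch tracker = new OccurrenceTrackerSketch();
        tracker.record(30L);
        tracker.record(10L);
        tracker.record(20L);
        System.out.println("first=" + tracker.first() + ", last=" + tracker.last());
    }
}
```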

> Add metrics for Elasticsearch Sink
> --
>
> Key: FLINK-7697
> URL: https://issues.apache.org/jira/browse/FLINK-7697
> Project: Flink
>  Issue Type: Wish
>  Components: ElasticSearch Connector
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Critical
>
> We should add metrics to track events written to the ElasticsearchSink,
> e.g.
> * number of successful bulk sends
> * number of documents inserted
> * number of documents updated
> * number of documents version conflicts





[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211288#comment-16211288
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r145749291
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -208,6 +211,53 @@ public void testCancelBeforeActive() throws Exception {
client.cancelRequestFor(inputChannel.getInputChannelId());
}
 
+   /**
+* Verifies that {@link RemoteInputChannel} is enqueued in the 
pipeline, and
+* {@link AddCredit} message is sent to the producer.
+*/
+   @Test
+   public void testNotifyCreditAvailable() throws Exception {
+   final CreditBasedClientHandler handler = new 
CreditBasedClientHandler();
+   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+
+   final RemoteInputChannel inputChannel = 
mock(RemoteInputChannel.class);
--- End diff --

I reviewed all the existing tests in 
`PartitionRequestClientHandlerTest`, and every `RemoteInputChannel` there is 
mocked, so I also used a mocked channel here.

I think the mocked input channel is enough for this test, because there is 
no interaction with the input channel's internals. The handler just enqueues 
this input channel and polls it to send the `AddCredit` message.

If you prefer a real input channel, I am willing to change it.


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is part of the work on credit-based network flow control.
> The related work items are:
> * We define a new message called {{AddCredit}} to announce incremental
> credit during data shuffle.
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}}
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.






[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211272#comment-16211272
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r145746120
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -304,12 +304,21 @@ public void testProducerFailedException() throws 
Exception {
 
/**
 * Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying 
the exclusive segment is
-* recycled to available buffers directly and it triggers notify of 
announced credit.
+* recycled to available buffers directly and it triggers notification 
of unannounced credit.
 */
@Test
public void testRecycleExclusiveBufferBeforeReleased() throws Exception 
{
+   // Config
+   final Tuple2<Integer, Integer> backoff = new Tuple2<>(0, 0);
+
+   // Setup
+   final PartitionRequestClient connClient = 
mock(PartitionRequestClient.class);
final SingleInputGate inputGate = mock(SingleInputGate.class);
-   final RemoteInputChannel inputChannel = 
spy(createRemoteInputChannel(inputGate));
+
+   RemoteInputChannel inputChannel = 
spy(createRemoteInputChannel(inputGate, connClient, backoff));
--- End diff --

Your code may be out of date. The current code does not have this issue.


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is part of the work on credit-based network flow control.
> The related work items are:
> * We define a new message called {{AddCredit}} to announce incremental
> credit during data shuffle.
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}}
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.





[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211274#comment-16211274
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r145746280
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -378,32 +387,47 @@ public void testReleaseExclusiveBuffers() throws 
Exception {
 
/**
 * Tests {@link BufferPool#requestBuffer()}, verifying the remote input 
channel tries to request
-* floating buffers once receiving the producer's backlog.
+* floating buffers once receiving the producer's backlog, and then 
notifies credit available after
+* receiving floating buffers.
 */
@Test
public void testRequestFloatingBuffersOnBuffer() throws Exception {
+   // Config
+   final Tuple2<Integer, Integer> backoff = new Tuple2<>(0, 0);
+
// Setup
final BufferPool bufferPool = mock(BufferPool.class);

when(bufferPool.requestBuffer()).thenReturn(TestBufferFactory.createBuffer());
 
+   final PartitionRequestClient connClient = 
mock(PartitionRequestClient.class);
--- End diff --

Please refresh to the latest code.


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is part of the work on credit-based network flow control.
> The related work items are:
> * We define a new message called {{AddCredit}} to announce incremental
> credit during data shuffle.
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}}
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.






[jira] [Closed] (FLINK-7802) Exception occur when empty field collection was pushed into CSVTableSource

2017-10-19 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-7802.

Resolution: Fixed

Fixed for 1.3.3 with 024d8f5779406d88f44fda51aa47c2bdbc63a226
Fixed for 1.4.0 with 808e0f9a6d258fb404f81a2144c93719732802f0

> Exception occur when empty field collection was pushed into CSVTableSource
> --
>
> Key: FLINK-7802
> URL: https://issues.apache.org/jira/browse/FLINK-7802
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: godfrey he
>Priority: Critical
> Fix For: 1.4.0, 1.3.3
>
>
> Consider the following SQL: select count(1) from csv_table. 
> When this SQL is executed, an exception occurs:
> java.lang.IllegalArgumentException: At least one field must be specified
> at 
> org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:50)
> So even if no fields are used, we should still keep some columns for 
> CSVTableSource to get the row count.
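
The idea in the description can be sketched as a small fallback on the projected field indices. This is an assumption-laden illustration, not the committed fix: `fieldsToRead` is a hypothetical helper, and choosing column 0 as the column to keep is an arbitrary choice made for the example.

```java
import java.util.Arrays;

// Hypothetical helper illustrating the fallback; not the actual Flink change.
class ProjectionFallbackSketch {

    /**
     * Returns the column indices to read. If the query needs no fields
     * (e.g. a plain row count), column 0 is kept so that rows can still
     * be produced and counted.
     */
    static int[] fieldsToRead(int[] requestedFields) {
        if (requestedFields.length == 0) {
            return new int[] {0};
        }
        return requestedFields.clone();
    }

    public static void main(String[] args) {
        // Empty projection, as pushed down for "select count(1) from csv_table".
        System.out.println(Arrays.toString(fieldsToRead(new int[0])));
        // A non-empty projection is passed through unchanged.
        System.out.println(Arrays.toString(fieldsToRead(new int[] {2, 4})));
    }
}
```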





[jira] [Closed] (FLINK-7759) Fix Bug that fieldName with Boolean prefix can't be parsed by ExpressionParser.

2017-10-19 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-7759.

   Resolution: Fixed
Fix Version/s: 1.3.3

Fixed for 1.3.3 with 28b65f9a779bb76e603e2951896fefe2d6437b2d
Fixed for 1.4.0 with 95f863052eeb4e6fedffa24ddd0ef4679f8be5ce

> Fix Bug that fieldName with Boolean prefix can't be parsed by 
> ExpressionParser.
> ---
>
> Key: FLINK-7759
> URL: https://issues.apache.org/jira/browse/FLINK-7759
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Critical
> Fix For: 1.4.0, 1.3.3
>
>
> To reproduce, call {{ExpressionParser.parseExpression}} with a field name 
> whose prefix is "true" or "false", e.g.
> {{ExpressionParser.parseExpression("true_target")}} or 
> {{ExpressionParser.parseExpression("falsex")}}
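
One common way to avoid this class of bug is to accept a boolean keyword only when it is not followed by another identifier character. The sketch below shows that check in isolation. It is a hand-rolled illustration with hypothetical names, not the parser's actual implementation or the committed fix.

```java
// Hypothetical illustration of whole-word keyword matching; not Flink's parser.
class BooleanPrefixSketch {

    /** Returns true if the input starts with the keyword as a whole word. */
    static boolean startsWithKeyword(String input, String keyword) {
        if (!input.startsWith(keyword)) {
            return false;
        }
        if (input.length() == keyword.length()) {
            return true;
        }
        // A following identifier character means the keyword is only a prefix.
        return !Character.isJavaIdentifierPart(input.charAt(keyword.length()));
    }

    /** Classifies the start of an expression as a boolean literal or a field reference. */
    static String classify(String input) {
        if (startsWithKeyword(input, "true") || startsWithKeyword(input, "false")) {
            return "BOOLEAN_LITERAL";
        }
        return "FIELD_REFERENCE";
    }

    public static void main(String[] args) {
        System.out.println(classify("true_target"));
        System.out.println(classify("falsex"));
        System.out.println(classify("true"));
    }
}
```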






[jira] [Commented] (FLINK-7854) Reject lateral table outer joins with predicates in SQL

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211195#comment-16211195
 ] 

ASF GitHub Bot commented on FLINK-7854:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/4846
  
Great, thanks a lot! The PR looks good. Will merge it.


> Reject lateral table outer joins with predicates in SQL
> ---
>
> Key: FLINK-7854
> URL: https://issues.apache.org/jira/browse/FLINK-7854
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Due to CALCITE-2004, lateral table outer joins can not be normally executed. 
> We should cover it up by rejecting join predicates temporarily, until the 
> issue is fixed in Calcite.





[jira] [Commented] (FLINK-5968) Document WindowedStream.aggregate()

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211186#comment-16211186
 ] 

ASF GitHub Bot commented on FLINK-5968:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4833#discussion_r145726822
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/MapView.scala
 ---
@@ -52,7 +52,7 @@ import 
org.apache.flink.table.dataview.MapViewTypeInfoFactory
   *  return accum;
   *}
   *
-  *public void accumulate(MyAccum accumulator, String id) {
+  *public MyAccum accumulate(MyAccum accumulator, String id) {
--- End diff --

Please revert these changes.
The `accumulate` method belongs to a Table API 
`org.apache.flink.table.functions.AggregateFunction` and not 
`GeneratedAggregations`.


> Document WindowedStream.aggregate()
> ---
>
> Key: FLINK-5968
> URL: https://issues.apache.org/jira/browse/FLINK-5968
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API, Documentation
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>






[jira] [Commented] (FLINK-5968) Document WindowedStream.aggregate()

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211189#comment-16211189
 ] 

ASF GitHub Bot commented on FLINK-5968:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4833#discussion_r145726800
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/dataview/ListView.scala
 ---
@@ -52,11 +52,12 @@ import 
org.apache.flink.table.dataview.ListViewTypeInfoFactory
   * return accum;
   *   }
   *
-  *   public void accumulate(MyAccum accumulator, String id) {
+  *   public MyAccum accumulate(MyAccum accumulator, String id) {
--- End diff --

Please revert these changes.
The `accumulate` method belongs to a Table API 
`org.apache.flink.table.functions.AggregateFunction` and not 
`GeneratedAggregations`.



> Document WindowedStream.aggregate()
> ---
>
> Key: FLINK-5968
> URL: https://issues.apache.org/jira/browse/FLINK-5968
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API, Documentation
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>






[jira] [Commented] (FLINK-5968) Document WindowedStream.aggregate()

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211187#comment-16211187
 ] 

ASF GitHub Bot commented on FLINK-5968:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4833#discussion_r145728875
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -67,7 +67,7 @@ abstract class GeneratedAggregations extends Function {
 * aggregated results
 * @param inputinput values bundled in a row
--- End diff --

Please add `@return` documentation.


> Document WindowedStream.aggregate()
> ---
>
> Key: FLINK-5968
> URL: https://issues.apache.org/jira/browse/FLINK-5968
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API, Documentation
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>






[jira] [Commented] (FLINK-5968) Document WindowedStream.aggregate()

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211188#comment-16211188
 ] 

ASF GitHub Bot commented on FLINK-5968:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4833#discussion_r145731146
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GeneratedAggregations.scala
 ---
@@ -67,7 +67,7 @@ abstract class GeneratedAggregations extends Function {
 * aggregated results
 * @param inputinput values bundled in a row
--- End diff --

Please change `retract()` accordingly to keep the interfaces consistent.

Alternatively, you can also revert the changes on `GeneratedAggregations` 
which is only an internal interface. The Table API 
`org.apache.flink.table.functions.AggregateFunction` does not support immutable 
accumulators, so this change has no effect until the public interface is 
changed. If you revert, we only need to touch `AggregateAggFunction` and not 
the code generator.
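
To make the distinction concrete, here is a minimal sketch contrasting an `accumulate` that mutates its accumulator in place with one that returns the accumulator. All names are hypothetical and neither method is Flink's actual interface. Only the returning style can work with an immutable accumulator type such as `Long`.

```java
// Hypothetical contrast of the two styles; neither is Flink's actual interface.
class AccumulateStylesSketch {

    /** Mutable accumulator, updated in place. */
    static final class MutableAcc {
        long sum;
    }

    /** In-place style: the result is visible through the argument. */
    static void accumulate(MutableAcc acc, long value) {
        acc.sum += value;
    }

    /** Returning style: the caller must keep the returned accumulator. */
    static Long accumulateAndReturn(Long acc, long value) {
        return acc + value;
    }

    public static void main(String[] args) {
        MutableAcc mutable = new MutableAcc();
        accumulate(mutable, 3);
        accumulate(mutable, 4);
        System.out.println("in-place sum: " + mutable.sum);

        Long immutable = 0L;
        immutable = accumulateAndReturn(immutable, 3);
        immutable = accumulateAndReturn(immutable, 4);
        System.out.println("returned sum: " + immutable);
    }
}
```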


> Document WindowedStream.aggregate()
> ---
>
> Key: FLINK-5968
> URL: https://issues.apache.org/jira/browse/FLINK-5968
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API, Documentation
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>







[GitHub] flink issue #4846: [FLINK-7854] [table] Reject lateral table outer joins wit...

2017-10-19 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/4846
  
Hi @fhueske, the PR has been updated.


---


[jira] [Commented] (FLINK-7854) Reject lateral table outer joins with predicates in SQL

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211176#comment-16211176
 ] 

ASF GitHub Bot commented on FLINK-7854:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/4846
  
Hi @fhueske, the PR has been updated.


> Reject lateral table outer joins with predicates in SQL
> ---
>
> Key: FLINK-7854
> URL: https://issues.apache.org/jira/browse/FLINK-7854
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Due to CALCITE-2004, lateral table outer joins cannot be executed correctly. 
> We should work around this by temporarily rejecting join predicates until the 
> issue is fixed in Calcite.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7864) Support side-outputs in CoProcessFunction

2017-10-19 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211166#comment-16211166
 ] 

Bowen Li commented on FLINK-7864:
-

[~aljoscha] yes, I'll finish it before then

> Support side-outputs in CoProcessFunction
> -
>
> Key: FLINK-7864
> URL: https://issues.apache.org/jira/browse/FLINK-7864
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Aljoscha Krettek
>Assignee: Bowen Li
> Fix For: 1.4.0
>
>
> We forgot to add support for side-outputs when we added that to 
> {{ProcessFunction}}. Should be as easy as adding it to the {{Context}} and 
> wiring it in.





[jira] [Commented] (FLINK-5968) Document WindowedStream.aggregate()

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211145#comment-16211145
 ] 

ASF GitHub Bot commented on FLINK-5968:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4833
  
CC: @fhueske You wanted me to cc you for the Table API changes.


> Document WindowedStream.aggregate()
> ---
>
> Key: FLINK-5968
> URL: https://issues.apache.org/jira/browse/FLINK-5968
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API, Documentation
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>






[GitHub] flink issue #4833: [FLINK-5968] Add documentation for WindowedStream.aggrega...

2017-10-19 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4833
  
CC: @fhueske You wanted me to cc you for the Table API changes.


---


[jira] [Commented] (FLINK-7783) Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211142#comment-16211142
 ] 

ASF GitHub Bot commented on FLINK-7783:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4863
  
@StefanRRichter I'm not sure whether my concern about incremental state 
handles is actually accurate.


> Don't always remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
> --
>
> Key: FLINK-7783
> URL: https://issues.apache.org/jira/browse/FLINK-7783
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> Currently, we always delete checkpoint handles if they (or the data from the 
> DFS) cannot be read: 
> https://github.com/apache/flink/blob/91a4b276171afb760bfff9ccf30593e648e91dfb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java#L180
> This can lead to problems in case the DFS is temporarily not available, i.e. 
> we could inadvertently
> delete all checkpoints even though they are still valid.
> A user reported this problem on the mailing list: 
> https://lists.apache.org/thread.html/9dc9b719cf8449067ad01114fedb75d1beac7b4dff171acdcc24903d@%3Cuser.flink.apache.org%3E
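
The behaviour asked for in the description above can be illustrated with a minimal sketch. It is not the ZooKeeperCompletedCheckpointStore code: `RecoverSketch`, `Handle` and `recover()` are hypothetical names, and a plain map stands in for the handle store. The only point shown is that a handle which cannot be read is skipped but kept, because the failure may be transient.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: a handle store (standing in for ZooKeeper) whose
// payloads live on a DFS that may be temporarily unreachable.
class RecoverSketch {
    interface Handle {
        String read() throws IOException;
    }

    final Map<String, Handle> handles = new LinkedHashMap<>();

    // Returns the checkpoints that could be read. Unreadable handles are
    // skipped but deliberately left in the store for a later retry.
    List<String> recover() {
        List<String> recovered = new ArrayList<>();
        for (Map.Entry<String, Handle> entry : handles.entrySet()) {
            try {
                recovered.add(entry.getValue().read());
            } catch (IOException ex) {
                // Do not remove entry.getKey(): the failure may be transient.
            }
        }
        return recovered;
    }
}
```

Whether and when an unreadable handle should eventually be discarded is a separate policy question that this sketch does not try to answer.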





[GitHub] flink issue #4863: [FLINK-7783] Don't always remove checkpoints in ZooKeeper...

2017-10-19 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4863
  
@StefanRRichter I'm not sure whether my concern about incremental state 
handles is actually accurate.


---


[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2017-10-19 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211139#comment-16211139
 ] 

Ted Yu commented on FLINK-5486:
---

The reason for the additional synchronization is given in the description.

This was discovered during code review.

> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
> Fix For: 1.3.3
>
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.
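
A minimal sketch of the change proposed in the description above, assuming a simplified stand-in for the bucket state. `PendingFilesSketch`, `handleRestored()` and the `committed` list are hypothetical and are not the BucketingSink code. Processing and clearing happen under the same lock, so no entry can be cleared while the pending files are still being handled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the sink's per-bucket state; not the Flink class.
class PendingFilesSketch {
    final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
    final List<String> committed = new ArrayList<>();

    // Handle and clear under one lock on the map, so the map cannot be
    // modified by another lock holder between the two steps.
    void handleRestored() {
        synchronized (pendingFilesPerCheckpoint) {
            for (List<String> files : pendingFilesPerCheckpoint.values()) {
                committed.addAll(files);
            }
            pendingFilesPerCheckpoint.clear();
        }
    }
}
```

This only helps if every other access to the map synchronizes on the same object, which is an assumption of the sketch.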





[GitHub] flink pull request #4815: [FLINK-7802] [hotfix] Exception occur when empty f...

2017-10-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4815


---


[jira] [Commented] (FLINK-7759) Fix Bug that fieldName with Boolean prefix can't be parsed by ExpressionParser.

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211137#comment-16211137
 ] 

ASF GitHub Bot commented on FLINK-7759:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4829


> Fix Bug that fieldName with Boolean prefix can't be parsed by 
> ExpressionParser.
> ---
>
> Key: FLINK-7759
> URL: https://issues.apache.org/jira/browse/FLINK-7759
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Critical
> Fix For: 1.4.0
>
>
> Just call {{ExpressionParser.parseExpression}} with a field name whose prefix is "true" or 
> "false", e.g.
> {{ExpressionParser.parseExpression("true_target")}} or 
> {{ExpressionParser.parseExpression("falsex")}}





[jira] [Commented] (FLINK-7802) Exception occur when empty field collection was pushed into CSVTableSource

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211138#comment-16211138
 ] 

ASF GitHub Bot commented on FLINK-7802:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4815


> Exception occur when empty field collection was pushed into CSVTableSource
> --
>
> Key: FLINK-7802
> URL: https://issues.apache.org/jira/browse/FLINK-7802
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: godfrey he
>Priority: Critical
> Fix For: 1.4.0, 1.3.3
>
>
> Consider the following SQL: select count(1) from csv_table. 
> When this query is executed, an exception occurs:
> java.lang.IllegalArgumentException: At least one field must be specified
> at org.apache.flink.api.java.io.RowCsvInputFormat.<init>(RowCsvInputFormat.java:50)
> So even if no fields are used, we should still keep at least one column for 
> CSVTableSource to get the row count.





[GitHub] flink pull request #4829: [FLINK-7759][TableAPI & SQL] Fix Bug that fieldNam...

2017-10-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4829


---


[GitHub] flink pull request #4864: [hotfix] fix typos in comments in RuntimeContext.j...

2017-10-19 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/4864

[hotfix] fix typos in comments in RuntimeContext.java

## What is the purpose of the change

hotfix for comments in RuntimeContext.java

## Brief change log


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

none

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4864.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4864


commit 8713af3e91510dff134b84dbafa129263cbe1440
Author: Bowen Li 
Date:   2017-10-19T14:24:40Z

[hotfix] fix typos in comments




---


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-10-19 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r145692507
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -269,4 +315,49 @@ private void decodeBufferOrEvent(RemoteInputChannel 
inputChannel, NettyMessage.B
bufferOrEvent.releaseBuffer();
}
}
+
+   private void writeAndFlushNextMessageIfPossible(Channel channel) {
+   if (channelError.get() != null || !channel.isWritable()) {
+   return;
+   }
+
+   while (true) {
+   RemoteInputChannel inputChannel = 
inputChannelsWithCredit.poll();
+
+   // The input channel may be null because of the write 
callbacks that are executed
+   // after each write, and it is also no need to notify 
credit for released channel.
+   if (inputChannel == null || inputChannel.isReleased()) {
+   return;
+   }
+
+   AddCredit msg = new AddCredit(
+   inputChannel.getPartitionId(),
+   inputChannel.getAndResetCredit(),
+   inputChannel.getInputChannelId());
+
+   // Write and flush and wait until this is done before
+   // trying to continue with the next input channel.
+   channel.writeAndFlush(msg).addListener(writeListener);
+
+   return;
--- End diff --

`return`?

If this is indeed a bug, please add tests that cover it, because it seems 
that we would now have a resource leak if `notifyCreditAvailable` were 
triggered multiple times while `inputChannel` is not writable.


---


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211032#comment-16211032
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r145692507
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -269,4 +315,49 @@ private void decodeBufferOrEvent(RemoteInputChannel 
inputChannel, NettyMessage.B
bufferOrEvent.releaseBuffer();
}
}
+
+   private void writeAndFlushNextMessageIfPossible(Channel channel) {
+   if (channelError.get() != null || !channel.isWritable()) {
+   return;
+   }
+
+   while (true) {
+   RemoteInputChannel inputChannel = 
inputChannelsWithCredit.poll();
+
+   // The input channel may be null because of the write 
callbacks that are executed
+   // after each write, and it is also no need to notify 
credit for released channel.
+   if (inputChannel == null || inputChannel.isReleased()) {
+   return;
+   }
+
+   AddCredit msg = new AddCredit(
+   inputChannel.getPartitionId(),
+   inputChannel.getAndResetCredit(),
+   inputChannel.getInputChannelId());
+
+   // Write and flush and wait until this is done before
+   // trying to continue with the next input channel.
+   channel.writeAndFlush(msg).addListener(writeListener);
+
+   return;
--- End diff --

`return`?

If this is indeed a bug, please add tests that cover it, because it seems 
that we would now have a resource leak if `notifyCreditAvailable` were 
triggered multiple times while `inputChannel` is not writable.


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is part of the work on credit-based network flow control.
> The related changes are:
> * We define a new message called {{AddCredit}} to announce the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.
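
The steps listed in the description above can be modelled with a small single-threaded sketch. It is not the CreditBasedClientHandler implementation: `CreditQueueSketch`, `InputChannelSketch`, `addCredit()` and `onWritable()` are hypothetical names, and the "message" is just a string. It shows that a channel is enqueued only when its unannounced credit rises from zero, and that each writable event announces the accumulated credit of one channel and resets it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, single-threaded model of the credit announcement queue.
class CreditQueueSketch {
    static final class InputChannelSketch {
        final int id;
        int unannouncedCredit;

        InputChannelSketch(int id) {
            this.id = id;
        }

        int getAndResetCredit() {
            int credit = unannouncedCredit;
            unannouncedCredit = 0;
            return credit;
        }
    }

    final Queue<InputChannelSketch> channelsWithCredit = new ArrayDeque<>();
    final List<String> sent = new ArrayList<>();

    // Enqueue only on the transition from zero, so a channel is queued at most once.
    void addCredit(InputChannelSketch channel, int credit) {
        boolean wasZero = channel.unannouncedCredit == 0;
        channel.unannouncedCredit += credit;
        if (wasZero && credit > 0) {
            channelsWithCredit.add(channel);
        }
    }

    // Called whenever the network channel becomes writable: announce one message.
    boolean onWritable() {
        InputChannelSketch channel = channelsWithCredit.poll();
        if (channel == null) {
            return false;
        }
        sent.add("AddCredit(channel=" + channel.id + ", credit=" + channel.getAndResetCredit() + ")");
        return true;
    }
}
```

Because credit keeps accumulating while a channel waits in the queue, a slow network leads to fewer, larger announcements rather than a backlog of small ones.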





[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-10-19 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r145697924
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -37,18 +43,29 @@
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.ArrayDeque;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 
+/**
+ * Channel handler to read {@link BufferResponse} and {@link 
ErrorResponse} messages from the
+ * producer, to write and flush {@link AddCredit} message for the producer.
+ */
--- End diff --

was this addressed?


---


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211031#comment-16211031
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r145697848
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -208,6 +211,53 @@ public void testCancelBeforeActive() throws Exception {
client.cancelRequestFor(inputChannel.getInputChannelId());
}
 
+   /**
+* Verifies that {@link RemoteInputChannel} is enqueued in the 
pipeline, and
+* {@link AddCredit} message is sent to the producer.
+*/
+   @Test
+   public void testNotifyCreditAvailable() throws Exception {
+   final CreditBasedClientHandler handler = new 
CreditBasedClientHandler();
+   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+
+   final RemoteInputChannel inputChannel = 
mock(RemoteInputChannel.class);
+
+   // Enqueue the input channel
+   handler.notifyCreditAvailable(inputChannel);
+
+   channel.runPendingTasks();
+
+   // Read the enqueued msg
+   Object msg1 = channel.readOutbound();
+
+   // Should notify credit
+   assertEquals(msg1.getClass(), AddCredit.class);
+   }
+
+   /**
+* Verifies that {@link RemoteInputChannel} is enqueued in the 
pipeline, but {@link AddCredit}
+* message is not sent actually after this input channel is released.
+*/
+   @Test
+   public void testNotifyCreditAvailableAfterReleased() throws Exception {
+   final CreditBasedClientHandler handler = new 
CreditBasedClientHandler();
+   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+
+   final RemoteInputChannel inputChannel = 
mock(RemoteInputChannel.class);
--- End diff --

ditto


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is part of the work on credit-based network flow control.
> The related changes are:
> * We define a new message called {{AddCredit}} to announce the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.





[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16211033#comment-16211033
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r144244479
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -304,12 +304,21 @@ public void testProducerFailedException() throws 
Exception {
 
/**
 * Tests {@link RemoteInputChannel#recycle(MemorySegment)}, verifying 
the exclusive segment is
-* recycled to available buffers directly and it triggers notify of 
announced credit.
+* recycled to available buffers directly and it triggers notification 
of unannounced credit.
 */
@Test
public void testRecycleExclusiveBufferBeforeReleased() throws Exception 
{
+   // Config
+   final Tuple2<Integer, Integer> backoff = new Tuple2<>(0, 0);
+
+   // Setup
+   final PartitionRequestClient connClient = 
mock(PartitionRequestClient.class);
final SingleInputGate inputGate = mock(SingleInputGate.class);
-   final RemoteInputChannel inputChannel = 
spy(createRemoteInputChannel(inputGate));
+
+   RemoteInputChannel inputChannel = 
spy(createRemoteInputChannel(inputGate, connClient, backoff));
--- End diff --

Why did you have to explicitly create `PartitionRequestClient` and 
`backoff`?


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is part of the work on credit-based network flow control.
> The related changes are:
> * We define a new message called {{AddCredit}} to announce the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.





[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-10-19 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r145697848
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -208,6 +211,53 @@ public void testCancelBeforeActive() throws Exception {
client.cancelRequestFor(inputChannel.getInputChannelId());
}
 
+   /**
+* Verifies that {@link RemoteInputChannel} is enqueued in the 
pipeline, and
+* {@link AddCredit} message is sent to the producer.
+*/
+   @Test
+   public void testNotifyCreditAvailable() throws Exception {
+   final CreditBasedClientHandler handler = new 
CreditBasedClientHandler();
+   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+
+   final RemoteInputChannel inputChannel = 
mock(RemoteInputChannel.class);
+
+   // Enqueue the input channel
+   handler.notifyCreditAvailable(inputChannel);
+
+   channel.runPendingTasks();
+
+   // Read the enqueued msg
+   Object msg1 = channel.readOutbound();
+
+   // Should notify credit
+   assertEquals(msg1.getClass(), AddCredit.class);
+   }
+
+   /**
+* Verifies that {@link RemoteInputChannel} is enqueued in the 
pipeline, but {@link AddCredit}
+* message is not sent actually after this input channel is released.
+*/
+   @Test
+   public void testNotifyCreditAvailableAfterReleased() throws Exception {
+   final CreditBasedClientHandler handler = new 
CreditBasedClientHandler();
+   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+
+   final RemoteInputChannel inputChannel = 
mock(RemoteInputChannel.class);
--- End diff --

ditto


---


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-10-19 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r145697819
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -208,6 +211,53 @@ public void testCancelBeforeActive() throws Exception {
client.cancelRequestFor(inputChannel.getInputChannelId());
}
 
+   /**
+* Verifies that {@link RemoteInputChannel} is enqueued in the 
pipeline, and
+* {@link AddCredit} message is sent to the producer.
+*/
+   @Test
+   public void testNotifyCreditAvailable() throws Exception {
+   final CreditBasedClientHandler handler = new 
CreditBasedClientHandler();
+   final EmbeddedChannel channel = new EmbeddedChannel(handler);
+
+   final RemoteInputChannel inputChannel = 
mock(RemoteInputChannel.class);
--- End diff --

Could you use a real `RemoteInputChannel` instead of mocking it? You could add a 
test helper method `createTestRemoteInputChannel()`, and it seems like you would only 
need to mock `SingleInputGate`.


---


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-10-19 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r144244569
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 ---
@@ -378,32 +387,47 @@ public void testReleaseExclusiveBuffers() throws 
Exception {
 
/**
 * Tests {@link BufferPool#requestBuffer()}, verifying the remote input 
channel tries to request
-* floating buffers once receiving the producer's backlog.
+* floating buffers once receiving the producer's backlog, and then 
notifies credit available after
+* receiving floating buffers.
 */
@Test
public void testRequestFloatingBuffersOnBuffer() throws Exception {
+   // Config
+   final Tuple2<Integer, Integer> backoff = new Tuple2<>(0, 0);
+
// Setup
final BufferPool bufferPool = mock(BufferPool.class);

when(bufferPool.requestBuffer()).thenReturn(TestBufferFactory.createBuffer());
 
+   final PartitionRequestClient connClient = 
mock(PartitionRequestClient.class);
--- End diff --

ditto?


---


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-10-19 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r145697953
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -37,18 +43,29 @@
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.ArrayDeque;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 
+/**
+ * Channel handler to read {@link BufferResponse} and {@link 
ErrorResponse} messages from the
+ * producer, to write and flush {@link AddCredit} message for the producer.
+ */
 class CreditBasedClientHandler extends ChannelInboundHandlerAdapter {
 
private static final Logger LOG = 
LoggerFactory.getLogger(CreditBasedClientHandler.class);
 
+   /** Channels, which already requested partitions from the producers. */
--- End diff --

was this addressed?


---

