[jira] [Commented] (FLINK-9421) RunningJobsRegistry entries are not cleaned up after job termination

2018-05-23 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16488085#comment-16488085
 ] 

Till Rohrmann commented on FLINK-9421:
--

Sorry [~yanghua], I actually have a fix ready but did not see that you had assigned 
the issue to yourself. If it's ok with you, I would take over this issue.

> RunningJobsRegistry entries are not cleaned up after job termination
> 
>
> Key: FLINK-9421
> URL: https://issues.apache.org/jira/browse/FLINK-9421
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>
> Currently, the {{Dispatcher}} does not clean up the {{RunningJobsRegistry}} 
> after the job has finished. The consequence is that a ZNode with the JobID 
> and a state num per job remains in ZooKeeper.
> We should clean up these ZNodes to avoid a resource leak.
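
As a rough illustration of the cleanup this asks for (a minimal sketch assuming the existing {{RunningJobsRegistry#clearJob}} method; not the actual Dispatcher change):

{code:java}
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;

import java.io.IOException;

// Illustrative sketch only: once a job reaches a terminal state, clear its
// registry entry so the per-job ZNode does not remain in ZooKeeper.
final class RegistryCleanupSketch {
	static void onJobReachedTerminalState(RunningJobsRegistry registry, JobID jobId) throws IOException {
		registry.setJobFinished(jobId); // record the terminal state
		registry.clearJob(jobId);       // remove the entry (the ZNode in the ZooKeeper-backed registry)
	}
}
{code}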



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9422) Dedicated operator for UNION on streaming tables with time attributes

2018-05-23 Thread Rong Rong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16487968#comment-16487968
 ] 

Rong Rong commented on FLINK-9422:
--

+1 on "can apply windowed aggregates on the result". This would make things 
much easier without the unnecessary "aggregation"

> Dedicated operator for UNION on streaming tables with time attributes
> -
>
> Key: FLINK-9422
> URL: https://issues.apache.org/jira/browse/FLINK-9422
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Ruidong Li
>Priority: Minor
>
> We can implement a dedicated operator for a {{UNION}} operator on tables with 
> time attributes. Currently, {{UNION}} is translated into a {{UNION ALL}} and 
> a subsequent {{GROUP BY}} on all attributes without aggregation functions. 
> The state of the grouping operator is only cleaned up using state retention 
> timers. 
> The dedicated operator would leverage the monotonicity property of the time 
> attribute and watermarks to automatically clean up its state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5964: [FLINK-8655] [Cassandra Connector] add keyspace in cassan...

2018-05-23 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5964
  
That the example doesn't work isn't related to this PR, so I'll merge it as 
is.
But I'll take a look at what the problem is, and either fix it or open a JIRA.


---


[jira] [Commented] (FLINK-8655) Add a default keyspace to CassandraSink

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486808#comment-16486808
 ] 

ASF GitHub Bot commented on FLINK-8655:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5964
  
That the example doesn't work isn't related to this PR, so I'll merge it as 
is.
But I'll take a look at what the problem is, and either fix it or open a JIRA.


> Add a default keyspace to CassandraSink
> ---
>
> Key: FLINK-8655
> URL: https://issues.apache.org/jira/browse/FLINK-8655
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Affects Versions: 1.4.0
>Reporter: Christopher Hughes
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Currently, to use the CassandraPojoSink, it is necessary for a user to 
> provide keyspace information on the desired POJOs using DataStax annotations. 
> This allows various POJOs to be written to multiple keyspaces while sinking 
> messages, but prevents runtime flexibility.
> For many developers, non-production environments may all share a single 
> Cassandra instance differentiated by keyspace names.  I propose adding a 
> `defaultKeyspace(String keyspace)` method to the ClusterBuilder.  POJOs lacking an 
> explicit keyspace would then be written to the provided default.
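
For context, a small sketch of how the keyspace is currently pinned on a POJO through the DataStax mapping annotations; under the proposal, omitting the keyspace here would make the sink fall back to the (hypothetical) default configured on the ClusterBuilder:

{code:java}
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;

// Today: the keyspace is fixed at compile time on every POJO. With the proposed
// defaultKeyspace option, the `keyspace` attribute below could be omitted and the
// sink would use the default configured on the builder instead.
@Table(keyspace = "prod_keyspace", name = "events")
public class EventPojo {
	@Column(name = "id")
	public String id;
}
{code}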



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9383) Extend DistributedCache E2E test to cover directories

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486838#comment-16486838
 ] 

ASF GitHub Bot commented on FLINK-9383:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6023#discussion_r190144815
  
--- Diff: flink-end-to-end-tests/run-pre-commit-tests.sh ---
@@ -93,6 +93,14 @@ if [ $EXIT_CODE == 0 ]; then
 EXIT_CODE=$?
 fi
 
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==\n"
+  printf "Running Distributed cache end-to-end test\n"
+  printf "==\n"
+  $END_TO_END_DIR/test-scripts/test_streaming_distributed_cache_via_blob.sh
--- End diff --

Should update this to use the new `run_test` utility we have.


> Extend DistributedCache E2E test to cover directories
> -
>
> Key: FLINK-9383
> URL: https://issues.apache.org/jira/browse/FLINK-9383
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime, Tests
>Affects Versions: 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.6.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6023: [FLINK-9383][runtime] Test directories in Distribu...

2018-05-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6023#discussion_r190144815
  
--- Diff: flink-end-to-end-tests/run-pre-commit-tests.sh ---
@@ -93,6 +93,14 @@ if [ $EXIT_CODE == 0 ]; then
 EXIT_CODE=$?
 fi
 
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==\n"
+  printf "Running Distributed cache end-to-end test\n"
+  printf "==\n"
+  $END_TO_END_DIR/test-scripts/test_streaming_distributed_cache_via_blob.sh
--- End diff --

Should update this to use the new `run_test` utility we have.


---


[GitHub] flink issue #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpr...

2018-05-23 Thread fmthoma
Github user fmthoma commented on the issue:

https://github.com/apache/flink/pull/6021
  
@tzulitai I believe the right location is `docs/dev/connectors/kinesis.md`? 
I'll add some docs there.


---


[GitHub] flink issue #5650: [FLINK-8845][state] Introduce RocksDBWriteBatchWrapper to...

2018-05-23 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5650
  
@sihuazhou thanks for this nice contribution. LGTM 👍 Will merge.


---


[GitHub] flink issue #6027: [FLINK-7814][TableAPI && SQL] Add BETWEEN and NOT BETWEEN...

2018-05-23 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6027
  
Thanks for the update @Xpray. I will merge this...


---


[jira] [Commented] (FLINK-7814) Add BETWEEN and NOT BETWEEN expression to Table API

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486902#comment-16486902
 ] 

ASF GitHub Bot commented on FLINK-7814:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6027


> Add BETWEEN and NOT BETWEEN expression to Table API
> ---
>
> Key: FLINK-7814
> URL: https://issues.apache.org/jira/browse/FLINK-7814
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Ruidong Li
>Priority: Minor
> Fix For: 1.6.0
>
>
> * The Table API does not have a BETWEEN expression. BETWEEN is quite handy 
> when defining join predicates for window joins.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5959: [FLINK-9258][metrics] Thread-safe initialization o...

2018-05-23 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5959#discussion_r190168697
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java ---
@@ -57,11 +57,12 @@ public ComponentMetricGroup(MetricRegistry registry, String[] scope, P parent) {
 		if (variables == null) { // avoid synchronization for common case
 			synchronized (this) {
 				if (variables == null) {
-					variables = new HashMap<>();
-					putVariables(variables);
+					Map<String, String> tmpVariables = new HashMap<>();
+					putVariables(tmpVariables);
 					if (parent != null) { // not true for Job-/TaskManagerMetricGroup
-						variables.putAll(parent.getAllVariables());
+						tmpVariables.putAll(parent.getAllVariables());
 					}
+					variables = tmpVariables;
--- End diff --

EDIT removed my previous comment about double-checked locking because I saw 
that the variable is `volatile`
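
For reference, a generic sketch of the double-checked locking idiom under discussion, with the volatile field that makes it safe (simplified names, not the actual ComponentMetricGroup code):

{code:java}
import java.util.HashMap;
import java.util.Map;

class LazyVariablesSketch {
	private volatile Map<String, String> variables; // volatile is what makes the idiom safe

	Map<String, String> getAllVariables() {
		if (variables == null) {                    // fast path: no locking once initialized
			synchronized (this) {
				if (variables == null) {            // re-check under the lock
					Map<String, String> tmp = new HashMap<>();
					tmp.put("<host>", "localhost"); // fill the map completely ...
					variables = tmp;                // ... and only then publish it
				}
			}
		}
		return variables;
	}
}
{code}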


---


[jira] [Commented] (FLINK-9258) ConcurrentModificationException in ComponentMetricGroup.getAllVariables

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486917#comment-16486917
 ] 

ASF GitHub Bot commented on FLINK-9258:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5959#discussion_r190168697
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java ---
@@ -57,11 +57,12 @@ public ComponentMetricGroup(MetricRegistry registry, String[] scope, P parent) {
 		if (variables == null) { // avoid synchronization for common case
 			synchronized (this) {
 				if (variables == null) {
-					variables = new HashMap<>();
-					putVariables(variables);
+					Map<String, String> tmpVariables = new HashMap<>();
+					putVariables(tmpVariables);
 					if (parent != null) { // not true for Job-/TaskManagerMetricGroup
-						variables.putAll(parent.getAllVariables());
+						tmpVariables.putAll(parent.getAllVariables());
 					}
+					variables = tmpVariables;
--- End diff --

EDIT removed my previous comment about double-checked locking because I saw 
that the variable is `volatile`


> ConcurrentModificationException in ComponentMetricGroup.getAllVariables
> ---
>
> Key: FLINK-9258
> URL: https://issues.apache.org/jira/browse/FLINK-9258
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0, 1.4.3
>
>
> Seeing this exception at the job startup time. Looks like there is a race 
> condition when the metrics variables are constructed.
> The error is intermittent.
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>     at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>     at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>     at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.ConcurrentModificationException
>     at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437)
>     at java.util.HashMap$EntryIterator.next(HashMap.java:1471)
>     at java.util.HashMap$EntryIterator.next(HashMap.java:1469)
>     at java.util.HashMap.putMapEntries(HashMap.java:511)
>     at java.util.HashMap.putAll(HashMap.java:784)
>     at 
> org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63)
>     at 
> org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63)
>     at 
> com.netflix.spaas.metrics.MetricsReporterRegistry.getTags(MetricsReporterRegistry.java:147)
>     at 
> com.netflix.spaas.metrics.MetricsReporterRegistry.mergeWithSourceAndSinkTags(MetricsReporterRegistry.java:170)
>     at 
> com.netflix.spaas.metrics.MetricsReporterRegistry.addReporter(MetricsReporterRegistry.java:75)
>     at 
> com.netflix.spaas.nfflink.connector.kafka.source.Kafka010Consumer.createFetcher(Kafka010Consumer.java:69)
>     at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:549)
>     at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
>     at 
> 

[jira] [Commented] (FLINK-9344) Support INTERSECT and INTERSECT ALL for streaming

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486934#comment-16486934
 ] 

ASF GitHub Bot commented on FLINK-9344:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5998#discussion_r190171657
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/setop/StreamIntersectCoProcessFunction.scala
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.setop
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector
+import org.apache.flink.table.runtime.types.CRow
+import 
org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+class StreamIntersectCoProcessFunction(
+  resultType: TypeInformation[Row],
+  queryConfig: StreamQueryConfig,
+  all: Boolean)
+  extends CoProcessFunction[CRow, CRow, CRow]
--- End diff --

I think it makes sense to have two implementations of this operator.
1. For tables with a time attribute. This implementation works without 
retraction and can automatically clean up the state. 
2. For tables without time attributes. This implementation needs to clean up 
state based on retention time and produces retractions.

This PR seems to address both cases, which is fine for now. We can improve 
case 1 later on. Both cases should be implemented as `CoProcessFunction`. We 
should try to be independent of the DataStream window operators, IMO.
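
As a rough sketch of the shape variant 1 could take (skeleton only, with illustrative names; not code from this PR):

{code:java}
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// Skeleton of a time-attribute based INTERSECT: buffer rows from both inputs,
// emit matches, and drop state once the watermark passes the buffered timestamps.
public class TimeBasedIntersectSketch extends CoProcessFunction<Row, Row, Row> {

	@Override
	public void processElement1(Row left, Context ctx, Collector<Row> out) throws Exception {
		// buffer `left` keyed by its time attribute, emit it if a matching right row was seen
	}

	@Override
	public void processElement2(Row right, Context ctx, Collector<Row> out) throws Exception {
		// symmetric handling for the right input
	}

	@Override
	public void onTimer(long timestamp, OnTimerContext ctx, Collector<Row> out) throws Exception {
		// clean up buffered state that is older than the current watermark
	}
}
{code}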


> Support INTERSECT and INTERSECT ALL for streaming
> -
>
> Key: FLINK-9344
> URL: https://issues.apache.org/jira/browse/FLINK-9344
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>
> Support INTERSECT and INTERSECT ALL for both SQL and the Table API.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6059: [Flink-9418] Migrate SharedBuffer to use MapState

2018-05-23 Thread dawidwys
GitHub user dawidwys opened a pull request:

https://github.com/apache/flink/pull/6059

[Flink-9418] Migrate SharedBuffer to use MapState

## What is the purpose of the change

Migrate `SharedBuffer` to `MapState` to improve memory management and 
decrease the amount of data that is deserialized.

It is based on #5960. Only the last two commits apply to this change.

## Verifying this change

This change is already covered by existing tests. Added a test that checks 
the number of state accesses so that we can detect performance regressions in 
the future.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dawidwys/flink cep-sharedbuffer-reworked

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6059.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6059


commit 849692eec7576264292a8feaf4f127ad6c47601d
Author: Aljoscha Krettek 
Date:   2018-02-07T12:55:11Z

[FLINK-8725] Separate state from NFA in CEP library

This also changes the serialization of state to not include the static
NFA parts and to also not include any user code.

commit 07f588678cf08422eb12d2ab415d9becc9273144
Author: Dawid Wysakowicz 
Date:   2018-05-04T13:41:27Z

Reverted backward compatibility with <=1.5

commit d3d11d9d4c474c2ff84183d8db8c77045a7771d9
Author: Dawid Wysakowicz 
Date:   2018-05-16T12:08:13Z

[FLINK-9418] Added SharedBuffer v2

commit 515ebba0d90722940e95542a5e9bab8cd9f22939
Author: Dawid Wysakowicz 
Date:   2018-05-23T08:04:37Z

[FLINK-9418] Switched to SharedBuffer v2




---


[GitHub] flink issue #6059: [Flink-9418] Migrate SharedBuffer to use MapState

2018-05-23 Thread dawidwys
Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/6059
  
R: @kl0u 


---


[GitHub] flink pull request #6041: [FLINK-9326] TaskManagerOptions.NUM_TASK_SLOTS doe...

2018-05-23 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6041#discussion_r190187802
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java ---
@@ -103,9 +103,16 @@ public JobExecutionResult execute(String jobName) throws Exception {
 			configuration.setInteger(RestOptions.PORT, 0);
 		}
 
+		int numSlotsPerTaskManager;
+		if (configuration.contains(TaskManagerOptions.NUM_TASK_SLOTS)) {
--- End diff --

cc @zentol


---


[jira] [Created] (FLINK-9422) Dedicated operator for UNION on streaming tables with time attributes

2018-05-23 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-9422:


 Summary: Dedicated operator for UNION on streaming tables with 
time attributes
 Key: FLINK-9422
 URL: https://issues.apache.org/jira/browse/FLINK-9422
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Fabian Hueske


We can implement a dedicated operator for a {{UNION}} operator on tables with 
time attributes. Currently, {{UNION}} is translated into a {{UNION ALL}} and a 
subsequent {{GROUP BY}} on all attributes without aggregation functions. The 
state of the grouping operator is only cleaned up using state retention timers. 

The dedicated operator would leverage the monotonicity property of the time 
attribute and watermarks to automatically clean up its state.
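
For illustration, the described rewrite expressed as SQL strings (table and column names are made up):

{code:java}
public class UnionRewriteSketch {
	// Hypothetical tables A(id, ts) and B(id, ts): a streaming UNION ...
	static final String UNION_QUERY = "SELECT id, ts FROM A UNION SELECT id, ts FROM B";

	// ... is currently planned like a UNION ALL followed by a GROUP BY over all
	// columns without aggregation functions, so the grouping operator keeps state
	// per distinct row and can only drop it via retention timers.
	static final String EQUIVALENT_PLAN =
		"SELECT id, ts FROM ("
		+ "  SELECT id, ts FROM A UNION ALL SELECT id, ts FROM B"
		+ ") GROUP BY id, ts";
}
{code}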



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r190140724
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -26,20 +29,60 @@
 import com.amazonaws.ClientConfigurationFactory;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.ListShardsRequest;
+import com.amazonaws.services.kinesis.model.ListShardsResult;
 import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Assert;
 import org.junit.Test;
 import org.powermock.reflect.Whitebox;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static 
org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 
 /**
  * Test for methods in the {@link KinesisProxy} class.
  */
 public class KinesisProxyTest {
+   private static final String NEXT_TOKEN = "NextToken";
+   private static final String fakeStreamName = "fake-stream";
+   private Set<String> shardIdSet;
+   private List<Shard> shards;
+
+   protected static HashMap<String, String>
+           createInitialSubscribedStreamsToLastDiscoveredShardsState(List<String> streams) {
+       HashMap<String, String> initial = new HashMap<>();
+       for (String stream : streams) {
+           initial.put(stream, null);
+       }
+       return initial;
+   }
+
+   private static ListShardsRequestMatcher initialListShardsRequestMatcher() {
+       return new ListShardsRequestMatcher(null, null);
+   }
+
+   private static ListShardsRequestMatcher listShardsNextToken(final String nextToken) {
--- End diff --

nit: IMO, it would help with readability if we move these private utility 
methods after the main test ones.


---


[GitHub] flink pull request #5992: [FLINK-8944] [Kinesis Connector] Use listShards in...

2018-05-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r190140597
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -26,20 +29,60 @@
 import com.amazonaws.ClientConfigurationFactory;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.ListShardsRequest;
+import com.amazonaws.services.kinesis.model.ListShardsResult;
 import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Assert;
 import org.junit.Test;
 import org.powermock.reflect.Whitebox;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static 
org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 
 /**
  * Test for methods in the {@link KinesisProxy} class.
  */
 public class KinesisProxyTest {
+   private static final String NEXT_TOKEN = "NextToken";
+   private static final String fakeStreamName = "fake-stream";
+   private Set<String> shardIdSet;
+   private List<Shard> shards;
--- End diff --

Should we move these to be scoped only to the `testGetShardList` test 
method?


---


[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486812#comment-16486812
 ] 

ASF GitHub Bot commented on FLINK-8944:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r190140724
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -26,20 +29,60 @@
 import com.amazonaws.ClientConfigurationFactory;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.ListShardsRequest;
+import com.amazonaws.services.kinesis.model.ListShardsResult;
 import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Assert;
 import org.junit.Test;
 import org.powermock.reflect.Whitebox;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static 
org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 
 /**
  * Test for methods in the {@link KinesisProxy} class.
  */
 public class KinesisProxyTest {
+   private static final String NEXT_TOKEN = "NextToken";
+   private static final String fakeStreamName = "fake-stream";
+   private Set<String> shardIdSet;
+   private List<Shard> shards;
+
+   protected static HashMap<String, String>
+           createInitialSubscribedStreamsToLastDiscoveredShardsState(List<String> streams) {
+       HashMap<String, String> initial = new HashMap<>();
+       for (String stream : streams) {
+           initial.put(stream, null);
+       }
+       return initial;
+   }
+
+   private static ListShardsRequestMatcher initialListShardsRequestMatcher() {
+       return new ListShardsRequestMatcher(null, null);
+   }
+
+   private static ListShardsRequestMatcher listShardsNextToken(final String nextToken) {
--- End diff --

nit: IMO, it would help with readability if we move these private utility 
methods after the main test ones.


> Use ListShards for shard discovery in the flink kinesis connector
> -
>
> Key: FLINK-8944
> URL: https://issues.apache.org/jira/browse/FLINK-8944
> Project: Flink
>  Issue Type: Improvement
>Reporter: Kailash Hassan Dayanand
>Priority: Minor
>
> Currently, the DescribeStream AWS API used to get the list of shards has 
> restricted rate limits on AWS (5 requests per second per account). This is 
> problematic when running multiple Flink jobs on the same account, since each 
> subtask calls DescribeStream. Changing this to ListShards will provide 
> more flexibility on rate limits, as ListShards allows 100 requests per second 
> per data stream.
> More details on the mailing list: https://goo.gl/mRXjKh
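
As a rough illustration of the proposed ListShards-based discovery with NextToken pagination (a simplified sketch, not the connector code; it omits retries and per-stream bookkeeping):

{code:java}
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.Shard;

import java.util.ArrayList;
import java.util.List;

final class ListShardsSketch {
	// List all shards of a stream, following NextToken pages. A follow-up request
	// that carries a NextToken must not also carry the stream name.
	static List<Shard> listAllShards(AmazonKinesis kinesis, String streamName) {
		List<Shard> shards = new ArrayList<>();
		ListShardsResult result =
			kinesis.listShards(new ListShardsRequest().withStreamName(streamName));
		shards.addAll(result.getShards());
		while (result.getNextToken() != null) {
			result = kinesis.listShards(new ListShardsRequest().withNextToken(result.getNextToken()));
			shards.addAll(result.getShards());
		}
		return shards;
	}
}
{code}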



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8944) Use ListShards for shard discovery in the flink kinesis connector

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486811#comment-16486811
 ] 

ASF GitHub Bot commented on FLINK-8944:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5992#discussion_r190140597
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxyTest.java
 ---
@@ -26,20 +29,60 @@
 import com.amazonaws.ClientConfigurationFactory;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
+import com.amazonaws.services.kinesis.model.ListShardsRequest;
+import com.amazonaws.services.kinesis.model.ListShardsResult;
 import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
+import com.amazonaws.services.kinesis.model.Shard;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Assert;
 import org.junit.Test;
 import org.powermock.reflect.Whitebox;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsCollectionWithSize.hasSize;
+import static 
org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 
 /**
  * Test for methods in the {@link KinesisProxy} class.
  */
 public class KinesisProxyTest {
+   private static final String NEXT_TOKEN = "NextToken";
+   private static final String fakeStreamName = "fake-stream";
+   private Set<String> shardIdSet;
+   private List<Shard> shards;
--- End diff --

Should we move these to be scoped only to the `testGetShardList` test 
method?


> Use ListShards for shard discovery in the flink kinesis connector
> -
>
> Key: FLINK-8944
> URL: https://issues.apache.org/jira/browse/FLINK-8944
> Project: Flink
>  Issue Type: Improvement
>Reporter: Kailash Hassan Dayanand
>Priority: Minor
>
> Currently, the DescribeStream AWS API used to get the list of shards has 
> restricted rate limits on AWS (5 requests per second per account). This is 
> problematic when running multiple Flink jobs on the same account, since each 
> subtask calls DescribeStream. Changing this to ListShards will provide 
> more flexibility on rate limits, as ListShards allows 100 requests per second 
> per data stream.
> More details on the mailing list: https://goo.gl/mRXjKh



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9349) KafkaConnector Exception while fetching from multiple kafka topics

2018-05-23 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-9349.

   Resolution: Fixed
Fix Version/s: 1.5.1
   1.4.3
   1.6.0

Merged. Fixed via:

1.6.0 - 049994274c9d4fc07925a7639e4044506b090d10
1.5.1 - bc4a402d09304a21c82299e368442c8a6e4ae427
1.4.3 - 61c44d902d081ad5bf0e1654f62f70567d25fde8

> KafkaConnector Exception  while fetching from multiple kafka topics
> ---
>
> Key: FLINK-9349
> URL: https://issues.apache.org/jira/browse/FLINK-9349
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Vishal Santoshi
>Assignee: Sergey Nuyanzin
>Priority: Critical
> Fix For: 1.6.0, 1.4.3, 1.5.1
>
> Attachments: Flink9349Test.java
>
>
> ./flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
>  
> It seems the List subscribedPartitionStates was being modified when 
> runFetchLoop iterated the List.
> This can happen if, e.g., FlinkKafkaConsumer runs the following code 
> concurrently:
>                 kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
>  
> {code:java}
>  java.util.ConcurrentModificationException
>   at 
> java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:966)
>   at java.util.LinkedList$ListItr.next(LinkedList.java:888)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:134)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9416) Make job submission a retriable operation in case of an ongoing leader election

2018-05-23 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9416?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-9416:
-
Fix Version/s: 1.5.1
   1.6.0

> Make job submission a retriable operation in case of an ongoing leader election
> 
>
> Key: FLINK-9416
> URL: https://issues.apache.org/jira/browse/FLINK-9416
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> When starting a session cluster, it can happen that the job submission fails 
> if the REST server endpoint has already gained leadership but the leadership 
> election for the {{Dispatcher}} is still ongoing. In such a case, we receive an 
> error response saying that the leader election is still ongoing and fail the 
> job submission. I think it would be nicer to also make the submission step a 
> retriable operation in order to avoid this race condition.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9416) Make job submission a retriable operation in case of an ongoing leader election

2018-05-23 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-9416:


 Summary: Make job submission a retriable operation in case of an 
ongoing leader election
 Key: FLINK-9416
 URL: https://issues.apache.org/jira/browse/FLINK-9416
 Project: Flink
  Issue Type: Bug
  Components: Client
Affects Versions: 1.5.0, 1.6.0
Reporter: Till Rohrmann


When starting a session cluster, it can happen that the job submission fails if 
the REST server endpoint has already gained leadership but the leadership 
election for the {{Dispatcher}} is still ongoing. In such a case, we receive an 
error response saying that the leader election is still ongoing and fail the 
job submission. I think it would be nicer to also make the submission step a 
retriable operation in order to avoid this race condition.
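
A simplified, synchronous sketch of what a retriable submission step could look like (illustrative names; the real client is asynchronous and would retry only on the specific "leader election ongoing" error):

{code:java}
import org.apache.flink.api.common.JobSubmissionResult;

import java.util.concurrent.Callable;

final class RetriableSubmissionSketch {
	static JobSubmissionResult submitWithRetry(
			Callable<JobSubmissionResult> submit, int maxAttempts, long delayMillis) throws Exception {
		Exception lastError = null;
		for (int attempt = 0; attempt < maxAttempts; attempt++) {
			try {
				return submit.call();
			} catch (Exception e) {
				// assume `e` indicates that the Dispatcher has not gained leadership yet
				lastError = e;
				Thread.sleep(delayMillis);
			}
		}
		throw lastError;
	}
}
{code}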



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9303) Unassign partitions from Kafka client if partitions become unavailable

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486873#comment-16486873
 ] 

ASF GitHub Bot commented on FLINK-9303:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190152570
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ---
@@ -374,8 +385,8 @@ void setOffsetsToCommit(
 * This method is exposed for testing purposes.
 */
@VisibleForTesting
-   void reassignPartitions(List newPartitions) throws Exception {
-   if (newPartitions.size() == 0) {
+   void reassignPartitions(List newPartitions, Set partitionsToBeRemoved) throws Exception {
--- End diff --

I have the feeling that this method is way too complex now, to a point that 
it might make more sense to break this up into 2 different methods - 
`addPartitionsToAssignment` and `removePartitionsFromAssignment`.


> Unassign partitions from Kafka client if partitions become unavailable
> --
>
> Key: FLINK-9303
> URL: https://issues.apache.org/jira/browse/FLINK-9303
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.6.0
>
>
> Originally reported in ML:
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamically-deleting-kafka-topics-does-not-remove-partitions-from-kafkaConsumer-td19946.html]
> The problem is that the Kafka consumer has no notion of "closed" partitions 
> at the moment, so partitions statically assigned to the Kafka client are never 
> removed and are continuously requested for records.
> This causes log noise, as reported in the mail thread.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9303) Unassign partitions from Kafka client if partitions become unavailable

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486874#comment-16486874
 ] 

ASF GitHub Bot commented on FLINK-9303:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190153528
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ---
@@ -80,6 +83,9 @@
 	/** The queue of unassigned partitions that we need to assign to the Kafka consumer. */
 	private final ClosableBlockingQueue unassignedPartitionsQueue;
 
+   /** The list of partitions to be removed from kafka consumer. */
+   private final Set partitionsToBeRemoved;
--- End diff --

Would it actually make more sense that we have a queue for this? Like how 
we are handling unassigned new partitions via the `unassignedPartitionsQueue`. 
The fact that this is a set means that we will need to eventually remove 
entries from it anyways.


> Unassign partitions from Kafka client if partitions become unavailable
> --
>
> Key: FLINK-9303
> URL: https://issues.apache.org/jira/browse/FLINK-9303
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.6.0
>
>
> Originally reported in ML:
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamically-deleting-kafka-topics-does-not-remove-partitions-from-kafkaConsumer-td19946.html]
> The problem is that the Kafka consumer has no notion of "closed" partitions 
> at the moment, so partitions statically assigned to the Kafka client are never 
> removed and are continuously requested for records.
> This causes log noise, as reported in the mail thread.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9303) Unassign partitions from Kafka client if partitions become unavailable

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486872#comment-16486872
 ] 

ASF GitHub Bot commented on FLINK-9303:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190150419
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -235,7 +243,8 @@ public FlinkKafkaConsumerBase(
Pattern topicPattern,
KeyedDeserializationSchema deserializer,
long discoveryIntervalMillis,
-   boolean useMetrics) {
+   boolean useMetrics,
+   boolean checkUnavailablePartitions) {
--- End diff --

Why do we want this to be configurable? Is there any case that we would 
prefer to leave them untouched?


> Unassign partitions from Kafka client if partitions become unavailable
> --
>
> Key: FLINK-9303
> URL: https://issues.apache.org/jira/browse/FLINK-9303
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.6.0
>
>
> Originally reported in ML:
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamically-deleting-kafka-topics-does-not-remove-partitions-from-kafkaConsumer-td19946.html]
> The problem is that the Kafka consumer has no notion of "closed" partitions 
> at the moment, so partitions statically assigned to the Kafka client are never 
> removed and are continuously requested for records.
> This causes log noise, as reported in the mail thread.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190153528
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ---
@@ -80,6 +83,9 @@
 	/** The queue of unassigned partitions that we need to assign to the Kafka consumer. */
 	private final ClosableBlockingQueue unassignedPartitionsQueue;
 
+   /** The list of partitions to be removed from kafka consumer. */
+   private final Set partitionsToBeRemoved;
--- End diff --

Would it actually make more sense that we have a queue for this? Like how 
we are handling unassigned new partitions via the `unassignedPartitionsQueue`. 
The fact that this is a set means that we will need to eventually remove 
entries from it anyways.


---


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190150419
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 ---
@@ -235,7 +243,8 @@ public FlinkKafkaConsumerBase(
Pattern topicPattern,
KeyedDeserializationSchema deserializer,
long discoveryIntervalMillis,
-   boolean useMetrics) {
+   boolean useMetrics,
+   boolean checkUnavailablePartitions) {
--- End diff --

Why do we want this to be configurable? Is there any case that we would 
prefer to leave them untouched?


---


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-23 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r190152570
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ---
@@ -374,8 +385,8 @@ void setOffsetsToCommit(
 * This method is exposed for testing purposes.
 */
@VisibleForTesting
-   void reassignPartitions(List newPartitions) throws Exception {
-   if (newPartitions.size() == 0) {
+   void reassignPartitions(List newPartitions, Set partitionsToBeRemoved) throws Exception {
--- End diff --

I have the feeling that this method is way too complex now, to a point that 
it might make more sense to break this up into 2 different methods - 
`addPartitionsToAssignment` and `removePartitionsFromAssignment`.


---


[jira] [Created] (FLINK-9418) Migrate SharedBuffer to use MapState

2018-05-23 Thread Dawid Wysakowicz (JIRA)
Dawid Wysakowicz created FLINK-9418:
---

 Summary: Migrate SharedBuffer to use MapState
 Key: FLINK-9418
 URL: https://issues.apache.org/jira/browse/FLINK-9418
 Project: Flink
  Issue Type: Improvement
  Components: CEP
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz
 Fix For: 1.6.0


Right now {{SharedBuffer}} is implemented with Java collections and the whole 
buffer is deserialized on each access. We should migrate it to MapState, so 
that only the necessary parts (e.g. tail entries) are deserialized.
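
A minimal sketch of the per-entry access pattern that MapState enables (illustrative key/value types on a keyed stream, not the actual CEP classes):

{code:java}
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Must run on a keyed stream. Each get()/put() touches a single entry, so only
// that entry is (de)serialized instead of the whole buffer.
public class MapStateBufferSketch extends RichFlatMapFunction<String, String> {

	private transient MapState<Long, String> entries;

	@Override
	public void open(Configuration parameters) {
		entries = getRuntimeContext().getMapState(
			new MapStateDescriptor<>("buffer-entries", Long.class, String.class));
	}

	@Override
	public void flatMap(String value, Collector<String> out) throws Exception {
		long id = value.hashCode();
		entries.put(id, value);       // write one entry
		out.collect(entries.get(id)); // read back only that entry
	}
}
{code}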



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6053: [FLINK-9257][E2E Tests] Fix wrong "All tests pass"...

2018-05-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6053#discussion_r190156721
  
--- Diff: flink-end-to-end-tests/run-nightly-tests.sh ---
@@ -30,168 +30,50 @@ if [ -z "$FLINK_DIR" ] ; then
 exit 1
 fi
 
-source "$(dirname "$0")"/test-scripts/common.sh
+source "$(dirname "$0")"/test-scripts/test-runner-common.sh
 
 FLINK_DIR="`( cd \"$FLINK_DIR\" && pwd )`" # absolutized and normalized
 
 echo "flink-end-to-end-test directory: $END_TO_END_DIR"
 echo "Flink distribution directory: $FLINK_DIR"
 
-EXIT_CODE=0
-
 # Template for adding a test:
 
-# if [ $EXIT_CODE == 0 ]; then
-#run_test "" "$END_TO_END_DIR/test-scripts/"
-#EXIT_CODE=$?
-# fi
-
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Running HA (file, async) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha.sh file true false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Running HA (file, sync) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha.sh file false false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Running HA (rocks, non-incremental) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha.sh rocks true false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Running HA (rocks, incremental) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha.sh rocks true true"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (file, async, no parallelism change) 
end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 
file true"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (file, sync, no parallelism change) 
end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 
file false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (file, async, scale up) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 file true"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (file, sync, scale up) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 file false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (file, async, scale down) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 file true"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (file, sync, scale down) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 file false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (rocks, no parallelism change) end-to-end 
test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 rocks"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (rocks, scale up) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 rocks"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (rocks, scale down) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 rocks"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint (file, async) end-to-end 
test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 
file true false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint (file, sync) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file 
false false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint (rocks) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks 
false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint after terminal failure (file, 
async) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true 
true"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint after terminal failure (file, 
sync) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file 
false true"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint after terminal failure 
(rocks) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks 
true"
-  EXIT_CODE=$?

[jira] [Commented] (FLINK-9257) End-to-end tests prints "All tests PASS" even if individual test-script returns non-zero exit code

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486885#comment-16486885
 ] 

ASF GitHub Bot commented on FLINK-9257:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6053#discussion_r190156721
  
--- Diff: flink-end-to-end-tests/run-nightly-tests.sh ---
@@ -30,168 +30,50 @@ if [ -z "$FLINK_DIR" ] ; then
 exit 1
 fi
 
-source "$(dirname "$0")"/test-scripts/common.sh
+source "$(dirname "$0")"/test-scripts/test-runner-common.sh
 
 FLINK_DIR="`( cd \"$FLINK_DIR\" && pwd )`" # absolutized and normalized
 
 echo "flink-end-to-end-test directory: $END_TO_END_DIR"
 echo "Flink distribution directory: $FLINK_DIR"
 
-EXIT_CODE=0
-
 # Template for adding a test:
 
-# if [ $EXIT_CODE == 0 ]; then
-#run_test "" "$END_TO_END_DIR/test-scripts/"
-#EXIT_CODE=$?
-# fi
-
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Running HA (file, async) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha.sh file true false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Running HA (file, sync) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha.sh file false false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Running HA (rocks, non-incremental) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha.sh rocks true false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Running HA (rocks, incremental) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_ha.sh rocks true true"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (file, async, no parallelism change) 
end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 
file true"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (file, sync, no parallelism change) 
end-to-end test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 
file false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (file, async, scale up) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 file true"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (file, sync, scale up) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 file false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (file, async, scale down) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 file true"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (file, sync, scale down) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 file false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (rocks, no parallelism change) end-to-end 
test" "$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2 rocks"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (rocks, scale up) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4 rocks"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Savepoint (rocks, scale down) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2 rocks"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint (file, async) end-to-end 
test" "$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh 
file true false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint (file, sync) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file 
false false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint (rocks) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh rocks 
false"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint after terminal failure (file, 
async) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file true 
true"
-  EXIT_CODE=$?
-fi
-
-if [ $EXIT_CODE == 0 ]; then
-  run_test "Resuming Externalized Checkpoint after terminal failure (file, 
sync) end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_resume_externalized_checkpoints.sh file 
false true"
-  EXIT_CODE=$?
-fi
-

[GitHub] flink issue #5869: [FLINK-8946] TaskManager stop sending metrics after JobMa...

2018-05-23 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5869
  
That shouldn't be possible, as all tasks are removed when a TM 
disassociates from the JM, which also implies removing all metrics related to a 
specific task.


---


[jira] [Commented] (FLINK-8946) TaskManager stop sending metrics after JobManager failover

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486912#comment-16486912
 ] 

ASF GitHub Bot commented on FLINK-8946:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5869
  
So I should just remove the close method from `disassociateFromJobManager`?


> TaskManager stop sending metrics after JobManager failover
> --
>
> Key: FLINK-8946
> URL: https://issues.apache.org/jira/browse/FLINK-8946
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, TaskManager
>Affects Versions: 1.4.2
>Reporter: Truong Duc Kien
>Assignee: vinoyang
>Priority: Critical
> Fix For: 1.5.0
>
>
> Running in Yarn-standalone mode, when the JobManager performs a failover, 
> all TaskManagers that are inherited from the previous JobManager will not send 
> metrics to the new JobManager and other registered metric reporters.
>  
> A cursory glance reveals that these lines of code might be the cause:
> [https://github.com/apache/flink/blob/a3478fdfa0f792104123fefbd9bdf01f5029de51/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala#L1082-L1086]
> Perhaps the TaskManager closes its metrics group when disassociating from the 
> JobManager, but does not create a new one on fail-over association?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9258) ConcurrentModificationException in ComponentMetricGroup.getAllVariables

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486911#comment-16486911
 ] 

ASF GitHub Bot commented on FLINK-9258:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5959
  
I have two quick questions:
- Why does `ComponentMetricGroup` even override the method 
`getAllVariables` from `AbstractMetricGroup` with essentially the exact same 
code?
- Why is it only fixed in `ComponentMetricGroup`? Could it make sense to fix 
it in `AbstractMetricGroup` and remove the overriding method in the subclass?


> ConcurrentModificationException in ComponentMetricGroup.getAllVariables
> ---
>
> Key: FLINK-9258
> URL: https://issues.apache.org/jira/browse/FLINK-9258
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0, 1.4.3
>
>
> Seeing this exception at the job startup time. Looks like there is a race 
> condition when the metrics variables are constructed.
> The error is intermittent.
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>     at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>     at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>     at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.ConcurrentModificationException
>     at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437)
>     at java.util.HashMap$EntryIterator.next(HashMap.java:1471)
>     at java.util.HashMap$EntryIterator.next(HashMap.java:1469)
>     at java.util.HashMap.putMapEntries(HashMap.java:511)
>     at java.util.HashMap.putAll(HashMap.java:784)
>     at 
> org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63)
>     at 
> org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63)
>     at 
> com.netflix.spaas.metrics.MetricsReporterRegistry.getTags(MetricsReporterRegistry.java:147)
>     at 
> com.netflix.spaas.metrics.MetricsReporterRegistry.mergeWithSourceAndSinkTags(MetricsReporterRegistry.java:170)
>     at 
> com.netflix.spaas.metrics.MetricsReporterRegistry.addReporter(MetricsReporterRegistry.java:75)
>     at 
> com.netflix.spaas.nfflink.connector.kafka.source.Kafka010Consumer.createFetcher(Kafka010Consumer.java:69)
>     at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:549)
>     at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
>     at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>     at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5869: [FLINK-8946] TaskManager stop sending metrics after JobMa...

2018-05-23 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5869
  
So should I just remove the close method from `disassociateFromJobManager`?


---


[GitHub] flink issue #5959: [FLINK-9258][metrics] Thread-safe initialization of varia...

2018-05-23 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5959
  
I have two quick questions:
- Why does `ComponentMetricGroup` even override the method 
`getAllVariables` from `AbstractMetricGroup` with essentially the exact same 
code?
- Why is it only fixed in `ComponentMetricGroup`? Could it make sense to fix 
it in `AbstractMetricGroup` and remove the overriding method in the subclass?


---


[GitHub] flink issue #5869: [FLINK-8946] TaskManager stop sending metrics after JobMa...

2018-05-23 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5869
  
@zentol Normally, it seems you are right. But I don't know whether some 
exception could leave stale information behind at the moment the JM switches.


---


[jira] [Commented] (FLINK-8946) TaskManager stop sending metrics after JobManager failover

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486910#comment-16486910
 ] 

ASF GitHub Bot commented on FLINK-8946:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5869
  
@zentol Normally, it seems you are right. But I don't know whether some 
exception could leave stale information behind at the moment the JM switches.


> TaskManager stop sending metrics after JobManager failover
> --
>
> Key: FLINK-8946
> URL: https://issues.apache.org/jira/browse/FLINK-8946
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, TaskManager
>Affects Versions: 1.4.2
>Reporter: Truong Duc Kien
>Assignee: vinoyang
>Priority: Critical
> Fix For: 1.5.0
>
>
> Running in Yarn-standalone mode, when the Job Manager performs a failover, 
> all TaskManager that are inherited from the previous JobManager will not send 
> metrics to the new JobManager and other registered metric reporters.
>  
> A cursory glance reveals that these lines of code might be the cause
> [https://github.com/apache/flink/blob/a3478fdfa0f792104123fefbd9bdf01f5029de51/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala#L1082-L1086]
> Perhaps the TaskManager closes its metrics group when disassociating from the 
> JobManager, but does not create a new one on fail-over association?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9349) KafkaConnector Exception while fetching from multiple kafka topics

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486925#comment-16486925
 ] 

ASF GitHub Bot commented on FLINK-9349:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6040
  
Using `CheckedThread` is preferable, as it simplifies some of the test 
code.
But yes, the utility was introduced at a later point in time in Flink, so 
some parts of the test code might still be using `Thread`s and 
`AtomicReference`s.
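
For reference, a hedged sketch of that pattern, assuming a `fetcher` instance 
created by the test (`CheckedThread` lives in `org.apache.flink.core.testutils`):

{code:java}
import org.apache.flink.core.testutils.CheckedThread;

// Instead of a raw Thread plus an AtomicReference<Throwable> to capture
// failures, a CheckedThread rethrows any exception thrown in go() when
// sync() is called on the test thread.
final CheckedThread fetchLoopThread = new CheckedThread() {
    @Override
    public void go() throws Exception {
        fetcher.runFetchLoop(); // 'fetcher' is assumed to be set up by the test
    }
};
fetchLoopThread.start();

// ... concurrently call fetcher.addDiscoveredPartitions(...) from the test ...

fetchLoopThread.sync(); // propagates exceptions/assertion errors from the loop
{code}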


> KafkaConnector Exception  while fetching from multiple kafka topics
> ---
>
> Key: FLINK-9349
> URL: https://issues.apache.org/jira/browse/FLINK-9349
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Vishal Santoshi
>Assignee: Sergey Nuyanzin
>Priority: Critical
> Fix For: 1.6.0, 1.4.3, 1.5.1
>
> Attachments: Flink9349Test.java
>
>
> ./flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
>  
> It seems the List subscribedPartitionStates was being modified when 
> runFetchLoop iterated the List.
> This can happen if, e.g., FlinkKafkaConsumer runs the following code 
> concurrently:
>                 kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
>  
> {code:java}
>  java.util.ConcurrentModificationException
>   at 
> java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:966)
>   at java.util.LinkedList$ListItr.next(LinkedList.java:888)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:134)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9384) KafkaAvroTableSource failed to work due to type mismatch

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486930#comment-16486930
 ] 

ASF GitHub Bot commented on FLINK-9384:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6026#discussion_r190169591
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -76,19 +77,35 @@
 */
private SpecificRecord record;
 
+   /** Type information describing the result type.
--- End diff --

Put the text one line down.


> KafkaAvroTableSource failed to work due to type mismatch
> 
>
> Key: FLINK-9384
> URL: https://issues.apache.org/jira/browse/FLINK-9384
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API  SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Priority: Blocker
>  Labels: easyfix, patch
> Fix For: 1.6.0
>
> Attachments: flink-9384.patch
>
>
> An exception was thrown when using KafkaAvroTableSource as follows:
> Exception in thread "main" org.apache.flink.table.api.TableException: 
> TableSource of type 
> org.apache.flink.streaming.connectors.kafka.Kafka011AvroTableSource returned 
> a DataStream of type GenericType that does not 
> match with the type Row(id: Integer, name: String, age: Integer, event: 
> GenericType) declared by the TableSource.getReturnType() 
> method. Please validate the implementation of the TableSource.
>  at 
> org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:100)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:885)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:812)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:279)
>  at org.apache.flink.table.api.Table.writeToSink(table.scala:862)
>  at org.apache.flink.table.api.Table.writeToSink(table.scala:830)
>  at 
> org.apache.flink.quickstart.StreamingJobAvro.main(StreamingJobAvro.java:85)
>  
> It is caused by a discrepancy between the type returned by the TableSource 
> and the type returned by the DataStream. I've already fixed it; would someone 
> please review the patch and see if it could be merged.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5869: [FLINK-8946] TaskManager stop sending metrics after JobMa...

2018-05-23 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5869
  
that should do the trick imo.


---


[jira] [Commented] (FLINK-9258) ConcurrentModificationException in ComponentMetricGroup.getAllVariables

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486950#comment-16486950
 ] 

ASF GitHub Bot commented on FLINK-9258:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5959
  
Sounds good. After the mentioned changes this looks ready to merge for me  


> ConcurrentModificationException in ComponentMetricGroup.getAllVariables
> ---
>
> Key: FLINK-9258
> URL: https://issues.apache.org/jira/browse/FLINK-9258
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0, 1.4.3
>
>
> Seeing this exception at the job startup time. Looks like there is a race 
> condition when the metrics variables are constructed.
> The error is intermittent.
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>     at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>     at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>     at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.ConcurrentModificationException
>     at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437)
>     at java.util.HashMap$EntryIterator.next(HashMap.java:1471)
>     at java.util.HashMap$EntryIterator.next(HashMap.java:1469)
>     at java.util.HashMap.putMapEntries(HashMap.java:511)
>     at java.util.HashMap.putAll(HashMap.java:784)
>     at 
> org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63)
>     at 
> org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63)
>     at 
> com.netflix.spaas.metrics.MetricsReporterRegistry.getTags(MetricsReporterRegistry.java:147)
>     at 
> com.netflix.spaas.metrics.MetricsReporterRegistry.mergeWithSourceAndSinkTags(MetricsReporterRegistry.java:170)
>     at 
> com.netflix.spaas.metrics.MetricsReporterRegistry.addReporter(MetricsReporterRegistry.java:75)
>     at 
> com.netflix.spaas.nfflink.connector.kafka.source.Kafka010Consumer.createFetcher(Kafka010Consumer.java:69)
>     at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:549)
>     at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
>     at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>     at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5959: [FLINK-9258][metrics] Thread-safe initialization of varia...

2018-05-23 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5959
  
Sounds good. After the mentioned changes this looks ready to merge for me 
👍 


---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-05-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190176842
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala
 ---
@@ -44,8 +45,20 @@ abstract class KeyedProcessFunctionWithCleanupState[K, 
I, O](queryConfig: Stream
   protected def registerProcessingCleanupTimer(
 ctx: KeyedProcessFunction[K, I, O]#Context,
 currentTime: Long): Unit = {
-if (stateCleaningEnabled) {
+registerCleanupTimer(ctx, currentTime, TimeDomain.PROCESSING_TIME)
+  }
 
+  protected def registerEventCleanupTimer(
--- End diff --

We implemented state cleanup as processing time because it is easier to 
reason about for users and doesn't interfere that much with event-time 
processing (it is not possible to distinguish timers yet). However, it also has 
a few short comings such as cleared state when recovering a query from a 
savepoint (which we don't really encourage at the moment). 

Anyway, introducing event-time state cleanup should definitely go into a 
separate issue and PR.


---


[jira] [Commented] (FLINK-6016) Newlines should be valid in quoted strings in CSV

2018-05-23 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486989#comment-16486989
 ] 

Fabian Hueske commented on FLINK-6016:
--

This problem cannot be solved with the current implementation of the 
{{CsvInputFormat}}, which is based on {{DelimitedInputFormat}}.
In the current implementation, the file is first split into rows (without 
looking at quote characters) and then each row is parsed. This behavior is 
pretty much baked in and cannot be easily changed.

There is a [PR that uses a CSV parsing 
library|https://github.com/apache/flink/pull/4660] to scan CSV files and 
handles this case better.
However, in general, row delimiters in quoted strings can only be properly 
processed if we read CSV files as a whole, i.e., without splitting them into 
smaller chunks that are read in parallel by different tasks. 
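
As a minimal illustration of the point (a hedged sketch, not the actual 
{{DelimitedInputFormat}} logic): a quote-aware splitter has to know whether it is 
inside a quoted field before it may treat a newline as a record delimiter, which 
requires scanning sequentially from the start of the file.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class QuoteAwareSplit {

    // Treats '\n' as a record delimiter only outside of quoted fields.
    // Simplified: ignores escaped quotes ("") and \r\n line endings.
    static List<String> splitRecords(String csv) {
        List<String> records = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (char c : csv.toCharArray()) {
            if (c == '"') {
                inQuotes = !inQuotes;
            }
            if (c == '\n' && !inQuotes) {
                records.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        if (current.length() > 0) {
            records.add(current.toString());
        }
        return records;
    }

    public static void main(String[] args) {
        // The record from the issue spans two physical lines but is one CSV record.
        System.out.println(splitRecords("\"3\n4\",5").size()); // prints 1
    }
}
{code}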

> Newlines should be valid in quoted strings in CSV
> -
>
> Key: FLINK-6016
> URL: https://issues.apache.org/jira/browse/FLINK-6016
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.2.0
>Reporter: Luke Hutchison
>Priority: Major
>
> The RFC for the CSV format specifies that newlines are valid in quoted 
> strings in CSV:
> https://tools.ietf.org/html/rfc4180
> However, when parsing a CSV file with Flink containing a newline, such as:
> {noformat}
> "3
> 4",5
> {noformat}
> you get this exception:
> {noformat}
> Line could not be parsed: '"3'
> ParserError UNTERMINATED_QUOTED_STRING 
> Expect field types: class java.lang.String, class java.lang.String 
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8985) End-to-end test: CLI

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486843#comment-16486843
 ] 

ASF GitHub Bot commented on FLINK-8985:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5863#discussion_r190145819
  
--- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh ---
@@ -0,0 +1,155 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+$FLINK_DIR/bin/taskmanager.sh start
+
+# Test for CLI commands.
+# verify only the return code the content correctness of the API results.

+PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\."
+
+EXIT_CODE=0
+
+function extract_job_id_from_job_submission_return() {
+if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+then
+JOB_ID="${BASH_REMATCH[1]}";
+else
+JOB_ID=""
+fi
+echo "$JOB_ID"
+}
+
+function extract_savepoint_path_from_savepoint_return() {
+if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]];
+then
+SAVEPOINT_PATH="${BASH_REMATCH[1]}";
+else
+SAVEPOINT_PATH=""
+fi
+echo "$SAVEPOINT_PATH"
+}
+
+function cleanup_cli_test() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
+
+  cleanup
+}
+
+printf 
"\n==\n"
+printf "Test default job launch with non-detach mode\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run $FLINK_DIR/examples/batch/WordCount.jar"
+EXIT_CODE=$?
+fi
+
+printf 
"\n==\n"
+printf "Test run with complex parameter set\n"
+printf 
"==\n"
+if [ $EXIT_CODE == 0 ]; then
+eval "$FLINK_DIR/bin/flink run -m localhost:8081 -p 4 -q -d \
--- End diff --

Well, we could have a completely normal exit code from the `run` execution 
but the `-p` option completely ignored, if the CLI were changed to simply not 
recognize the option.

This is an extreme case, though.
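
A hedged sketch of how the effect of `-p` could additionally be asserted, assuming 
the REST endpoint is reachable on localhost:8081 and the job is still running when 
queried (it reuses the helper defined in this script; it is not part of the PR):

{code:bash}
# Hypothetical follow-up check: confirm that the requested parallelism was
# applied instead of trusting the exit code of 'flink run' alone.
RETURN=$(eval "$FLINK_DIR/bin/flink run -d -p 4 $FLINK_DIR/examples/streaming/TopSpeedWindowing.jar")
JOB_ID=$(extract_job_id_from_job_submission_return "$RETURN")
ACTUAL_PARALLELISM=$(curl -s "http://localhost:8081/jobs/${JOB_ID}" \
  | grep -o '"parallelism":[0-9]*' | head -n 1 | cut -d: -f2)
if [ "$ACTUAL_PARALLELISM" != "4" ]; then
  echo "Expected parallelism 4 but found '$ACTUAL_PARALLELISM'"
  EXIT_CODE=1
fi
{code}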


> End-to-end test: CLI
> 
>
> Key: FLINK-8985
> URL: https://issues.apache.org/jira/browse/FLINK-8985
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Rong Rong
>Priority: Major
>
> We should an end-to-end test which verifies that all client commands are 
> working correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6027: [FLINK-7814][TableAPI && SQL] Add BETWEEN and NOT ...

2018-05-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6027


---


[GitHub] flink issue #6040: [FLINK-9349][Kafka Connector] KafkaConnector Exception wh...

2018-05-23 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6040
  
Using `CheckedThread` is preferable, as it simplifies some of the test 
code.
But yes, the utility was introduced at a later point in time in Flink, so 
some parts of the test code might still be using `Thread`s and 
`AtomicReference`s.


---


[GitHub] flink issue #6007: [FLINK-8518][Table API & SQL] Support DOW for EXTRACT

2018-05-23 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6007
  
Thanks for the PR @snuyanzin. I created the issue because it seems that 
this is more than a one-line change. I tested your queries in PostgreSQL and 
they return 0 instead of 1. We should have the same semantics as popular 
database vendors. How does Oracle define this feature?


---


[jira] [Created] (FLINK-9419) UNION should not be treated as retraction producing operator

2018-05-23 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-9419:


 Summary: UNION should not be treated as retraction producing 
operator
 Key: FLINK-9419
 URL: https://issues.apache.org/jira/browse/FLINK-9419
 Project: Flink
  Issue Type: Bug
  Components: Table API  SQL
Reporter: Fabian Hueske


The following query fails 

{code}
SELECT
user_id,
count(msg),
HOP_END(rowtime, INTERVAL '1' second, INTERVAL '1' minute)
FROM (SELECT rowtime, user_id, action_name AS msg FROM
  event_client_action
WHERE /* various clauses */
UNION SELECT rowtime, user_id, action_type AS msg FROM
   event_server_action
   WHERE /* various clauses */
  )
GROUP BY
HOP(rowtime, INTERVAL '1' second, INTERVAL '1' minute), user_id
{code}

with 

{quote}Retraction on windowed GroupBy aggregation is not supported yet. Note: 
Windowed GroupBy aggregation should not follow a non-windowed GroupBy 
aggregation.{quote}

The problem is that the {{UNION}} operator is translated into a {{UNION ALL}} 
and a subsequent {{GROUP BY}} on all attributes without an aggregation 
function. Currently, all {{GROUP BY}} operators are treated as 
retraction-producing operators. However, this is only true for grouping 
operators with aggregation functions. If the operator groups on all attributes 
and has no aggregation functions, it does not produce retractions but only 
forwards them (similar to a filter operator).
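
For illustration, the translation described above looks roughly like this 
(hypothetical tables {{t1(a, b)}} and {{t2(a, b)}}):

{code:sql}
-- The distinct UNION ...
SELECT a, b FROM t1
UNION
SELECT a, b FROM t2

-- ... is rewritten into UNION ALL plus a duplicate-eliminating GROUP BY
-- without any aggregation function:
SELECT a, b
FROM (
  SELECT a, b FROM t1
  UNION ALL
  SELECT a, b FROM t2
)
GROUP BY a, b
{code}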



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9058) Relax ListState.addAll() and ListState.update() to take Iterable

2018-05-23 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-9058:
-
Affects Version/s: 1.6.0
   1.5.0

> Relax ListState.addAll() and ListState.update() to take Iterable
> 
>
> Key: FLINK-9058
> URL: https://issues.apache.org/jira/browse/FLINK-9058
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>
> [~srichter] What do you think about this? None of the implementations require 
> the parameter to actually be a list, and allowing an {{Iterable}} there allows 
> calling it in situations where all you have is an {{Iterable}}.
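
A sketch of the proposed relaxation (names mirror {{ListState}}, but this is not 
the actual interface):

{code:java}
import java.util.List;

interface CurrentListStateSketch<T> {
    void update(List<T> values) throws Exception;   // forces callers to materialize a List
    void addAll(List<T> values) throws Exception;
}

interface ProposedListStateSketch<T> {
    void update(Iterable<T> values) throws Exception; // any Iterable is enough
    void addAll(Iterable<T> values) throws Exception;
}
{code}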



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9058) Relax ListState.addAll() and ListState.update() to take Iterable

2018-05-23 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9058?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-9058:
-
Fix Version/s: (was: 1.5.0)

> Relax ListState.addAll() and ListState.update() to take Iterable
> 
>
> Key: FLINK-9058
> URL: https://issues.apache.org/jira/browse/FLINK-9058
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
>
> [~srichter] What do you think about this? None of the implementations require 
> the parameter to actually be a list, and allowing an {{Iterable}} there allows 
> calling it in situations where all you have is an {{Iterable}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-05-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190140777
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A QueryableTableSink stores table in queryable state.
+  *
+  * This class stores table in queryable state so that users can access 
table data without
+  * dependency on external storage.
+  *
+  * Since this is only a kv storage, currently user can only do point 
query against it.
+  *
+  * Example:
+  * {{{
+  *   val env = ExecutionEnvironment.getExecutionEnvironment
+  *   val tEnv = TableEnvironment.getTableEnvironment(env)
+  *
+  *   val table: Table  = ...
+  *
+  *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
+  *   "prefix",
+  *   queryConfig,
+  *   None)
+  *
+  *   tEnv.writeToSink(table, queryableTableSink, config)
+  * }}}
+  *
+  * When program starts to run, user can access state with 
QueryableStateClient.
+  * {{{
+  *   val client = new QueryableStateClient(tmHostname, proxyPort)
+  *   val data = client.getKvState(
+  *   jobId,
+  *   "prefix-column1",
+  *   Row.of(1),
+  *   new RowTypeInfo(Array(TypeInfoformation.of(classOf[Int]), 
Array("id"))
+  *   stateDescriptor)
+  * .get();
+  *
+  * }}}
+  *
+  *
+  * @param namePrefix
+  * @param queryConfig
+  * @param cleanupTimeDomain
+  */
+class QueryableTableSink(
+private val namePrefix: String,
+private val queryConfig: StreamQueryConfig,
+private val cleanupTimeDomain: Option[TimeDomain])
+  extends UpsertStreamTableSink[Row]
+with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only " +
+"tables as the table would grow infinitely")
+}
+  }
+
+  override def getRecordType: TypeInformation[Row] = new 
RowTypeInfo(getFieldTypes, getFieldNames)
+
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, 
Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new QueryableStateProcessFunction(
+  namePrefix,
+  queryConfig,
+  keys,
+  getFieldNames,
+  getFieldTypes,
+  

[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486823#comment-16486823
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190136859
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A QueryableTableSink stores table in queryable state.
+  *
+  * This class stores table in queryable state so that users can access 
table data without
+  * dependency on external storage.
+  *
+  * Since this is only a kv storage, currently user can only do point 
query against it.
+  *
+  * Example:
+  * {{{
+  *   val env = ExecutionEnvironment.getExecutionEnvironment
+  *   val tEnv = TableEnvironment.getTableEnvironment(env)
+  *
+  *   val table: Table  = ...
+  *
+  *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
+  *   "prefix",
+  *   queryConfig,
+  *   None)
+  *
+  *   tEnv.writeToSink(table, queryableTableSink, config)
+  * }}}
+  *
+  * When program starts to run, user can access state with 
QueryableStateClient.
+  * {{{
+  *   val client = new QueryableStateClient(tmHostname, proxyPort)
+  *   val data = client.getKvState(
+  *   jobId,
+  *   "prefix-column1",
+  *   Row.of(1),
+  *   new RowTypeInfo(Array(TypeInfoformation.of(classOf[Int]), 
Array("id"))
+  *   stateDescriptor)
+  * .get();
+  *
+  * }}}
+  *
+  *
+  * @param namePrefix
+  * @param queryConfig
+  * @param cleanupTimeDomain
+  */
+class QueryableTableSink(
+private val namePrefix: String,
+private val queryConfig: StreamQueryConfig,
+private val cleanupTimeDomain: Option[TimeDomain])
+  extends UpsertStreamTableSink[Row]
+with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only " +
+"tables as the table would grow infinitely")
+}
+  }
+
+  override def getRecordType: TypeInformation[Row] = new 
RowTypeInfo(getFieldTypes, getFieldNames)
+
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, 
Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new 

[jira] [Commented] (FLINK-9349) KafkaConnector Exception while fetching from multiple kafka topics

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486821#comment-16486821
 ] 

ASF GitHub Bot commented on FLINK-9349:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6040


> KafkaConnector Exception  while fetching from multiple kafka topics
> ---
>
> Key: FLINK-9349
> URL: https://issues.apache.org/jira/browse/FLINK-9349
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Vishal Santoshi
>Assignee: Sergey Nuyanzin
>Priority: Critical
> Attachments: Flink9349Test.java
>
>
> ./flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
>  
> It seems the List subscribedPartitionStates was being modified when 
> runFetchLoop iterated the List.
> This can happen if, e.g., FlinkKafkaConsumer runs the following code 
> concurrently:
>                 kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
>  
> {code:java}
>  java.util.ConcurrentModificationException
>   at 
> java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:966)
>   at java.util.LinkedList$ListItr.next(LinkedList.java:888)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:134)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-05-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190136859
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A QueryableTableSink stores table in queryable state.
+  *
+  * This class stores table in queryable state so that users can access 
table data without
+  * dependency on external storage.
+  *
+  * Since this is only a kv storage, currently user can only do point 
query against it.
+  *
+  * Example:
+  * {{{
+  *   val env = ExecutionEnvironment.getExecutionEnvironment
+  *   val tEnv = TableEnvironment.getTableEnvironment(env)
+  *
+  *   val table: Table  = ...
+  *
+  *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
+  *   "prefix",
+  *   queryConfig,
+  *   None)
+  *
+  *   tEnv.writeToSink(table, queryableTableSink, config)
+  * }}}
+  *
+  * When program starts to run, user can access state with 
QueryableStateClient.
+  * {{{
+  *   val client = new QueryableStateClient(tmHostname, proxyPort)
+  *   val data = client.getKvState(
+  *   jobId,
+  *   "prefix-column1",
+  *   Row.of(1),
+  *   new RowTypeInfo(Array(TypeInfoformation.of(classOf[Int]), 
Array("id"))
+  *   stateDescriptor)
+  * .get();
+  *
+  * }}}
+  *
+  *
+  * @param namePrefix
+  * @param queryConfig
+  * @param cleanupTimeDomain
+  */
+class QueryableTableSink(
+private val namePrefix: String,
+private val queryConfig: StreamQueryConfig,
+private val cleanupTimeDomain: Option[TimeDomain])
+  extends UpsertStreamTableSink[Row]
+with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only " +
+"tables as the table would grow infinitely")
+}
+  }
+
+  override def getRecordType: TypeInformation[Row] = new 
RowTypeInfo(getFieldTypes, getFieldNames)
+
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, 
Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new QueryableStateProcessFunction(
+  namePrefix,
+  queryConfig,
+  keys,
+  getFieldNames,
+  getFieldTypes,
+  

[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486829#comment-16486829
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190141550
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
 ---
@@ -78,6 +82,20 @@ public int numKeyedStateEntries() {
}
}
 
+   public  S getState(K key, StateDescriptor 
stateDesc) throws Exception {
--- End diff --

Is this change necessary? We should only modify code outside of 
`flink-table` if it is urgently needed.


> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Fabian Hueske
>Assignee: Renjie Liu
>Priority: Major
>
> Streaming tables with unique key are continuously updated. For example 
> queries with a non-windowed aggregation generate such tables. Commonly, such 
> updating tables are emitted via an upsert table sink to an external datastore 
> (k-v store, database) to make it accessible to applications.
> This issue is about adding a feature to store and maintain such a table as 
> queryable state in Flink. By storing the table in Flnk's queryable state, we 
> do not need an external data store to access the results of the query but can 
> query the results directly from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6040: [FLINK-9349][Kafka Connector] KafkaConnector Excep...

2018-05-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6040


---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-05-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190140171
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A QueryableTableSink stores table in queryable state.
+  *
+  * This class stores table in queryable state so that users can access 
table data without
+  * dependency on external storage.
+  *
+  * Since this is only a kv storage, currently user can only do point 
query against it.
+  *
+  * Example:
+  * {{{
+  *   val env = ExecutionEnvironment.getExecutionEnvironment
+  *   val tEnv = TableEnvironment.getTableEnvironment(env)
+  *
+  *   val table: Table  = ...
+  *
+  *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
+  *   "prefix",
+  *   queryConfig,
+  *   None)
+  *
+  *   tEnv.writeToSink(table, queryableTableSink, config)
+  * }}}
+  *
+  * When program starts to run, user can access state with 
QueryableStateClient.
+  * {{{
+  *   val client = new QueryableStateClient(tmHostname, proxyPort)
+  *   val data = client.getKvState(
+  *   jobId,
+  *   "prefix-column1",
+  *   Row.of(1),
+  *   new RowTypeInfo(Array(TypeInfoformation.of(classOf[Int]), 
Array("id"))
+  *   stateDescriptor)
+  * .get();
+  *
+  * }}}
+  *
+  *
+  * @param namePrefix
+  * @param queryConfig
+  * @param cleanupTimeDomain
+  */
+class QueryableTableSink(
+private val namePrefix: String,
+private val queryConfig: StreamQueryConfig,
+private val cleanupTimeDomain: Option[TimeDomain])
+  extends UpsertStreamTableSink[Row]
+with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only " +
+"tables as the table would grow infinitely")
+}
+  }
+
+  override def getRecordType: TypeInformation[Row] = new 
RowTypeInfo(getFieldTypes, getFieldNames)
+
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, 
Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new QueryableStateProcessFunction(
+  namePrefix,
+  queryConfig,
+  keys,
+  getFieldNames,
+  getFieldTypes,
+  

[jira] [Assigned] (FLINK-9417) Send heartbeat requests from RPC endpoint's main thread

2018-05-23 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou reassigned FLINK-9417:
-

Assignee: Sihua Zhou

> Send heartbeat requests from RPC endpoint's main thread
> ---
>
> Key: FLINK-9417
> URL: https://issues.apache.org/jira/browse/FLINK-9417
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>
> Currently, we use the {{RpcService#scheduledExecutor}} to send heartbeat 
> requests to remote targets. This has the problem that we still see heartbeats 
> from this endpoint even if its main thread is currently blocked. Due to this, 
> the heartbeat response cannot be processed and the remote target times out. 
> On the remote side, this won't be noticed because it still receives the 
> heartbeat requests.
> A solution to this problem would be to send the heartbeat requests to the 
> remote target through the RPC endpoint's main thread. That way, the 
> heartbeats would also be blocked if the main thread is blocked/busy.
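
A hedged sketch of the proposed change (hypothetical names, not the actual 
{{HeartbeatManager}} code):

{code:java}
// Today (simplified): the request is fired directly from the shared scheduled
// executor, so it keeps going even while the endpoint's main thread is blocked.
scheduledExecutor.scheduleAtFixedRate(
    () -> target.requestHeartbeat(resourceId, payload),
    0L, heartbeatPeriodMillis, TimeUnit.MILLISECONDS);

// Proposed: only use the scheduled executor as a trigger and run the actual
// send in the endpoint's main thread, so a blocked/busy main thread also
// stops the outgoing heartbeat requests.
scheduledExecutor.scheduleAtFixedRate(
    () -> mainThreadExecutor.execute(
        () -> target.requestHeartbeat(resourceId, payload)),
    0L, heartbeatPeriodMillis, TimeUnit.MILLISECONDS);
{code}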



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9258) ConcurrentModificationException in ComponentMetricGroup.getAllVariables

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486907#comment-16486907
 ] 

ASF GitHub Bot commented on FLINK-9258:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5959#discussion_r190165634
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
 ---
@@ -57,11 +57,12 @@ public ComponentMetricGroup(MetricRegistry registry, 
String[] scope, P parent) {
if (variables == null) { // avoid synchronization for common 
case
synchronized (this) {
if (variables == null) {
-   variables = new HashMap<>();
-   putVariables(variables);
+   Map tmpVariables = new 
HashMap<>();
+   putVariables(tmpVariables);
if (parent != null) { // not true for 
Job-/TaskManagerMetricGroup
-   
variables.putAll(parent.getAllVariables());
+   
tmpVariables.putAll(parent.getAllVariables());
}
+   variables = tmpVariables;
--- End diff --

To be clear, this is not exactly about the fixed lines, but the line that 
is commented with "optimization for common case" is the remaining problem.
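
For context, a hedged sketch of a fully safe variant (simplified; not necessarily 
the change that will be merged): the unsynchronized fast-path read only works if 
the field is declared `volatile` and the map is filled completely before it is 
published.

{code:java}
private volatile Map<String, String> variables; // 'volatile' enables the lock-free fast path

public Map<String, String> getAllVariables() {
    Map<String, String> vars = variables;
    if (vars == null) { // fast path without synchronization
        synchronized (this) {
            vars = variables;
            if (vars == null) {
                Map<String, String> tmp = new HashMap<>();
                putVariables(tmp); // fill completely ...
                if (parent != null) {
                    tmp.putAll(parent.getAllVariables());
                }
                variables = vars = tmp; // ... before publishing once
            }
        }
    }
    return vars;
}
{code}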


> ConcurrentModificationException in ComponentMetricGroup.getAllVariables
> ---
>
> Key: FLINK-9258
> URL: https://issues.apache.org/jira/browse/FLINK-9258
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0, 1.4.3
>
>
> Seeing this exception at the job startup time. Looks like there is a race 
> condition when the metrics variables are constructed.
> The error is intermittent.
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>     at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>     at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>     at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.ConcurrentModificationException
>     at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437)
>     at java.util.HashMap$EntryIterator.next(HashMap.java:1471)
>     at java.util.HashMap$EntryIterator.next(HashMap.java:1469)
>     at java.util.HashMap.putMapEntries(HashMap.java:511)
>     at java.util.HashMap.putAll(HashMap.java:784)
>     at 
> org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63)
>     at 
> org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63)
>     at 
> com.netflix.spaas.metrics.MetricsReporterRegistry.getTags(MetricsReporterRegistry.java:147)
>     at 
> com.netflix.spaas.metrics.MetricsReporterRegistry.mergeWithSourceAndSinkTags(MetricsReporterRegistry.java:170)
>     at 
> com.netflix.spaas.metrics.MetricsReporterRegistry.addReporter(MetricsReporterRegistry.java:75)
>     at 
> com.netflix.spaas.nfflink.connector.kafka.source.Kafka010Consumer.createFetcher(Kafka010Consumer.java:69)
>     at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:549)
>     at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
>     at 
> 

[GitHub] flink pull request #5959: [FLINK-9258][metrics] Thread-safe initialization o...

2018-05-23 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5959#discussion_r190165634
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java
 ---
@@ -57,11 +57,12 @@ public ComponentMetricGroup(MetricRegistry registry, 
String[] scope, P parent) {
if (variables == null) { // avoid synchronization for common 
case
synchronized (this) {
if (variables == null) {
-   variables = new HashMap<>();
-   putVariables(variables);
+   Map tmpVariables = new 
HashMap<>();
+   putVariables(tmpVariables);
if (parent != null) { // not true for 
Job-/TaskManagerMetricGroup
-   
variables.putAll(parent.getAllVariables());
+   
tmpVariables.putAll(parent.getAllVariables());
}
+   variables = tmpVariables;
--- End diff --

To be clear, this is not exactly about the fixed lines, but the line that 
is commented with "optimization for common case" is the remaining problem.


---


[jira] [Commented] (FLINK-9384) KafkaAvroTableSource failed to work due to type mismatch

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486929#comment-16486929
 ] 

ASF GitHub Bot commented on FLINK-9384:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6026#discussion_r190170050
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -76,19 +77,35 @@
 */
private SpecificRecord record;
 
+   /** Type information describing the result type.
+*
+*/
+   private final TypeInformation typeInfo;
+
/**
 * Creates a Avro deserialization schema for the given record.
 *
 * @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
 */
-   public AvroRowDeserializationSchema(Class 
recordClazz) {
+   public AvroRowDeserializationSchema(Class 
recordClazz){
+   this(recordClazz, null);
+   }
+
+   /**
+* Creates a Avro deserialization schema for the given record.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+* @param typeInfo Type information describing the result type.
+*/
+   public AvroRowDeserializationSchema(Class 
recordClazz, TypeInformation typeInfo) {
--- End diff --

This constructor is not required. We should always use the result of the 
`AvroRecordClassConverter`.


> KafkaAvroTableSource failed to work due to type mismatch
> 
>
> Key: FLINK-9384
> URL: https://issues.apache.org/jira/browse/FLINK-9384
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API  SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Priority: Blocker
>  Labels: easyfix, patch
> Fix For: 1.6.0
>
> Attachments: flink-9384.patch
>
>
> An exception was thrown when using KafkaAvroTableSource as follows:
> Exception in thread "main" org.apache.flink.table.api.TableException: 
> TableSource of type 
> org.apache.flink.streaming.connectors.kafka.Kafka011AvroTableSource returned 
> a DataStream of type GenericType that does not 
> match with the type Row(id: Integer, name: String, age: Integer, event: 
> GenericType) declared by the TableSource.getReturnType() 
> method. Please validate the implementation of the TableSource.
>  at 
> org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:100)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:885)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:812)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:279)
>  at org.apache.flink.table.api.Table.writeToSink(table.scala:862)
>  at org.apache.flink.table.api.Table.writeToSink(table.scala:830)
>  at 
> org.apache.flink.quickstart.StreamingJobAvro.main(StreamingJobAvro.java:85)
>  
> It is caused by a discrepancy between the type returned by the TableSource 
> and the type returned by the DataStream. I've already fixed it, would someone 
> please review the patch and see if it could be merged.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6026: [FLINK-9384]KafkaAvroTableSource failed to work du...

2018-05-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6026#discussion_r190169591
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -76,19 +77,35 @@
 */
private SpecificRecord record;
 
+   /** Type information describing the result type.
--- End diff --

Put the text one line down.


---


[GitHub] flink pull request #6026: [FLINK-9384]KafkaAvroTableSource failed to work du...

2018-05-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6026#discussion_r190170050
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -76,19 +77,35 @@
 */
private SpecificRecord record;
 
+   /** Type information describing the result type.
+*
+*/
+   private final TypeInformation typeInfo;
+
/**
 * Creates a Avro deserialization schema for the given record.
 *
 * @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
 */
-   public AvroRowDeserializationSchema(Class 
recordClazz) {
+   public AvroRowDeserializationSchema(Class 
recordClazz){
+   this(recordClazz, null);
+   }
+
+   /**
+* Creates a Avro deserialization schema for the given record.
+*
+* @param recordClazz Avro record class used to deserialize Avro's 
record to Flink's row
+* @param typeInfo Type information describing the result type.
+*/
+   public AvroRowDeserializationSchema(Class 
recordClazz, TypeInformation typeInfo) {
--- End diff --

This constructor is not required. We should always use the result of the 
`AvroRecordClassConverter`.


---


[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...

2018-05-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5998#discussion_r190171657
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/setop/StreamIntersectCoProcessFunction.scala
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.setop
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector
+import org.apache.flink.table.runtime.types.CRow
+import 
org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+class StreamIntersectCoProcessFunction(
+  resultType: TypeInformation[Row],
+  queryConfig: StreamQueryConfig,
+  all: Boolean)
+  extends CoProcessFunction[CRow, CRow, CRow]
--- End diff --

I think it makes sense to have two implementations of this operator.
1. For tables with a time attribute. This implementation works without 
retraction and can automatically cleanup the state. 
2. For tables without time attributes. This implementation needs to cleanup 
state based on retention time and produces retractions.

This PR seems to address both cases, which is fine for now. We can improve 
case 1 later on. Both cases should be implemented as `CoProcessFunction`. We 
should try to be independent of the DataStream window operators, IMO.


---


[GitHub] flink pull request #6026: [FLINK-9384]KafkaAvroTableSource failed to work du...

2018-05-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6026#discussion_r190169511
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -76,19 +77,35 @@
 */
private SpecificRecord record;
 
+   /** Type information describing the result type.
+*
+*/
+   private final TypeInformation typeInfo;
--- End diff --

Declare this transient?


---


[jira] [Commented] (FLINK-9384) KafkaAvroTableSource failed to work due to type mismatch

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486928#comment-16486928
 ] 

ASF GitHub Bot commented on FLINK-9384:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6026#discussion_r190169511
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
 ---
@@ -76,19 +77,35 @@
 */
private SpecificRecord record;
 
+   /** Type information describing the result type.
+*
+*/
+   private final TypeInformation typeInfo;
--- End diff --

Declare this transient?


> KafkaAvroTableSource failed to work due to type mismatch
> 
>
> Key: FLINK-9384
> URL: https://issues.apache.org/jira/browse/FLINK-9384
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Priority: Blocker
>  Labels: easyfix, patch
> Fix For: 1.6.0
>
> Attachments: flink-9384.patch
>
>
> An exception was thrown when using KafkaAvroTableSource as follows:
> Exception in thread "main" org.apache.flink.table.api.TableException: 
> TableSource of type 
> org.apache.flink.streaming.connectors.kafka.Kafka011AvroTableSource returned 
> a DataStream of type GenericType that does not 
> match with the type Row(id: Integer, name: String, age: Integer, event: 
> GenericType) declared by the TableSource.getReturnType() 
> method. Please validate the implementation of the TableSource.
>  at 
> org.apache.flink.table.plan.nodes.datastream.StreamTableSourceScan.translateToPlan(StreamTableSourceScan.scala:100)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.translateToCRow(StreamTableEnvironment.scala:885)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:812)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:279)
>  at org.apache.flink.table.api.Table.writeToSink(table.scala:862)
>  at org.apache.flink.table.api.Table.writeToSink(table.scala:830)
>  at 
> org.apache.flink.quickstart.StreamingJobAvro.main(StreamingJobAvro.java:85)
>  
> It is caused by a discrepancy between the type returned by the TableSource 
> and the type returned by the DataStream. I've already fixed it, would someone 
> please review the patch and see if it could be merged.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9258) ConcurrentModificationException in ComponentMetricGroup.getAllVariables

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486947#comment-16486947
 ] 

ASF GitHub Bot commented on FLINK-9258:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5959
  
The methods have been identical since FLINK-7692, but I didn't catch it in 
the review. Thus, `ComponentMetricGroup#getAllVariables()` should be removed, 
along with `ComponentMetricGroup#putVariables()`, and the fix applied to 
`AbstractMetricGroup#getAllVariables()`.

We try to construct as many things as possible lazily, to reduce the 
resource impact in case the metric system isn't used.


> ConcurrentModificationException in ComponentMetricGroup.getAllVariables
> ---
>
> Key: FLINK-9258
> URL: https://issues.apache.org/jira/browse/FLINK-9258
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0, 1.4.3
>
>
> Seeing this exception at the job startup time. Looks like there is a race 
> condition when the metrics variables are constructed.
> The error is intermittent.
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>     at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>     at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>     at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.ConcurrentModificationException
>     at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437)
>     at java.util.HashMap$EntryIterator.next(HashMap.java:1471)
>     at java.util.HashMap$EntryIterator.next(HashMap.java:1469)
>     at java.util.HashMap.putMapEntries(HashMap.java:511)
>     at java.util.HashMap.putAll(HashMap.java:784)
>     at 
> org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63)
>     at 
> org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63)
>     at 
> com.netflix.spaas.metrics.MetricsReporterRegistry.getTags(MetricsReporterRegistry.java:147)
>     at 
> com.netflix.spaas.metrics.MetricsReporterRegistry.mergeWithSourceAndSinkTags(MetricsReporterRegistry.java:170)
>     at 
> com.netflix.spaas.metrics.MetricsReporterRegistry.addReporter(MetricsReporterRegistry.java:75)
>     at 
> com.netflix.spaas.nfflink.connector.kafka.source.Kafka010Consumer.createFetcher(Kafka010Consumer.java:69)
>     at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:549)
>     at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
>     at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>     at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9421) RunningJobsRegistry entries are not cleaned up after job termination

2018-05-23 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-9421:


 Summary: RunningJobsRegistry entries are not cleaned up after job 
termination
 Key: FLINK-9421
 URL: https://issues.apache.org/jira/browse/FLINK-9421
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination
Affects Versions: 1.5.0, 1.6.0
Reporter: Till Rohrmann


Currently, the {{Dispatcher}} does not clean up the {{RunningJobsRegistry}} 
after the job has finished. The consequence is that a ZNode with the JobID and 
a state num per job remains in ZooKeeper.

We should clean up these ZNodes to avoid a resource leak.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9070) Improve performance of RocksDBMapState.clear()

2018-05-23 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9070.
--
Resolution: Fixed

Fixed via
master: 87e54eb3bc
1.5.0: 5df2bc5c9f

> Improve performance of RocksDBMapState.clear()
> --
>
> Key: FLINK-9070
> URL: https://issues.apache.org/jira/browse/FLINK-9070
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Truong Duc Kien
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> Currently, RocksDBMapState.clear() is implemented by iterating over all the 
> keys and dropping them one by one. This iteration can be quite slow with: 
>  * Large maps
>  * High-churn maps with a lot of tombstones
> There are a few methods to speed-up deletion for a range of keys, each with 
> their own caveats:
>  * DeleteRange: still experimental, likely buggy
>  * DeleteFilesInRange + CompactRange: only good for large ranges
>  
> Flink can also keep a list of inserted keys in-memory, then directly delete 
> them without having to iterate over the Rocksdb database again. 
>  
> Reference:
>  * [RocksDB article about range 
> deletion|https://github.com/facebook/rocksdb/wiki/Delete-A-Range-Of-Keys]
>  * [Bug in DeleteRange|https://pingcap.com/blog/2017-09-08-rocksdbbug]
>  
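
A hedged sketch of the "keep the inserted keys in memory" idea mentioned above (simplified, not the actual state backend code; only standard RocksDB Java API calls such as {{WriteBatch#delete}} and {{RocksDB#write}} are used):

{code:java}
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

import java.util.ArrayList;
import java.util.List;

// Simplified illustration: remember the serialized keys written for a map state
// so that clear() can delete them directly instead of iterating the database.
class TrackedMapStateSketch {

	private final RocksDB db;
	private final ColumnFamilyHandle columnFamily;
	private final WriteOptions writeOptions = new WriteOptions();
	private final List<byte[]> insertedKeys = new ArrayList<>();

	TrackedMapStateSketch(RocksDB db, ColumnFamilyHandle columnFamily) {
		this.db = db;
		this.columnFamily = columnFamily;
	}

	void put(byte[] key, byte[] value) throws Exception {
		db.put(columnFamily, key, value);
		insertedKeys.add(key); // remember the key for a fast clear()
	}

	void clear() throws Exception {
		// batch the deletes so clear() issues a single write instead of
		// one delete per key found by an iterator
		try (WriteBatch batch = new WriteBatch()) {
			for (byte[] key : insertedKeys) {
				batch.delete(columnFamily, key);
			}
			db.write(writeOptions, batch);
		}
		insertedKeys.clear();
	}
}
{code}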



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8985) End-to-end test: CLI

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486848#comment-16486848
 ] 

ASF GitHub Bot commented on FLINK-8985:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5863#discussion_r190146869
  
--- Diff: flink-end-to-end-tests/test-scripts/test_cli_api.sh ---
@@ -0,0 +1,196 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+start_cluster
+
+# Test for CLI commands.
+# verify only the return code the content correctness of the API results.

+PERIODIC_JOB_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-api-test/target/PeriodicStreamingJob.jar
+JOB_ID_REGEX_EXTRACTOR=".*JobID ([0-9,a-f]*)"
+SAVE_POINT_REGEX_EXTRACTOR=".*Savepoint stored in (.*)\\."
+JOB_INFO_PACT_DATA_SOURCE_REGEX_EXTRACTOR="\"pact\": \"(Data Source)\""
+JOB_INFO_PACT_DATA_SINK_REGEX_EXTRACTOR="\"pact\": \"(Data Sink)\""
+JOB_LIST_REGEX_EXTRACTOR_BY_STATUS="([0-9,a-f]*) :"
+
+EXIT_CODE=0
+
+function extract_job_id_from_job_submission_return() {
+if [[ $1 =~ $JOB_ID_REGEX_EXTRACTOR ]];
+then
+JOB_ID="${BASH_REMATCH[1]}";
+else
+JOB_ID=""
+fi
+echo "$JOB_ID"
+}
+
+function extract_savepoint_path_from_savepoint_return() {
+if [[ $1 =~ $SAVE_POINT_REGEX_EXTRACTOR ]];
+then
+SAVEPOINT_PATH="${BASH_REMATCH[1]}";
+else
+SAVEPOINT_PATH=""
+fi
+echo "$SAVEPOINT_PATH"
+}
+
+function extract_valid_pact_from_job_info_return() {
+PACT_MATCH=0
+if [[ $1 =~ $JOB_INFO_PACT_DATA_SOURCE_REGEX_EXTRACTOR ]];
+then
+PACT_MATCH=$PACT_MATCH
+else
+PACT_MATCH=-1
+fi
+if [[ $1 =~ $JOB_INFO_PACT_DATA_SINK_REGEX_EXTRACTOR ]];
+then
+PACT_MATCH=$PACT_MATCH
+else
+PACT_MATCH=-1
+fi
+echo ${PACT_MATCH}
+}
+
+function extract_valid_job_list_by_type_from_job_list_return() {
+JOB_LIST_MATCH=0
+JOB_LIST_REGEX_EXTRACTOR="$JOB_LIST_REGEX_EXTRACTOR_BY_STATUS $2 $3"
+if [[ $1 =~ $JOB_LIST_REGEX_EXTRACTOR ]];
+then
+JOB_LIST_MATCH=$JOB_LIST_MATCH
+else
+JOB_LIST_MATCH=-1
+fi
+echo ${JOB_LIST_MATCH}
+}
+
+function cleanup_cli_test() {
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
--- End diff --

I don't think we need to explicitly shutdown the cluster and TMs here; that 
is already part of the `cleanup` call


> End-to-end test: CLI
> 
>
> Key: FLINK-8985
> URL: https://issues.apache.org/jira/browse/FLINK-8985
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Rong Rong
>Priority: Major
>
> We should an end-to-end test which verifies that all client commands are 
> working correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5840: [FLINK-7850] [build system] Give each maven profil...

2018-05-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5840#discussion_r190155490
  
--- Diff: flink-libraries/flink-ml/pom.xml ---
@@ -116,6 +119,9 @@
default

true
+   
+   default
--- End diff --

This profile is never activated from the command-line and doesn't require 
the property.


---


[jira] [Commented] (FLINK-9374) Flink Kinesis Producer does not backpressure

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486879#comment-16486879
 ] 

ASF GitHub Bot commented on FLINK-9374:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6021
  
@fmthoma yes, that would be great.


> Flink Kinesis Producer does not backpressure
> 
>
> Key: FLINK-9374
> URL: https://issues.apache.org/jira/browse/FLINK-9374
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Franz Thoma
>Priority: Critical
> Attachments: after.png, before.png
>
>
> The {{FlinkKinesisProducer}} just accepts records and forwards them to a 
> {{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL 
> internally holds an unbounded queue of records that have not yet been sent.
> Since Kinesis is rate-limited to 1MB per second per shard, this queue may 
> grow indefinitely if Flink sends records faster than the KPL can forward them 
> to Kinesis.
> One way to circumvent this problem is to set a record TTL, so that queued 
> records are dropped after a certain amount of time, but this will lead to 
> data loss under high loads.
> Currently the only time the queue is flushed is during checkpointing: 
> {{FlinkKinesisProducer}} consumes records at arbitrary rate, either until a 
> checkpoint is reached (and will wait until the queue is flushed), or until 
> out-of-memory, whichever is reached first. (This gets worse due to the fact 
> that the Java KPL is only a thin wrapper around a C++ process, so it is not 
> even the Java process that runs out of memory, but the C++ process.) The 
> implicit rate-limit due to checkpointing leads to a ragged throughput graph 
> like this (the periods with zero throughput are the wait times before a 
> checkpoint):
> !file:///home/fthoma/projects/flink/before.png!!before.png! Throughput 
> limited by checkpointing only
> My proposed solution is to add a config option {{queueLimit}} to set a 
> maximum number of records that may be waiting in the KPL queue. If this limit 
> is reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and 
> wait (blocking) until the queue length is below the limit again. This 
> automatically leads to backpressuring, since the {{FlinkKinesisProducer}} 
> cannot accept records while waiting. For compatibility, {{queueLimit}} is set 
> to {{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a 
> client explicitly sets the value. Setting a »sane« default value is not 
> possible unfortunately, since sensible values for the limit depend on the 
> record size (the limit should be chosen so that about 10–100MB of records per 
> shard are accumulated before flushing, otherwise the maximum Kinesis 
> throughput may not be reached).
> !after.png! Throughput with a queue limit of 10 records (the spikes are 
> checkpoints, where the queue is still flushed completely)
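
A minimal sketch of the proposed {{queueLimit}} behaviour (illustrative only, not the PR's code; {{getOutstandingRecordsCount()}} and {{flush()}} are existing KPL methods, the rest is assumed for this example):

{code:java}
import com.amazonaws.services.kinesis.producer.KinesisProducer;

// Illustrative sketch only: block the sink's invoke() path while the KPL
// backlog exceeds a configured limit, so that Flink backpressures upstream.
class QueueLimitSketch {

	private final KinesisProducer producer;
	private final int queueLimit;

	QueueLimitSketch(KinesisProducer producer, int queueLimit) {
		this.producer = producer;
		this.queueLimit = queueLimit;
	}

	void enforceQueueLimit() throws InterruptedException {
		// Waiting here while the backlog is too large is what produces the
		// backpressure: the sink cannot accept new records in the meantime.
		while (producer.getOutstandingRecordsCount() >= queueLimit) {
			producer.flush();
			Thread.sleep(50); // brief pause before re-checking the backlog
		}
	}
}
{code}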



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7850) Given each maven profile an activation property

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486880#comment-16486880
 ] 

ASF GitHub Bot commented on FLINK-7850:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5840#discussion_r190155472
  
--- Diff: flink-libraries/flink-ml/pom.xml ---
@@ -103,6 +103,9 @@

windows

+   
+   windows
--- End diff --

This profile is never activated from the command-line and doesn't require 
the property.


> Given each maven profile an activation property
> ---
>
> Key: FLINK-7850
> URL: https://issues.apache.org/jira/browse/FLINK-7850
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Priority: Major
>  Labels: starter
>
> We should give every maven profile an activation property so that they can be 
> activated with {{-Dabcde}}. This makes them a lot easier to work with in 
> scripts that want to control profile activation, since you can just append 
> {{-D}} switches. This doesn't work with the {{-P}} switch as it can only be 
> specified once.
> {code}
> <activation>
>   <property>
>     <name>profile_name_or_something</name>
>   </property>
> </activation>
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9374) Flink Kinesis Producer does not backpressure

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486876#comment-16486876
 ] 

ASF GitHub Bot commented on FLINK-9374:
---

Github user fmthoma commented on the issue:

https://github.com/apache/flink/pull/6021
  
@tzulitai I believe the right location is `docs/dev/connectors/kinesis.md`? 
I'll add some docs there.


> Flink Kinesis Producer does not backpressure
> 
>
> Key: FLINK-9374
> URL: https://issues.apache.org/jira/browse/FLINK-9374
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Franz Thoma
>Priority: Critical
> Attachments: after.png, before.png
>
>
> The {{FlinkKinesisProducer}} just accepts records and forwards them to a 
> {{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL 
> internally holds an unbounded queue of records that have not yet been sent.
> Since Kinesis is rate-limited to 1MB per second per shard, this queue may 
> grow indefinitely if Flink sends records faster than the KPL can forward them 
> to Kinesis.
> One way to circumvent this problem is to set a record TTL, so that queued 
> records are dropped after a certain amount of time, but this will lead to 
> data loss under high loads.
> Currently the only time the queue is flushed is during checkpointing: 
> {{FlinkKinesisProducer}} consumes records at arbitrary rate, either until a 
> checkpoint is reached (and will wait until the queue is flushed), or until 
> out-of-memory, whichever is reached first. (This gets worse due to the fact 
> that the Java KPL is only a thin wrapper around a C++ process, so it is not 
> even the Java process that runs out of memory, but the C++ process.) The 
> implicit rate-limit due to checkpointing leads to a ragged throughput graph 
> like this (the periods with zero throughput are the wait times before a 
> checkpoint):
> !file:///home/fthoma/projects/flink/before.png!!before.png! Throughput 
> limited by checkpointing only
> My proposed solution is to add a config option {{queueLimit}} to set a 
> maximum number of records that may be waiting in the KPL queue. If this limit 
> is reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and 
> wait (blocking) until the queue length is below the limit again. This 
> automatically leads to backpressuring, since the {{FlinkKinesisProducer}} 
> cannot accept records while waiting. For compatibility, {{queueLimit}} is set 
> to {{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a 
> client explicitly sets the value. Setting a »sane« default value is not 
> possible unfortunately, since sensible values for the limit depend on the 
> record size (the limit should be chosen so that about 10–100MB of records per 
> shard are accumulated before flushing, otherwise the maximum Kinesis 
> throughput may not be reached).
> !after.png! Throughput with a queue limit of 10 records (the spikes are 
> checkpoints, where the queue is still flushed completely)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8845) Use WriteBatch to improve performance for recovery in RocksDB backend

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486877#comment-16486877
 ] 

ASF GitHub Bot commented on FLINK-8845:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5650
  
@sihuazhou thanks for this nice contribution. LGTM  Will merge.


> Use WriteBatch to improve performance for recovery in RocksDB backend
> -
>
> Key: FLINK-8845
> URL: https://issues.apache.org/jira/browse/FLINK-8845
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0
>
>
> Based on {{WriteBatch}} we could get a 30% ~ 50% performance lift when loading 
> data into RocksDB.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpr...

2018-05-23 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/6021
  
@fmthoma yes, that would be great.


---


[GitHub] flink issue #5869: [FLINK-8946] TaskManager stop sending metrics after JobMa...

2018-05-23 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5869
  
@zentol if we just remove it from `disassociateFromJobManager` and do not 
clear and reinitialize it, some tasks' information would remain when a new job 
manager reconnects to the task manager, because calling the `submitTask` method 
triggers `taskManagerMetricGroup.addTaskForJob`. 


---


[jira] [Commented] (FLINK-8946) TaskManager stop sending metrics after JobManager failover

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486890#comment-16486890
 ] 

ASF GitHub Bot commented on FLINK-8946:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5869
  
@zentol if we just remove it from `disassociateFromJobManager` and do not 
clear and reinitialize it, some tasks' information would remain when a new job 
manager reconnects to the task manager, because calling the `submitTask` method 
triggers `taskManagerMetricGroup.addTaskForJob`. 


> TaskManager stop sending metrics after JobManager failover
> --
>
> Key: FLINK-8946
> URL: https://issues.apache.org/jira/browse/FLINK-8946
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, TaskManager
>Affects Versions: 1.4.2
>Reporter: Truong Duc Kien
>Assignee: vinoyang
>Priority: Critical
> Fix For: 1.5.0
>
>
> Running in Yarn-standalone mode, when the Job Manager performs a failover, 
> all TaskManager that are inherited from the previous JobManager will not send 
> metrics to the new JobManager and other registered metric reporters.
>  
> A cursory glance reveals that these lines of code might be the cause
> [https://github.com/apache/flink/blob/a3478fdfa0f792104123fefbd9bdf01f5029de51/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala#L1082-L1086]
> Perhaps the TaskManager closes its metrics group when disassociating from the 
> JobManager, but does not create a new one on fail-over association?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8946) TaskManager stop sending metrics after JobManager failover

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486897#comment-16486897
 ] 

ASF GitHub Bot commented on FLINK-8946:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5869
  
that shouldn't be possible, as all tasks are removed when a TM 
disassociates from the JM which also implies removing all metrics related to a 
specific task.


> TaskManager stop sending metrics after JobManager failover
> --
>
> Key: FLINK-8946
> URL: https://issues.apache.org/jira/browse/FLINK-8946
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, TaskManager
>Affects Versions: 1.4.2
>Reporter: Truong Duc Kien
>Assignee: vinoyang
>Priority: Critical
> Fix For: 1.5.0
>
>
> Running in Yarn-standalone mode, when the Job Manager performs a failover, 
> all TaskManager that are inherited from the previous JobManager will not send 
> metrics to the new JobManager and other registered metric reporters.
>  
> A cursory glance reveals that these lines of code might be the cause
> [https://github.com/apache/flink/blob/a3478fdfa0f792104123fefbd9bdf01f5029de51/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala#L1082-L1086]
> Perhaps the TaskManager closes its metrics group when disassociating from the 
> JobManager, but does not create a new one on fail-over association?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-7814) Add BETWEEN and NOT BETWEEN expression to Table API

2018-05-23 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7814?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-7814.
-
   Resolution: Fixed
Fix Version/s: 1.6.0

Fixed in 1.6.0: 5563681bc1fef4505e62259a05db158b4624e5fa

> Add BETWEEN and NOT BETWEEN expression to Table API
> ---
>
> Key: FLINK-7814
> URL: https://issues.apache.org/jira/browse/FLINK-7814
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Ruidong Li
>Priority: Minor
> Fix For: 1.6.0
>
>
> * The Table API does not have a BETWEEN expression. BETWEEN is quite handy 
> when defining join predicates for window joins.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9349) KafkaConnector Exception while fetching from multiple kafka topics

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486921#comment-16486921
 ] 

ASF GitHub Bot commented on FLINK-9349:
---

Github user snuyanzin commented on the issue:

https://github.com/apache/flink/pull/6040
  
@tzulitai thank you for your review and comments
based on your comments I have a question. Could you please clarify it?

You mentioned Flink's `OneShotLatch` and `CheckedThread`; at the same time, 
some Kafka connector tests use `AtomicReference`, `Thread`, etc. (I used one of 
them as an example while writing my version of the test). Just to be on the 
safe side, am I right that `OneShotLatch` and `CheckedThread` are preferable in 
tests, or are there some rules/limitations?


> KafkaConnector Exception  while fetching from multiple kafka topics
> ---
>
> Key: FLINK-9349
> URL: https://issues.apache.org/jira/browse/FLINK-9349
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Vishal Santoshi
>Assignee: Sergey Nuyanzin
>Priority: Critical
> Fix For: 1.6.0, 1.4.3, 1.5.1
>
> Attachments: Flink9349Test.java
>
>
> ./flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
>  
> It seems the List subscribedPartitionStates was being modified when 
> runFetchLoop iterated the List.
> This can happen if, e.g., FlinkKafkaConsumer runs the following code 
> concurrently:
>                 kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
>  
> {code:java}
>  java.util.ConcurrentModificationException
>   at 
> java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:966)
>   at java.util.LinkedList$ListItr.next(LinkedList.java:888)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:134)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6040: [FLINK-9349][Kafka Connector] KafkaConnector Exception wh...

2018-05-23 Thread snuyanzin
Github user snuyanzin commented on the issue:

https://github.com/apache/flink/pull/6040
  
@tzulitai thank you for your review and comments
based on your comments I have a question. Could you please clarify it?

You mentioned Flink's `OneShotLatch` and `CheckedThread`; at the same time, 
some Kafka connector tests use `AtomicReference`, `Thread`, etc. (I used one of 
them as an example while writing my version of the test). Just to be on the 
safe side, am I right that `OneShotLatch` and `CheckedThread` are preferable in 
tests, or are there some rules/limitations?


---


[GitHub] flink issue #5959: [FLINK-9258][metrics] Thread-safe initialization of varia...

2018-05-23 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5959
  
The methods have been identical since FLINK-7692, but I didn't catch it in 
the review. Thus, `ComponentMetricGroup#getAllVariables()` should be removed, 
along with `ComponentMetricGroup#putVariables()`, and the fix applied to 
`AbstractMetricGroup#getAllVariables()`.

We try to construct as many things as possible lazily, to reduce the 
resource impact in case the metric system isn't used.


---


[jira] [Commented] (FLINK-8518) Support DOW, EPOCH, DECADE for EXTRACT

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486966#comment-16486966
 ] 

ASF GitHub Bot commented on FLINK-8518:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6007
  
Thanks for the PR @snuyanzin. I created the issue because it seems that 
this is more than a one-line change. I tested your queries in PostgreSQL and 
it returns 0 instead of 1. We should have the same semantics as popular 
database vendors. How does Oracle define this feature?


> Support DOW, EPOCH, DECADE for EXTRACT
> --
>
> Key: FLINK-8518
> URL: https://issues.apache.org/jira/browse/FLINK-8518
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Sergey Nuyanzin
>Priority: Major
>
> We upgraded Calcite to version 1.15 in FLINK-7934. The EXTRACT method 
> supports more conversion targets. The targets DOW, EPOCH, DECADE should be 
> implemented and tested for different datatypes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8946) TaskManager stop sending metrics after JobManager failover

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486990#comment-16486990
 ] 

ASF GitHub Bot commented on FLINK-8946:
---

GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/6060

[FLINK-8946] TaskManager stop sending metrics after JobManager failover

## What is the purpose of the change

*This pull request improves the error message for when queryable state is not 
ready / reachable*


## Brief change log

  - *Improve the error message for when queryable state is not ready / reachable*

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-8946-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6060.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6060


commit b75f713f624ee6718f7550a416376ed09af52cc5
Author: yanghua 
Date:   2018-05-23T09:42:04Z

[FLINK-8946] TaskManager stop sending metrics after JobManager failover




> TaskManager stop sending metrics after JobManager failover
> --
>
> Key: FLINK-8946
> URL: https://issues.apache.org/jira/browse/FLINK-8946
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, TaskManager
>Affects Versions: 1.4.2
>Reporter: Truong Duc Kien
>Assignee: vinoyang
>Priority: Critical
> Fix For: 1.5.0
>
>
> Running in Yarn-standalone mode, when the Job Manager performs a failover, 
> all TaskManager that are inherited from the previous JobManager will not send 
> metrics to the new JobManager and other registered metric reporters.
>  
> A cursory glance reveals that these lines of code might be the cause
> [https://github.com/apache/flink/blob/a3478fdfa0f792104123fefbd9bdf01f5029de51/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala#L1082-L1086]
> Perhaps the TaskManager closes its metrics group when disassociating from the 
> JobManager, but does not create a new one on fail-over association?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-6016) Newlines should be valid in quoted strings in CSV

2018-05-23 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486989#comment-16486989
 ] 

Fabian Hueske edited comment on FLINK-6016 at 5/23/18 9:42 AM:
---

This problem cannot be solved with the current implementation of the 
{{CsvInputFormat}} which is based on {{DelimitedInputFormat}}.
In the current implementation, the file is first split into rows (without 
looking at quote characters) and then each row is parsed. This behavior is 
pretty much baked in and cannot be easily changed.

There is a [PR that uses a CSV parsing 
library|https://github.com/apache/flink/pull/4660] to scan CSV files and 
handles this case better.
However, in general row delimiters in quoted strings can only be properly 
processed, if we read CSV files as a whole, i.e., without splitting them into 
smaller chunks which are read in parallel by different tasks. 


was (Author: fhueske):
This problem cannot be solve with the current implementation of the 
{{CsvInputFormat}} which is based on {{DelimitedInputFormat}}.
In the current implementation, the file is first split into rows (without 
looking at quote characters) and then each row is parsed. This behavior is 
pretty much baked in and cannot be easily changed.

There is a [PR that uses a CSV parsing 
library|https://github.com/apache/flink/pull/4660] to scan CSV files and 
handles this case better.
However, in general row delimiters in quoted strings can only be properly 
processed, if we read CSV files as a whole, i.e., without splitting them into 
smaller chunks which are read in parallel by different tasks. 

> Newlines should be valid in quoted strings in CSV
> -
>
> Key: FLINK-6016
> URL: https://issues.apache.org/jira/browse/FLINK-6016
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.2.0
>Reporter: Luke Hutchison
>Priority: Major
>
> The RFC for the CSV format specifies that newlines are valid in quoted 
> strings in CSV:
> https://tools.ietf.org/html/rfc4180
> However, when parsing a CSV file with Flink containing a newline, such as:
> {noformat}
> "3
> 4",5
> {noformat}
> you get this exception:
> {noformat}
> Line could not be parsed: '"3'
> ParserError UNTERMINATED_QUOTED_STRING 
> Expect field types: class java.lang.String, class java.lang.String 
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6060: [FLINK-8946] TaskManager stop sending metrics afte...

2018-05-23 Thread yanghua
GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/6060

[FLINK-8946] TaskManager stop sending metrics after JobManager failover

## What is the purpose of the change

*This pull request improves the error message for when queryable state is not 
ready / reachable*


## Brief change log

  - *Improve the error message for when queryable state is not ready / reachable*

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-8946-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6060.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6060


commit b75f713f624ee6718f7550a416376ed09af52cc5
Author: yanghua 
Date:   2018-05-23T09:42:04Z

[FLINK-8946] TaskManager stop sending metrics after JobManager failover




---


[jira] [Commented] (FLINK-9326) TaskManagerOptions.NUM_TASK_SLOTS does not work for local/embedded mode

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9326?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486997#comment-16486997
 ] 

ASF GitHub Bot commented on FLINK-9326:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6041#discussion_r190187802
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
 ---
@@ -103,9 +103,16 @@ public JobExecutionResult execute(String jobName) 
throws Exception {
configuration.setInteger(RestOptions.PORT, 0);
}
 
+   int numSlotsPerTaskManager;
+   if (configuration.contains(TaskManagerOptions.NUM_TASK_SLOTS)) {
--- End diff --

cc @zentol


> TaskManagerOptions.NUM_TASK_SLOTS does not work for local/embedded mode
> ---
>
> Key: FLINK-9326
> URL: https://issues.apache.org/jira/browse/FLINK-9326
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.5.0
> Environment: Linux 64bit
> Flink branch release-1.5
>Reporter: Samuel Doyle
>Assignee: vinoyang
>Priority: Major
>
> When attempting to set the number of task slots via the api such ash
> {code:java}
> configuration = new Configuration();
> configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 16);
> configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, 1);
> {code}
> I will always end up with the default slot setting based on the number of 
> cores I have where my standalone instance is running. It doesn't matter what 
> I set NUM_TASK_SLOTS to; it has no effect.
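
For context, a hedged sketch of what the change under review aims to do in {{LocalStreamEnvironment#execute}} (fragment only, not the actual patch; the fallback shown here is an assumption for illustration):

{code:java}
// Sketch of the intended behaviour: honour an explicitly configured
// taskmanager.numberOfTaskSlots in local/embedded mode and only derive
// a default when the option is absent.
int numSlotsPerTaskManager;
if (configuration.contains(TaskManagerOptions.NUM_TASK_SLOTS)) {
	numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
} else {
	// assumed fallback: size the single local TaskManager by the job's max parallelism
	numSlotsPerTaskManager = jobGraph.getMaximumParallelism();
	configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlotsPerTaskManager);
}
{code}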



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9420) Add tests for SQL IN sub-query operator in streaming

2018-05-23 Thread Timo Walther (JIRA)
Timo Walther created FLINK-9420:
---

 Summary: Add tests for SQL IN sub-query operator in streaming
 Key: FLINK-9420
 URL: https://issues.apache.org/jira/browse/FLINK-9420
 Project: Flink
  Issue Type: Test
  Components: Table API & SQL
Reporter: Timo Walther


In FLINK-6094 we implemented non-windowed inner joins. The Table API & SQL 
should now  support the {{IN}} operator for sub-queries in streaming. Batch 
support has been added in FLINK-4565. We need to add unit tests, an IT case, 
and update the docs about that.
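
As a hedged illustration of what such a test would exercise (the table names and columns are made up for this example):

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Illustration only: an uncorrelated IN sub-query over two streaming tables,
// which is rewritten internally to a non-windowed join. Registration of the
// tables A(a, b) and B(x) is omitted here.
public class InSubQueryExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

		// ... register the streaming tables A and B with tEnv here ...

		Table result = tEnv.sqlQuery("SELECT a, b FROM A WHERE a IN (SELECT x FROM B)");
	}
}
{code}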



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-05-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190141550
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
 ---
@@ -78,6 +82,20 @@ public int numKeyedStateEntries() {
}
}
 
+   public  S getState(K key, StateDescriptor 
stateDesc) throws Exception {
--- End diff --

Is this change necessary? We should only modify code outside of 
`flink-table` if it is urgently needed.


---


[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486826#comment-16486826
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190140171
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A QueryableTableSink stores table in queryable state.
+  *
+  * This class stores table in queryable state so that users can access 
table data without
+  * dependency on external storage.
+  *
+  * Since this is only a kv storage, currently user can only do point 
query against it.
+  *
+  * Example:
+  * {{{
+  *   val env = ExecutionEnvironment.getExecutionEnvironment
+  *   val tEnv = TableEnvironment.getTableEnvironment(env)
+  *
+  *   val table: Table  = ...
+  *
+  *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
+  *   "prefix",
+  *   queryConfig,
+  *   None)
+  *
+  *   tEnv.writeToSink(table, queryableTableSink, config)
+  * }}}
+  *
+  * When program starts to run, user can access state with 
QueryableStateClient.
+  * {{{
+  *   val client = new QueryableStateClient(tmHostname, proxyPort)
+  *   val data = client.getKvState(
+  *   jobId,
+  *   "prefix-column1",
+  *   Row.of(1),
+  *   new RowTypeInfo(Array(TypeInfoformation.of(classOf[Int]), 
Array("id"))
+  *   stateDescriptor)
+  * .get();
+  *
+  * }}}
+  *
+  *
+  * @param namePrefix
+  * @param queryConfig
+  * @param cleanupTimeDomain
+  */
+class QueryableTableSink(
+private val namePrefix: String,
+private val queryConfig: StreamQueryConfig,
+private val cleanupTimeDomain: Option[TimeDomain])
+  extends UpsertStreamTableSink[Row]
+with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only " +
+"tables as the table would grow infinitely")
+}
+  }
+
+  override def getRecordType: TypeInformation[Row] = new 
RowTypeInfo(getFieldTypes, getFieldNames)
+
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, 
Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new 

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-05-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190137151
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, 
RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
+import org.apache.flink.table.api.StreamQueryConfig
+import 
org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A QueryableTableSink stores table in queryable state.
+  *
+  * This class stores table in queryable state so that users can access 
table data without
+  * dependency on external storage.
+  *
+  * Since this is only a kv storage, currently user can only do point 
query against it.
+  *
+  * Example:
+  * {{{
+  *   val env = ExecutionEnvironment.getExecutionEnvironment
+  *   val tEnv = TableEnvironment.getTableEnvironment(env)
+  *
+  *   val table: Table  = ...
+  *
+  *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
+  *   "prefix",
+  *   queryConfig,
+  *   None)
+  *
+  *   tEnv.writeToSink(table, queryableTableSink, config)
+  * }}}
+  *
+  * When program starts to run, user can access state with 
QueryableStateClient.
+  * {{{
+  *   val client = new QueryableStateClient(tmHostname, proxyPort)
+  *   val data = client.getKvState(
+  *   jobId,
+  *   "prefix-column1",
+  *   Row.of(1),
+  *   new RowTypeInfo(Array(TypeInfoformation.of(classOf[Int]), 
Array("id"))
+  *   stateDescriptor)
+  * .get();
+  *
+  * }}}
+  *
+  *
+  * @param namePrefix
+  * @param queryConfig
+  * @param cleanupTimeDomain
+  */
+class QueryableTableSink(
+private val namePrefix: String,
+private val queryConfig: StreamQueryConfig,
+private val cleanupTimeDomain: Option[TimeDomain])
+  extends UpsertStreamTableSink[Row]
+with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be 
used with append-only " +
+"tables as the table would grow infinitely")
+}
+  }
+
+  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
+
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new QueryableStateProcessFunction(
+  namePrefix,
+  queryConfig,
+  keys,
+  getFieldNames,
+  getFieldTypes,
+  
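
For illustration, a minimal client-side sketch (not code from this PR) of how the
state written by such a sink could be read with Flink's queryable state client.
Host, port, the job id placeholder, the key field "id", the Boolean column
"is_manager" and the "prefix-is_manager" state name (a "<namePrefix>-<column>"
naming scheme) are assumptions based on the scaladoc example and the PR's test,
not confirmed behaviour of this sink:

    import java.util.concurrent.CompletableFuture

    import org.apache.flink.api.common.JobID
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.queryablestate.client.QueryableStateClient
    import org.apache.flink.types.Row

    // Connect to the queryable state proxy of a TaskManager (host/port are assumptions).
    val client = new QueryableStateClient("localhost", 9069)

    // Key type of the sink: a single String key field "id" (assumption).
    val keyType: TypeInformation[Row] =
      new RowTypeInfo(Array[TypeInformation[_]](Types.STRING), Array("id"))

    // One ValueState per non-key column is assumed; here the Boolean "is_manager" column.
    val stateDescriptor =
      new ValueStateDescriptor[java.lang.Boolean]("is_manager", Types.BOOLEAN)

    // "<job id>" must be replaced with the id of the running job.
    val resultFuture: CompletableFuture[ValueState[java.lang.Boolean]] =
      client.getKvState(
        JobID.fromHexString("<job id>"),
        "prefix-is_manager",
        Row.of("1"),
        keyType,
        stateDescriptor)

    println(resultFuture.get().value())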

[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486824#comment-16486824
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190137151
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A QueryableTableSink stores table in queryable state.
+  *
+  * This class stores table in queryable state so that users can access table data without
+  * dependency on external storage.
+  *
+  * Since this is only a kv storage, currently user can only do point query against it.
+  *
+  * Example:
+  * {{{
+  *   val env = StreamExecutionEnvironment.getExecutionEnvironment
+  *   val tEnv = TableEnvironment.getTableEnvironment(env)
+  *
+  *   val table: Table  = ...
+  *
+  *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
+  *   "prefix",
+  *   queryConfig,
+  *   None)
+  *
+  *   tEnv.writeToSink(table, queryableTableSink, config)
+  * }}}
+  *
+  * When program starts to run, user can access state with QueryableStateClient.
+  * {{{
+  *   val client = new QueryableStateClient(tmHostname, proxyPort)
+  *   val data = client.getKvState(
+  *   jobId,
+  *   "prefix-column1",
+  *   Row.of(1),
+  *   new RowTypeInfo(Array(TypeInformation.of(classOf[Int])), Array("id")),
+  *   stateDescriptor)
+  * .get();
+  *
+  * }}}
+  *
+  *
+  * @param namePrefix
+  * @param queryConfig
+  * @param cleanupTimeDomain
+  */
+class QueryableTableSink(
+private val namePrefix: String,
+private val queryConfig: StreamQueryConfig,
+private val cleanupTimeDomain: Option[TimeDomain])
+  extends UpsertStreamTableSink[Row]
+with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " +
+"tables as the table would grow infinitely")
+}
+  }
+
+  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
+
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new 
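
The quoted diff is cut off at this point in the archive. Purely as a hedged
illustration of the pattern (the PR's exact wiring is not shown here), an upsert
sink built around a key selector and a KeyedProcessFunction would typically key
the stream and apply the function roughly as below; RowKeySelector comes from
this PR, everything else in the sketch is an assumption:

    import java.lang.{Boolean => JBool}

    import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.streaming.api.datastream.DataStream
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction
    import org.apache.flink.table.sinks.RowKeySelector
    import org.apache.flink.types.Row

    // Sketch only: key the upsert stream by the configured key fields and run the
    // queryable-state process function on each key.
    def wireSketch(
        dataStream: DataStream[JTuple2[JBool, Row]],
        keyIndices: Array[Int],
        keySelectorType: RowTypeInfo,
        processFunction: KeyedProcessFunction[Row, JTuple2[JBool, Row], Void]): Unit = {
      dataStream
        .keyBy(new RowKeySelector(keyIndices, keySelectorType))
        .process(processFunction)
    }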

[jira] [Commented] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-05-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16486830#comment-16486830
 ] 

ASF GitHub Bot commented on FLINK-6968:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190141671
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/QueryableTableSinkTest.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.streaming.api.TimeDomain
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.harness.HarnessTestBase
+import org.apache.flink.table.sinks.{QueryableStateProcessFunction, RowKeySelector}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class QueryableTableSinkTest extends HarnessTestBase {
+  @Test
+  def testRowSelector(): Unit = {
+val keyTypes = Array(TypeInformation.of(classOf[List[Int]]),
+  TypeInformation.of(classOf[String]))
+val selector = new RowKeySelector(Array(0, 2), new RowTypeInfo(keyTypes:_*))
+
+val src = Row.of(List(1), "a", "b")
+val key = selector.getKey(JTuple2.of(true, src))
+
+assertEquals(Row.of(List(1), "b"), key)
+  }
+
+  @Test
+  def testProcessFunction(): Unit = {
+val queryConfig = new StreamQueryConfig()
+  .withIdleStateRetentionTime(Time.milliseconds(2), Time.milliseconds(10))
+
+val keys = Array("id")
+val keyType = new RowTypeInfo(TypeInformation.of(classOf[String]))
+val fieldNames = Array("id", "is_manager", "name")
+val fieldTypes: Array[TypeInformation[_]] = Array(
+  TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]],
+  TypeInformation.of(classOf[JBool]).asInstanceOf[TypeInformation[_]],
+  TypeInformation.of(classOf[String]).asInstanceOf[TypeInformation[_]])
+val func = new QueryableStateProcessFunction(
+  "test",
+  queryConfig,
+  keys,
+  fieldNames,
+  fieldTypes,
+  TimeDomain.PROCESSING_TIME)
+
+val operator = new KeyedProcessOperator[Row, JTuple2[JBool, Row], Void](func)
+
+val testHarness = createHarnessTester(operator,
+  new RowKeySelector(Array(0), keyType),
+  keyType)
+
+testHarness.open()
+
+val stateDesc1 = new ValueStateDescriptor[JBool]("is_manager",
+  TypeInformation.of(classOf[JBool]))
+stateDesc1.initializeSerializerUnlessSet(operator.getExecutionConfig)
+val stateDesc2 = new ValueStateDescriptor[String]("name", TypeInformation.of(classOf[String]))
+stateDesc2.initializeSerializerUnlessSet(operator.getExecutionConfig)
+val key1 = Row.of("1")
+val key2 = Row.of("2")
+
+testHarness.processElement(JTuple2.of(true, Row.of("1", 
JBool.valueOf(true), "jeff")), 2)
+testHarness.processElement(JTuple2.of(true, Row.of("2", 
JBool.valueOf(false), "dean")), 6)
+
+val stateOf = (key: Row, sd: ValueStateDescriptor[_]) => {
+  testHarness.getState(key, sd).value().asInstanceOf[AnyRef]
+}
+
+var expectedData = Array(
+  Row.of(JBool.valueOf(true), "jeff"),
+  Row.of(JBool.valueOf(false), "dean"))
+var storedData = Array(
+  Row.of(stateOf(key1, stateDesc1), stateOf(key1, stateDesc2)),
+  
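
For context on the retention pair used in the harness test above:
withIdleStateRetentionTime(minTime, maxTime) keeps the state of an idle key for
at least minTime and removes it at the latest after maxTime since the key's last
update. A sketch of a more production-like configuration (the 12h/24h values are
illustrative only, not taken from this PR):

    import org.apache.flink.api.common.time.Time
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Idle per-key state survives at least 12 hours and is dropped within 24 hours.
    val queryConfig: StreamQueryConfig = tEnv.queryConfig
      .withIdleStateRetentionTime(Time.hours(12), Time.hours(24))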

[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-05-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190135653
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A QueryableTableSink stores table in queryable state.
--- End diff --

Update the comment.


---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-05-23 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190137656
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import java.lang.{Boolean => JBool}
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain}
+import org.apache.flink.table.api.StreamQueryConfig
+import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+/**
+  * A QueryableTableSink stores table in queryable state.
+  *
+  * This class stores table in queryable state so that users can access table data without
+  * dependency on external storage.
+  *
+  * Since this is only a kv storage, currently user can only do point query against it.
+  *
+  * Example:
+  * {{{
+  *   val env = StreamExecutionEnvironment.getExecutionEnvironment
+  *   val tEnv = TableEnvironment.getTableEnvironment(env)
+  *
+  *   val table: Table  = ...
+  *
+  *   val queryableTableSink: QueryableTableSink = new QueryableTableSink(
+  *   "prefix",
+  *   queryConfig,
+  *   None)
+  *
+  *   tEnv.writeToSink(table, queryableTableSink, config)
+  * }}}
+  *
+  * When program starts to run, user can access state with QueryableStateClient.
+  * {{{
+  *   val client = new QueryableStateClient(tmHostname, proxyPort)
+  *   val data = client.getKvState(
+  *   jobId,
+  *   "prefix-column1",
+  *   Row.of(1),
+  *   new RowTypeInfo(Array(TypeInformation.of(classOf[Int])), Array("id")),
+  *   stateDescriptor)
+  * .get();
+  *
+  * }}}
+  *
+  *
+  * @param namePrefix
+  * @param queryConfig
+  * @param cleanupTimeDomain
+  */
+class QueryableTableSink(
+private val namePrefix: String,
+private val queryConfig: StreamQueryConfig,
+private val cleanupTimeDomain: Option[TimeDomain])
+  extends UpsertStreamTableSink[Row]
+with TableSinkBase[JTuple2[JBool, Row]] {
+  private var keys: Array[String] = _
+
+  override def setKeyFields(keys: Array[String]): Unit = {
+if (keys == null) {
+  throw new IllegalArgumentException("keys can't be null!")
+}
+this.keys = keys
+  }
+
+  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {
+if (isAppendOnly) {
+  throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " +
+"tables as the table would grow infinitely")
+}
+  }
+
+  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
+
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = {
+val keyIndices = keys.map(getFieldNames.indexOf(_))
+val keyTypes = keyIndices.map(getFieldTypes(_))
+
+val keySelectorType = new RowTypeInfo(keyTypes, keys)
+
+val processFunction = new QueryableStateProcessFunction(
+  namePrefix,
+  queryConfig,
+  keys,
+  getFieldNames,
+  getFieldTypes,
+  
