[jira] [Assigned] (FLINK-7789) Add handler for Async IO operator timeouts

2018-05-07 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-7789:
--

Assignee: blues zheng

> Add handler for Async IO operator timeouts 
> ---
>
> Key: FLINK-7789
> URL: https://issues.apache.org/jira/browse/FLINK-7789
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Karthik Deivasigamani
>Assignee: blues zheng
>Priority: Major
>
> Currently the Async IO operator does not provide a mechanism to handle 
> timeouts. When a request times out, an exception is thrown and the job is 
> restarted. It would be good to pass an AsyncIOTimeoutHandler which can be 
> implemented by the user and passed in the constructor.
> Here is the discussion from apache flink users mailing list 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/async-io-operator-timeouts-tt16068.html
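
For illustration, the proposed handler could look roughly like this (a sketch
of the proposal only; the interface below is hypothetical and not an existing
Flink API):

{code:java}
import java.io.Serializable;

/**
 * Hypothetical handler for the proposal above: invoked by the Async IO
 * operator when a request times out, instead of failing the job.
 */
public interface AsyncIOTimeoutHandler<IN, OUT> extends Serializable {

	/** Returns a fallback result for the timed-out input element. */
	OUT handleTimeout(IN input) throws Exception;
}
{code}

The operator would call {{handleTimeout(input)}} and emit its result rather
than throwing and restarting the job.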



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9073) Resume from savepoint end-to-end tests should be extended for different state backends

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466809#comment-16466809
 ] 

ASF GitHub Bot commented on FLINK-9073:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5926#discussion_r186608526
  
--- Diff: flink-end-to-end-tests/run-nightly-tests.sh ---
@@ -58,25 +58,97 @@ fi
 
 if [ $EXIT_CODE == 0 ]; then
   printf "\n==\n"
-  printf "Running Resuming Savepoint (no parallelism change) end-to-end test\n"
+  printf "Running Resuming Savepoint (file, async, no parallelism change) end-to-end test\n"
   printf "==\n"
-  $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2
+  STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2
   EXIT_CODE=$?
 fi
 
 if [ $EXIT_CODE == 0 ]; then
   printf "\n==\n"
-  printf "Running Resuming Savepoint (scale up) end-to-end test\n"
+  printf "Running Resuming Savepoint (file, sync, no parallelism change) end-to-end test\n"
   printf "==\n"
-  $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4
+  STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2
   EXIT_CODE=$?
 fi
 
 if [ $EXIT_CODE == 0 ]; then
   printf "\n==\n"
-  printf "Running Resuming Savepoint (scale down) end-to-end test\n"
+  printf "Running Resuming Savepoint (file, async, scale up) end-to-end test\n"
   printf "==\n"
-  $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2
+  STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==\n"
+  printf "Running Resuming Savepoint (file, sync, scale up) end-to-end test\n"
+  printf "==\n"
+  STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==\n"
+  printf "Running Resuming Savepoint (file, async, scale down) end-to-end test\n"
+  printf "==\n"
+  STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==\n"
+  printf "Running Resuming Savepoint (file, sync, scale down) end-to-end test\n"
+  printf "==\n"
+  STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==\n"
+  printf "Running Resuming Savepoint (rocks, non-incremental, no parallelism change) end-to-end test\n"
+  printf "==\n"
+  STATE_BACKEND_TYPE=rocks STATE_BACKEND_ROCKS_INCREMENTAL=false $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==\n"
+  printf "Running Resuming Savepoint (rocks, incremental, no parallelism change) end-to-end test\n"
+  printf "==\n"
+  STATE_BACKEND_TYPE=rocks STATE_BACKEND_ROCKS_INCREMENTAL=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2
--- End diff --

This makes sense, will address this.


> Resume from savepoint end-to-end tests should be extended for different state 
> backends
> --
>
> 

[jira] [Commented] (FLINK-9073) Resume from savepoint end-to-end tests should be extended for different state backends

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466810#comment-16466810
 ] 

ASF GitHub Bot commented on FLINK-9073:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5926
  
Thanks for the review @StefanRRichter! Will address your comment and merge 
this.


> Resume from savepoint end-to-end tests should be extended for different state 
> backends
> --
>
> Key: FLINK-9073
> URL: https://issues.apache.org/jira/browse/FLINK-9073
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.6.0
>
>
> The resuming from savepoint end-to-end test script, {{test_resume_savepoint}} 
> should be extended for the different state backends.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5926: [FLINK-9073] [e2e-tests] Extend savepoint e2e tests for d...

2018-05-07 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5926
  
Thanks for the review @StefanRRichter! Will address your comment and merge 
this.


---


[GitHub] flink pull request #5926: [FLINK-9073] [e2e-tests] Extend savepoint e2e test...

2018-05-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5926#discussion_r186608526
  
--- Diff: flink-end-to-end-tests/run-nightly-tests.sh ---
@@ -58,25 +58,97 @@ fi
 
 if [ $EXIT_CODE == 0 ]; then
   printf "\n==\n"
-  printf "Running Resuming Savepoint (no parallelism change) end-to-end test\n"
+  printf "Running Resuming Savepoint (file, async, no parallelism change) end-to-end test\n"
   printf "==\n"
-  $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2
+  STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2
   EXIT_CODE=$?
 fi
 
 if [ $EXIT_CODE == 0 ]; then
   printf "\n==\n"
-  printf "Running Resuming Savepoint (scale up) end-to-end test\n"
+  printf "Running Resuming Savepoint (file, sync, no parallelism change) end-to-end test\n"
   printf "==\n"
-  $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4
+  STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2
   EXIT_CODE=$?
 fi
 
 if [ $EXIT_CODE == 0 ]; then
   printf "\n==\n"
-  printf "Running Resuming Savepoint (scale down) end-to-end test\n"
+  printf "Running Resuming Savepoint (file, async, scale up) end-to-end test\n"
   printf "==\n"
-  $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2
+  STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==\n"
+  printf "Running Resuming Savepoint (file, sync, scale up) end-to-end test\n"
+  printf "==\n"
+  STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 4
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==\n"
+  printf "Running Resuming Savepoint (file, async, scale down) end-to-end test\n"
+  printf "==\n"
+  STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==\n"
+  printf "Running Resuming Savepoint (file, sync, scale down) end-to-end test\n"
+  printf "==\n"
+  STATE_BACKEND_TYPE=file STATE_BACKEND_FILE_ASYNC=false $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 4 2
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==\n"
+  printf "Running Resuming Savepoint (rocks, non-incremental, no parallelism change) end-to-end test\n"
+  printf "==\n"
+  STATE_BACKEND_TYPE=rocks STATE_BACKEND_ROCKS_INCREMENTAL=false $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2
+  EXIT_CODE=$?
+fi
+
+if [ $EXIT_CODE == 0 ]; then
+  printf "\n==\n"
+  printf "Running Resuming Savepoint (rocks, incremental, no parallelism change) end-to-end test\n"
+  printf "==\n"
+  STATE_BACKEND_TYPE=rocks STATE_BACKEND_ROCKS_INCREMENTAL=true $END_TO_END_DIR/test-scripts/test_resume_savepoint.sh 2 2
--- End diff --

This makes sense, will address this.


---


[jira] [Updated] (FLINK-9315) TaskManagerRunner.shutDown() may be stuck in waiting rpc service terminated

2018-05-07 Thread Biao Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-9315:

Description: 
Currently the TaskManagerRunner waits synchronously for the RPC service to 
terminate. If this happens in an RPC-service-related thread (an Akka 
dispatcher thread), the shutdown would be stuck forever.

We should change the behavior to match ClusterEntrypoint: do not block, but 
complete asynchronously with a future.

  was:Currently shutdown TaskManagerRunner may be stuck. Because 
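
A minimal sketch of the proposed future-based shutdown described above (names 
are assumptions for illustration; this is not the actual Flink code):

{code:java}
import java.util.concurrent.CompletableFuture;

class AsyncShutdownSketch {

	/** Minimal stand-in for Flink's RpcService; only the termination hook matters here. */
	interface RpcServiceLike {
		CompletableFuture<Void> stopServiceAsync();
	}

	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

	/** Triggers termination without blocking; callers compose on the returned future. */
	CompletableFuture<Void> shutDownAsync(RpcServiceLike rpcService) {
		rpcService.stopServiceAsync().whenComplete((ignored, error) -> {
			if (error != null) {
				terminationFuture.completeExceptionally(error);
			} else {
				terminationFuture.complete(null);
			}
		});
		return terminationFuture;
	}
}
{code}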


> TaskManagerRunner.shutDown() may be stuck in waiting rpc service terminated
> ---
>
> Key: FLINK-9315
> URL: https://issues.apache.org/jira/browse/FLINK-9315
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.5.0
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>
> Currently the TaskManagerRunner waits synchronously for the RPC service to 
> terminate. If this happens in an RPC-service-related thread (an Akka 
> dispatcher thread), the shutdown would be stuck forever.
> We should change the behavior to match ClusterEntrypoint: do not block, but 
> complete asynchronously with a future.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466795#comment-16466795
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186606104
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java ---
@@ -42,14 +42,22 @@
 @Public
 public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
 
+	/**
+	 * @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)}.
+	 */
+	@Deprecated
+	T deserialize(byte[] message) throws IOException;
+
 	/**
 	 * Deserializes the byte message.
 	 *
-	 * @param message The message, as a byte array.
+	 * @param consumerRecordMetaInfossage The message, as a {@link ConsumerRecordMetaInfo}.
 	 *
 	 * @return The deserialized message as an object (null if the message cannot be deserialized).
 	 */
-	T deserialize(byte[] message) throws IOException;
+	default T deserialize(ConsumerRecordMetaInfo consumerRecordMetaInfossage) throws IOException {
--- End diff --

Makes sense. Alright, let's leave this as is, then.
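
For context, a user schema implementing the proposed metadata-aware method 
might look roughly like this (a sketch only; the metadata accessors below are 
assumptions based on this PR discussion, not a released Flink API):

{code:java}
public class TimestampedStringSchema {

	/** Minimal stand-in for the PR's ConsumerRecordMetaInfo; assumed shape. */
	interface RecordMetaInfo {
		byte[] getMessage();
		long getTimestamp();
	}

	/** Builds a value that carries the Kafka record timestamp alongside the payload. */
	public String deserialize(RecordMetaInfo record) {
		return record.getTimestamp() + ":" + new String(record.getMessage());
	}
}
{code}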


> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'Kafka message 
> timestamp' parameter (from ConsumerRecord). In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466797#comment-16466797
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5958
  
Thanks for the update @FredTing.
I'll try to take another look at the PR in the next few days.


> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'Kafka message 
> timestamp' parameter (from ConsumerRecord). In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5958: [FLINK-8500] Get the timestamp of the Kafka message from ...

2018-05-07 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5958
  
Thanks for the update @FredTing.
I'll try to take another look at the PR in the next few days.


---


[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

2018-05-07 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186606104
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java ---
@@ -42,14 +42,22 @@
 @Public
 public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
 
+	/**
+	 * @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)}.
+	 */
+	@Deprecated
+	T deserialize(byte[] message) throws IOException;
+
 	/**
 	 * Deserializes the byte message.
 	 *
-	 * @param message The message, as a byte array.
+	 * @param consumerRecordMetaInfossage The message, as a {@link ConsumerRecordMetaInfo}.
 	 *
 	 * @return The deserialized message as an object (null if the message cannot be deserialized).
 	 */
-	T deserialize(byte[] message) throws IOException;
+	default T deserialize(ConsumerRecordMetaInfo consumerRecordMetaInfossage) throws IOException {
--- End diff --

Makes sense. Alright, let's leave this as is, then.


---


[jira] [Updated] (FLINK-9315) TaskManagerRunner.shutDown() may be stuck in waiting rpc service terminated

2018-05-07 Thread Biao Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-9315:

Summary: TaskManagerRunner.shutDown() may be stuck in waiting rpc service 
terminated  (was: TaskManagerRunner.shutDown() would be stuck in waiting rpc 
service terminated)

> TaskManagerRunner.shutDown() may be stuck in waiting rpc service terminated
> ---
>
> Key: FLINK-9315
> URL: https://issues.apache.org/jira/browse/FLINK-9315
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.5.0
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>
> Currently shutdown TaskManagerRunner may be stuck. Because 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9315) TaskManagerRunner.shutDown() would be stuck in waiting rpc service terminated

2018-05-07 Thread Biao Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-9315:

Description: Currently shutdown TaskManagerRunner may be stuck. Because   
(was: Currently shutdown TaskManagerRunner would stuck)

> TaskManagerRunner.shutDown() would be stuck in waiting rpc service terminated
> -
>
> Key: FLINK-9315
> URL: https://issues.apache.org/jira/browse/FLINK-9315
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.5.0
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>
> Currently shutdown TaskManagerRunner may be stuck. Because 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9315) TaskManagerRunner.shutDown() would be stuck in waiting rpc service terminated

2018-05-07 Thread Biao Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-9315:

Summary: TaskManagerRunner.shutDown() would be stuck in waiting rpc service 
terminated  (was: TaskManagerRunner.shutDown() would stuck in waiting rpc 
service terminated)

> TaskManagerRunner.shutDown() would be stuck in waiting rpc service terminated
> -
>
> Key: FLINK-9315
> URL: https://issues.apache.org/jira/browse/FLINK-9315
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.5.0
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>
> Currently shutdown TaskManagerRunner would stuck



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9315) TaskManagerRunner.shutDown() would stuck in waiting rpc service terminated

2018-05-07 Thread Biao Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Biao Liu updated FLINK-9315:

Description: Currently shutdown TaskManagerRunner would stuck

> TaskManagerRunner.shutDown() would stuck in waiting rpc service terminated
> --
>
> Key: FLINK-9315
> URL: https://issues.apache.org/jira/browse/FLINK-9315
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.5.0
>Reporter: Biao Liu
>Assignee: Biao Liu
>Priority: Major
>
> Currently shutdown TaskManagerRunner would stuck



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9315) TaskManagerRunner.shutDown() would stuck in waiting rpc service terminated

2018-05-07 Thread Biao Liu (JIRA)
Biao Liu created FLINK-9315:
---

 Summary: TaskManagerRunner.shutDown() would stuck in waiting rpc 
service terminated
 Key: FLINK-9315
 URL: https://issues.apache.org/jira/browse/FLINK-9315
 Project: Flink
  Issue Type: Bug
  Components: TaskManager
Affects Versions: 1.5.0
Reporter: Biao Liu
Assignee: Biao Liu






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9258) ConcurrentModificationException in ComponentMetricGroup.getAllVariables

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466777#comment-16466777
 ] 

ASF GitHub Bot commented on FLINK-9258:
---

Github user yuqi1129 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5959#discussion_r186603684
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java ---
@@ -57,11 +57,12 @@ public ComponentMetricGroup(MetricRegistry registry, String[] scope, P parent) {
 		if (variables == null) { // avoid synchronization for common case
 			synchronized (this) {
 				if (variables == null) {
-					variables = new HashMap<>();
-					putVariables(variables);
+					Map<String, String> tmpVariables = new HashMap<>();
+					putVariables(tmpVariables);
 					if (parent != null) { // not true for Job-/TaskManagerMetricGroup
-						variables.putAll(parent.getAllVariables());
+						tmpVariables.putAll(parent.getAllVariables());
 					}
+					variables = tmpVariables;
--- End diff --

Don't we need a test to verify this change?
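
For reference, the pattern the change applies, reduced to a self-contained 
sketch (field and method names simplified; not the actual Flink class):

{code:java}
import java.util.HashMap;
import java.util.Map;

class SafePublicationSketch {

	private volatile Map<String, String> variables;

	Map<String, String> getAllVariables() {
		if (variables == null) { // avoid synchronization for the common case
			synchronized (this) {
				if (variables == null) {
					// Build the map completely in a local variable first, so a
					// concurrent reader can never observe it half-filled.
					Map<String, String> tmp = new HashMap<>();
					tmp.put("<host>", "localhost"); // illustrative entry
					variables = tmp; // publish only the finished map
				}
			}
		}
		return variables;
	}
}
{code}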


> ConcurrentModificationException in ComponentMetricGroup.getAllVariables
> ---
>
> Key: FLINK-9258
> URL: https://issues.apache.org/jira/browse/FLINK-9258
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0, 1.4.3
>
>
> Seeing this exception at job startup time. It looks like there is a race 
> condition when the metrics variables are constructed.
> The error is intermittent.
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>     at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>     at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>     at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>     at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.ConcurrentModificationException
>     at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437)
>     at java.util.HashMap$EntryIterator.next(HashMap.java:1471)
>     at java.util.HashMap$EntryIterator.next(HashMap.java:1469)
>     at java.util.HashMap.putMapEntries(HashMap.java:511)
>     at java.util.HashMap.putAll(HashMap.java:784)
>     at 
> org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63)
>     at 
> org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63)
>     at 
> com.netflix.spaas.metrics.MetricsReporterRegistry.getTags(MetricsReporterRegistry.java:147)
>     at 
> com.netflix.spaas.metrics.MetricsReporterRegistry.mergeWithSourceAndSinkTags(MetricsReporterRegistry.java:170)
>     at 
> com.netflix.spaas.metrics.MetricsReporterRegistry.addReporter(MetricsReporterRegistry.java:75)
>     at 
> com.netflix.spaas.nfflink.connector.kafka.source.Kafka010Consumer.createFetcher(Kafka010Consumer.java:69)
>     at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:549)
>     at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
>     at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>     at 
> 

[GitHub] flink pull request #5959: [FLINK-9258][metrics] Thread-safe initialization o...

2018-05-07 Thread yuqi1129
Github user yuqi1129 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5959#discussion_r186603684
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ComponentMetricGroup.java ---
@@ -57,11 +57,12 @@ public ComponentMetricGroup(MetricRegistry registry, String[] scope, P parent) {
 		if (variables == null) { // avoid synchronization for common case
 			synchronized (this) {
 				if (variables == null) {
-					variables = new HashMap<>();
-					putVariables(variables);
+					Map<String, String> tmpVariables = new HashMap<>();
+					putVariables(tmpVariables);
 					if (parent != null) { // not true for Job-/TaskManagerMetricGroup
-						variables.putAll(parent.getAllVariables());
+						tmpVariables.putAll(parent.getAllVariables());
 					}
+					variables = tmpVariables;
--- End diff --

Don't we need a test to verify this change?


---


[jira] [Commented] (FLINK-9194) Finished jobs are not archived to HistoryServer

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466736#comment-16466736
 ] 

ASF GitHub Bot commented on FLINK-9194:
---

Github user yuqi1129 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5902#discussion_r186599431
  
--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java ---
@@ -162,7 +166,7 @@ public void run() {
 					String json = archive.getJson();
 
 					File target;
-					if (path.equals("/joboverview")) {
--- End diff --

Agree


> Finished jobs are not archived to HistoryServer
> ---
>
> Key: FLINK-9194
> URL: https://issues.apache.org/jira/browse/FLINK-9194
> Project: Flink
>  Issue Type: Bug
>  Components: History Server, JobManager
>Affects Versions: 1.5.0
> Environment: Flink 2af481a
>Reporter: Gary Yao
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.6.0
>
>
> In flip6 mode, jobs are not archived to the HistoryServer. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5902: [FLINK-9194][history] Add HistoryServer support to...

2018-05-07 Thread yuqi1129
Github user yuqi1129 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5902#discussion_r186599431
  
--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java ---
@@ -162,7 +166,7 @@ public void run() {
 					String json = archive.getJson();
 
 					File target;
-					if (path.equals("/joboverview")) {
--- End diff --

Agree


---


[jira] [Closed] (FLINK-8237) BucketingSink throws NPE when Writer.duplicate returns null

2018-05-07 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-8237.

   Resolution: Fixed
Fix Version/s: 1.4.3
   1.5.0

Fixed for 1.4.3 with c7d9ad49e9b3b82b934738903c37de2bd77d8ea7
Fixed for 1.5.0 with 7d6c81d4b8b7340c53717e38725dbe1e3199dfd2
Fixed for 1.6.0 with 7a31ffd3a5455d7da0a459b62081ca27466833bb

> BucketingSink throws NPE when Writer.duplicate returns null
> ---
>
> Key: FLINK-8237
> URL: https://issues.apache.org/jira/browse/FLINK-8237
> Project: Flink
>  Issue Type: Bug
>Reporter: Gábor Hermann
>Assignee: Pavel Shvetsov
>Priority: Minor
> Fix For: 1.5.0, 1.4.3
>
>
> Users need to look into Flink code to find the cause. We could catch that 
> null before even running the job.
> {code:java}
> java.lang.NullPointerException
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:546)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:441)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> at 
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9281) LogBack not working

2018-05-07 Thread Tim (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466438#comment-16466438
 ] 

Tim commented on FLINK-9281:


Not really. Here's what I know:

a) If I run the Flink job in my IDE, Logback (latest version) works fine.

b) If I run in a standalone cluster, there is no output. Actually, it does not 
look like the -Dlogback.configurationFile JVM arg is even being honored.

I'll try {{-verbose:class}} to see if the Logback classes are even 
being loaded.
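
A quick way to check which backend SLF4J actually bound to at runtime (plain 
SLF4J API, nothing Flink-specific):

{code:java}
import org.slf4j.LoggerFactory;

public class WhichLoggingBackend {
	public static void main(String[] args) {
		// Prints the logger factory SLF4J bound to:
		//   ch.qos.logback.classic.LoggerContext -> Logback is active
		//   org.slf4j.impl.Log4jLoggerFactory    -> Log4j is still being used
		System.out.println(LoggerFactory.getILoggerFactory().getClass().getName());
	}
}
{code}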

> LogBack not working
> ---
>
> Key: FLINK-9281
> URL: https://issues.apache.org/jira/browse/FLINK-9281
> Project: Flink
>  Issue Type: Bug
>  Components: Logging
>Affects Versions: 1.4.2
>Reporter: Tim
>Priority: Major
>
> I am trying to get Flink to work with Logback instead of Log4J. However, it 
> is not working. 
> My setup follows the advice on this page: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/best_practices.html#use-logback-when-running-flink-on-a-cluster
>  * Flink v1.4.2 running a stand-alone cluster. 
>  * Started JobManager as a foreground process (bin/jobmanager.sh 
> start-foreground cluster).  I updated bin/flink-console.sh to reference 
> logback.xml via -Dlogback.configurationFile=file:/path/to/logfile.
>  * Removed log4j jars under libs/  (log4j-1.2.xx.jar and 
> slf4j-log4j12-xxx.jar)
>  * Added logback jars under libs/   (logback-classic, logback-core, 
> log4j-over-slf4j.jar) 
> However, no log file is created.   Also, as a dumb test I 
> referenced a non-existent logback.xml file (changed the path to a non-existent 
> folder) just to see if any errors appear on stdout, but nothing.
> Thanks
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8237) BucketingSink throws NPE when Writer.duplicate returns null

2018-05-07 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reassigned FLINK-8237:


Assignee: Pavel Shvetsov

> BucketingSink throws NPE when Writer.duplicate returns null
> ---
>
> Key: FLINK-8237
> URL: https://issues.apache.org/jira/browse/FLINK-8237
> Project: Flink
>  Issue Type: Bug
>Reporter: Gábor Hermann
>Assignee: Pavel Shvetsov
>Priority: Minor
>
> Users need to look into Flink code to find the cause. We could catch that 
> null before even running the job.
> {code:java}
> java.lang.NullPointerException
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:546)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:441)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> at 
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9312) Perform mutual authentication during SSL handshakes

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466435#comment-16466435
 ] 

ASF GitHub Bot commented on FLINK-9312:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5966
  
@EronWright This might be interesting to you as well


> Perform mutual authentication during SSL handshakes
> ---
>
> Key: FLINK-9312
> URL: https://issues.apache.org/jira/browse/FLINK-9312
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Stephan Ewen
>Priority: Major
> Fix For: 1.6.0
>
>
> Currently, the Flink processes use encrypted connections via SSL:
>   - Data exchange TM - TM
>   - RPC JM - TM
>   - Blob Service JM - TM
> However, the server side always accepts any client to build up the 
> connection, meaning the connections are not strongly authenticated.
> Activating SSL mutual authentication solves that - only processes that have 
> the same certificate can connect.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5966: [FLINK-9312] [security] Add mutual authentication for RPC...

2018-05-07 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5966
  
@EronWright This might be interesting to you as well


---


[jira] [Commented] (FLINK-9312) Perform mutual authentication during SSL handshakes

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466431#comment-16466431
 ] 

ASF GitHub Bot commented on FLINK-9312:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5966

[FLINK-9312] [security] Add mutual authentication for RPC and data plane

## What is the purpose of the change

Currently, the Flink processes use encrypted connections via SSL:
  - Data exchange TM - TM
  - RPC JM - TM
  - Blob Service JM - TM

  - (Optionally to ZooKeeper and connectors, this is connector specific and 
not in scope of this change)

However, the server side always accepts any client to build up the 
connection, meaning the connections are not strongly authenticated. Activating 
SSL mutual authentication strengthens this significantly - only processes that 
have access to the same certificate can connect.
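
For illustration, the JSSE knob this boils down to on the server side (a 
sketch using the standard javax.net.ssl API, not the PR's actual code):

{code:java}
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

public class MutualAuthSketch {
	public static void main(String[] args) throws Exception {
		// In Flink this context would be built from the configured
		// keystore/truststore; the default context keeps the sketch
		// self-contained.
		SSLContext sslContext = SSLContext.getDefault();

		SSLEngine engine = sslContext.createSSLEngine();
		engine.setUseClientMode(false); // act as the server in the handshake
		engine.setNeedClientAuth(true); // reject clients without a trusted certificate
	}
}
{code}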

## Brief change log

  - Activate mutual auth in akka (via akka config)
  - Activate mutual auth in Netty for data shuffles via `SSLContext` and 
`SSLEngine` parameters

## Verifying this change

  - Adds a test to the `NettyClientServerSslTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink mutual_auth

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5966.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5966


commit 8bceb03d5653c94247b72d6256f4e9e37b036e35
Author: Stephan Ewen 
Date:   2018-05-07T17:44:33Z

[FLINK-9313] [security] Activate mutual authentication for RPC/akka

commit 59b017580d30904418e0867ac122a8183dc5db70
Author: Stephan Ewen 
Date:   2018-05-07T19:28:41Z

[FLINK-9314] [security] Add mutual authentication for Netty / TaskManager's 
data plane




> Perform mutual authentication during SSL handshakes
> ---
>
> Key: FLINK-9312
> URL: https://issues.apache.org/jira/browse/FLINK-9312
> Project: Flink
>  Issue Type: New Feature
>  Components: Security
>Reporter: Stephan Ewen
>Priority: Major
> Fix For: 1.6.0
>
>
> Currently, the Flink processes use encrypted connections via SSL:
>   - Data exchange TM - TM
>   - RPC JM - TM
>   - Blob Service JM - TM
> However, the server side always accepts any client to build up the 
> connection, meaning the connections are not strongly authenticated.
> Activating SSL mutual authentication solves that - only processes that have 
> the same certificate can connect.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5966: [FLINK-9312] [security] Add mutual authentication ...

2018-05-07 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5966

[FLINK-9312] [security] Add mutual authentication for RPC and data plane

## What is the purpose of the change

Currently, the Flink processes use encrypted connections via SSL:
  - Data exchange TM - TM
  - RPC JM - TM
  - Blob Service JM - TM

  - (Optionally to ZooKeeper and connectors, this is connector specific and 
not in scope of this change)

However, the server side always accepts any client to build up the 
connection, meaning the connections are not strongly authenticated. Activating 
SSL mutual authentication strengthens this significantly - only processes that 
have access to the same certificate can connect.

## Brief change log

  - Activate mutual auth in akka (via akka config)
  - Activate mutual auth in Netty for data shuffles via `SSLContext` and 
`SSLEngine` parameters

## Verifying this change

  - Adds a test to the `NettyClientServerSslTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink mutual_auth

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5966.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5966


commit 8bceb03d5653c94247b72d6256f4e9e37b036e35
Author: Stephan Ewen 
Date:   2018-05-07T17:44:33Z

[FLINK-9313] [security] Activate mutual authentication for RPC/akka

commit 59b017580d30904418e0867ac122a8183dc5db70
Author: Stephan Ewen 
Date:   2018-05-07T19:28:41Z

[FLINK-9314] [security] Add mutual authentication for Netty / TaskManager's 
data plane




---


[jira] [Comment Edited] (FLINK-9311) PubSub connector

2018-05-07 Thread Richard Deurwaarder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466409#comment-16466409
 ] 

Richard Deurwaarder edited comment on FLINK-9311 at 5/7/18 8:06 PM:


PubSub works more like RabbitMQ than Kafka, using ack/nack. It has 
[at-least-once delivery|https://cloud.google.com/pubsub/docs/subscriber], but 
exactly-once can be achieved by using a [message 
id|http://googleapis.github.io/googleapis/java/all/latest/apidocs/com/google/pubsub/v1/PubsubMessageOrBuilder.html#getMessageId--].

Is there an existing connector that uses message IDs for deduplication?


was (Author: xeli):
PubSub works more like RabbitMQ than Kafka using ack/nack. It has 
atleast-once-delivery, but using a [message 
id|http://googleapis.github.io/googleapis/java/all/latest/apidocs/com/google/pubsub/v1/PubsubMessageOrBuilder.html#getMessageId--]
 exactly-once can be achived.

Is there an existing connector using message id's for deduplication?
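
For illustration, deduplication by message ID could look roughly like this in 
the DataStream API (the PubSubMessage type and its message-id field are 
assumptions; key the stream by the message id before applying the function):

{code:java}
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/** Minimal stand-in for a PubSub message; assumed shape, for illustration. */
class PubSubMessage {
	String messageId;
	byte[] payload;
}

public class DedupByMessageId extends RichFlatMapFunction<PubSubMessage, PubSubMessage> {

	private transient ValueState<Boolean> seen;

	@Override
	public void open(Configuration parameters) {
		seen = getRuntimeContext().getState(
			new ValueStateDescriptor<>("seen", Boolean.class));
	}

	@Override
	public void flatMap(PubSubMessage message, Collector<PubSubMessage> out) throws Exception {
		if (seen.value() == null) { // first delivery of this message id
			seen.update(true);
			out.collect(message);
		}
	}
}
{code}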

> PubSub connector
> 
>
> Key: FLINK-9311
> URL: https://issues.apache.org/jira/browse/FLINK-9311
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Richard Deurwaarder
>Priority: Minor
>
> I would like to start adding some Google Cloud connectors, starting with a 
> PubSub Source. I have a basic implementation ready but I want it to be able to:
>  * easily scale up (should I have it extend RichParallelSourceFunction for 
> this?)
>  * Make it easier to provide the Google Cloud credentials. This would require 
> being able to send some JSON string / ServiceAccount to the nodes when 
> starting up this source.
> Could this be something that would be useful for others and added to the 
> flink connectors repo?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9311) PubSub connector

2018-05-07 Thread Richard Deurwaarder (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466409#comment-16466409
 ] 

Richard Deurwaarder commented on FLINK-9311:


PubSub works more like RabbitMQ than Kafka, using ack/nack. It has 
at-least-once delivery, but exactly-once can be achieved by using a [message 
id|http://googleapis.github.io/googleapis/java/all/latest/apidocs/com/google/pubsub/v1/PubsubMessageOrBuilder.html#getMessageId--].

Is there an existing connector that uses message IDs for deduplication?

> PubSub connector
> 
>
> Key: FLINK-9311
> URL: https://issues.apache.org/jira/browse/FLINK-9311
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Richard Deurwaarder
>Priority: Minor
>
> I would like to start adding some Google Cloud connectors, starting with a 
> PubSub Source. I have a basic implementation ready but I want it to be able to:
>  * easily scale up (should I have it extend RichParallelSourceFunction for 
> this?)
>  * Make it easier to provide the Google Cloud credentials. This would require 
> being able to send some JSON string / ServiceAccount to the nodes when 
> starting up this source.
> Could this be something that would be useful for others and added to the 
> flink connectors repo?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9314) Enable SSL mutual authentication for Netty / TaskManagers

2018-05-07 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-9314:
---

 Summary: Enable SSL mutual authentication for Netty / TaskManagers
 Key: FLINK-9314
 URL: https://issues.apache.org/jira/browse/FLINK-9314
 Project: Flink
  Issue Type: Sub-task
  Components: Security
Reporter: Stephan Ewen
Assignee: Stephan Ewen


Making sure that TaskManagers authenticate both ways (client and server) 
requires giving access to keystore and truststore on both ends, and enabling 
the client authentication flag when creating the SSL Engine.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9313) Enable mutual authentication for RPC (akka)

2018-05-07 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-9313:
---

 Summary: Enable mutual authentication for RPC (akka) 
 Key: FLINK-9313
 URL: https://issues.apache.org/jira/browse/FLINK-9313
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination
Reporter: Stephan Ewen
Assignee: Stephan Ewen


Trivial, just needs to add the respective line in the akka configuration.
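
For reference, the setting meant here is presumably Akka's remote SSL option 
below (name taken from Akka's reference configuration; treat it as an 
assumption and verify against the Akka version Flink ships):

{code}
akka.remote.netty.ssl.security.require-mutual-authentication = on
{code}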



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9312) Perform mutual authentication during SSL handshakes

2018-05-07 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-9312:
---

 Summary: Perform mutual authentication during SSL handshakes
 Key: FLINK-9312
 URL: https://issues.apache.org/jira/browse/FLINK-9312
 Project: Flink
  Issue Type: New Feature
  Components: Security
Reporter: Stephan Ewen
 Fix For: 1.6.0


Currently, the Flink processes use encrypted connections via SSL:

  - Data exchange TM - TM
  - RPC JM - TM
  - Blob Service JM - TM

However, the server side always accepts any client to build up the connection, 
meaning the connections are not strongly authenticated.

Activating SSL mutual authentication solves that - only processes that have the 
same certificate can connect.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8255) Key expressions on named row types do not work

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466383#comment-16466383
 ] 

ASF GitHub Bot commented on FLINK-8255:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5961#discussion_r186527205
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java ---
@@ -157,15 +156,15 @@ public T set(T record, F fieldValue) {
 
 		SimpleTupleFieldAccessor(int pos, TypeInformation typeInfo) {
--- End diff --

Accessing fields in a `Row` will fail because `Row` does not extend 
`Tuple`. For a proper fix, we would need a `RowFieldAccessor` and use that one 
when we deal with a `DataStream<Row>`. We would then need to add the 
`RowFieldAccessor` to the `FieldAccessorFactory`.


> Key expressions on named row types do not work
> --
>
> Key: FLINK-8255
> URL: https://issues.apache.org/jira/browse/FLINK-8255
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, DataStream API
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Timo Walther
>Assignee: Sergey Nuyanzin
>Priority: Major
>
> The following program fails with a {{ClassCastException}}. It seems that key 
> expressions and rows are not tested well. We should add more tests for them.
> {code}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> TypeInformation[] types = new TypeInformation[] {Types.INT, Types.INT};
> String[] fieldNames = new String[]{"id", "value"};
> RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames);
> env.fromCollection(Collections.singleton(new Row(2)), rowTypeInfo)
> .keyBy("id").sum("value").print();
> env.execute("Streaming WordCount");
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8255) Key expressions on named row types do not work

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466382#comment-16466382
 ] 

ASF GitHub Bot commented on FLINK-8255:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5961#discussion_r186527277
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java ---
@@ -197,7 +196,7 @@ public T set(T record, F fieldValue) {
 			checkNotNull(typeInfo, "typeInfo must not be null.");
 			checkNotNull(innerAccessor, "innerAccessor must not be null.");
 
-			int arity = ((TupleTypeInfo) typeInfo).getArity();
+			int arity = typeInfo.getArity();
--- End diff --

Same as for `SimpleTupleFieldAccessor`.


> Key expressions on named row types do not work
> --
>
> Key: FLINK-8255
> URL: https://issues.apache.org/jira/browse/FLINK-8255
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, DataStream API
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Timo Walther
>Assignee: Sergey Nuyanzin
>Priority: Major
>
> The following program fails with a {{ClassCastException}}. It seems that key 
> expressions and rows are not tested well. We should add more tests for them.
> {code}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> TypeInformation[] types = new TypeInformation[] {Types.INT, Types.INT};
> String[] fieldNames = new String[]{"id", "value"};
> RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames);
> env.fromCollection(Collections.singleton(new Row(2)), rowTypeInfo)
> .keyBy("id").sum("value").print();
> env.execute("Streaming WordCount");
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8255) Key expressions on named row types do not work

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466385#comment-16466385
 ] 

ASF GitHub Bot commented on FLINK-8255:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5961#discussion_r186528901
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java ---
@@ -368,4 +369,23 @@ public void testIllegalBasicType2() {
 
 		FieldAccessor f = FieldAccessorFactory.getAccessor(tpeInfo, "foo", null);
 	}
+
+	/**
+	 * Validates that no ClassCastException happens;
+	 * should not fail e.g. like in FLINK-8255.
+	 */
+	@Test
+	public void testRowTypeInfo() {
--- End diff --

This test just validates that a `FieldAccessor` is created. At runtime it 
would fail with a `ClassCastException`.


> Key expressions on named row types do not work
> --
>
> Key: FLINK-8255
> URL: https://issues.apache.org/jira/browse/FLINK-8255
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, DataStream API
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Timo Walther
>Assignee: Sergey Nuyanzin
>Priority: Major
>
> The following program fails with a {{ClassCastException}}. It seems that key 
> expressions and rows are not tested well. We should add more tests for them.
> {code}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> TypeInformation[] types = new TypeInformation[] {Types.INT, Types.INT};
> String[] fieldNames = new String[]{"id", "value"};
> RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames);
> env.fromCollection(Collections.singleton(new Row(2)), rowTypeInfo)
> .keyBy("id").sum("value").print();
> env.execute("Streaming WordCount");
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8255) Key expressions on named row types do not work

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466381#comment-16466381
 ] 

ASF GitHub Bot commented on FLINK-8255:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5961#discussion_r186477977
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java ---
@@ -41,7 +41,7 @@
 	 * is regarded in the reduce function. First index has highest priority and last index has
 	 * least priority.
 	 */
-	public SelectByMinFunction(TupleTypeInfo type, int... fields) {
+	public SelectByMinFunction(TupleTypeInfoBase type, int... fields) {
--- End diff --

The `ReduceFunction` is still typed to `T extends Tuple`, so this 
will still fail at runtime. The same is true for all other built-in aggregation 
methods like `sum()` and `min()` on `DataSet` and `UnsortedGrouping`. 

This cannot be resolved without major changes. I don't think we should add 
these features, but rather throw meaningful error messages instead of 
`ClassCastException`. 

Can you try to override the `isTupleType()` method in `RowTypeInfo` and 
return `false`? 
This would prevent `Row` from being used in contexts that are only 
supported for `Tuple`.
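
A sketch of that suggestion (illustrative only, not merged code; written as a 
subclass here just to stay self-contained, whereas the suggestion is to change 
`RowTypeInfo` itself):

{code:java}
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

/**
 * Illustrative only: a RowTypeInfo variant that stops advertising itself as a
 * tuple type, so tuple-only code paths can reject Row early with a meaningful
 * error instead of a ClassCastException at runtime.
 */
public class NonTupleRowTypeInfo extends RowTypeInfo {

	public NonTupleRowTypeInfo(TypeInformation<?>[] types, String[] fieldNames) {
		super(types, fieldNames);
	}

	@Override
	public boolean isTupleType() {
		return false;
	}
}
{code}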


> Key expressions on named row types do not work
> --
>
> Key: FLINK-8255
> URL: https://issues.apache.org/jira/browse/FLINK-8255
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, DataStream API
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Timo Walther
>Assignee: Sergey Nuyanzin
>Priority: Major
>
> The following program fails with a {{ClassCastException}}. It seems that key 
> expressions and rows are not tested well. We should add more tests for them.
> {code}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> TypeInformation[] types = new TypeInformation[] {Types.INT, Types.INT};
> String[] fieldNames = new String[]{"id", "value"};
> RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames);
> env.fromCollection(Collections.singleton(new Row(2)), rowTypeInfo)
> .keyBy("id").sum("value").print();
> env.execute("Streaming WordCount");
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8255) Key expressions on named row types do not work

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466384#comment-16466384
 ] 

ASF GitHub Bot commented on FLINK-8255:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5961#discussion_r186484649
  
--- Diff: flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java ---
@@ -230,4 +235,43 @@ public String toString() {
 		}
 	}
 
+	/**
+	 * Validates that no ClassCastException happens;
+	 * should not fail e.g. like in FLINK-8255.
+	 */
+	@Test
+	public void testMaxMinByRowTypeInfoKeyFieldsDataset() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		TypeInformation[] types = new TypeInformation[] {Types.INT, Types.INT};
+
+		String[] fieldNames = new String[]{"id", "value"};
+		RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames);
+		DataSet<Row> tupleDs = env.fromCollection(Collections.singleton(new Row(2)), rowTypeInfo);
+
+		tupleDs.maxBy(0);
+		tupleDs.minBy(0);
+	}
+
+	/**
+	 * Validates that no ClassCastException happens;
+	 * should not fail e.g. like in FLINK-8255.
+	 */
+	@Test
+	public void testMaxMinByRowTypeInfoKeyFieldsForUnsortedGrouping() {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		TypeInformation[] types = new TypeInformation[]{Types.INT, Types.INT};
+
+		String[] fieldNames = new String[]{"id", "value"};
+		RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames);
+
+		UnsortedGrouping<Row> groupDs = env.fromCollection(Collections.singleton(new Row(2)), rowTypeInfo).groupBy(0);
+
+		groupDs.maxBy(1);
+		groupDs.minBy(1);
--- End diff --

The tests pass because the program is not executed. 
You would have to call `env.collect()` to run the program and compare the 
returned result against the expected result. As I pointed out before, this will 
fail, because the operator will cast the `Row` objects to `Tuple`.


> Key expressions on named row types do not work
> --
>
> Key: FLINK-8255
> URL: https://issues.apache.org/jira/browse/FLINK-8255
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, DataStream API
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Timo Walther
>Assignee: Sergey Nuyanzin
>Priority: Major
>
> The following program fails with a {{ClassCastException}}. It seems that key 
> expressions and rows are not tested well. We should add more tests for them.
> {code}
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> TypeInformation[] types = new TypeInformation[] {Types.INT, Types.INT};
> String[] fieldNames = new String[]{"id", "value"};
> RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames);
> env.fromCollection(Collections.singleton(new Row(2)), rowTypeInfo)
> .keyBy("id").sum("value").print();
> env.execute("Streaming WordCount");
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5961: [FLINK-8255][DataSet API, DataStream API] key expr...

2018-05-07 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5961#discussion_r186528901
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java
 ---
@@ -368,4 +369,23 @@ public void testIllegalBasicType2() {
 
FieldAccessor f = 
FieldAccessorFactory.getAccessor(tpeInfo, "foo", null);
}
+
+   /**
+* Validates that no ClassCastException happens,
+* i.e. that it does not fail as in FLINK-8255.
+*/
+   @Test
+   public void testRowTypeInfo() {
--- End diff --

This test just validates that a `FieldAccessor` is created. At runtime it 
would fail with a `ClassCastException`.


---


[GitHub] flink pull request #5961: [FLINK-8255][DataSet API, DataStream API] key expr...

2018-05-07 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5961#discussion_r186484649
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java
 ---
@@ -230,4 +235,43 @@ public String toString() {
}
}
 
+   /**
+* Validates that no ClassCastException happens,
+* i.e. that it does not fail as in FLINK-8255.
+*/
+   @Test
+   public void testMaxMinByRowTypeInfoKeyFieldsDataset() {
+
+   final ExecutionEnvironment env = ExecutionEnvironment
+   .getExecutionEnvironment();
+   TypeInformation[] types = new TypeInformation[] {Types.INT, 
Types.INT};
+
+   String[] fieldNames = new String[]{"id", "value"};
+   RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames);
+   DataSet tupleDs = env
+   .fromCollection(Collections.singleton(new Row(2)), 
rowTypeInfo);
+
+   tupleDs.maxBy(0);
+   tupleDs.minBy(0);
+   }
+
+   /**
+* Validates that no ClassCastException happens,
+* i.e. that it does not fail as in FLINK-8255.
+*/
+   @Test
+   public void testMaxMinByRowTypeInfoKeyFieldsForUnsortedGrouping() {
+   final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+   TypeInformation[] types = new TypeInformation[]{Types.INT, 
Types.INT};
+
+   String[] fieldNames = new String[]{"id", "value"};
+   RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames);
+
+   UnsortedGrouping groupDs = 
env.fromCollection(Collections.singleton(new Row(2)), rowTypeInfo).groupBy(0);
+
+   groupDs.maxBy(1);
+   groupDs.minBy(1);
--- End diff --

The tests pass because the program is not executed. 
You would have to call `collect()` on the result to run the program and compare 
the returned result against the expected result. As I pointed out before, this 
will fail, because the operator will cast the `Row` objects to `Tuple`.


---


[GitHub] flink pull request #5961: [FLINK-8255][DataSet API, DataStream API] key expr...

2018-05-07 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5961#discussion_r186477977
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java
 ---
@@ -41,7 +41,7 @@
 * is regarded in the reduce function. First index has highest priority 
and last index has
 * least priority.
 */
-   public SelectByMinFunction(TupleTypeInfo type, int... fields) {
+   public SelectByMinFunction(TupleTypeInfoBase type, int... fields) {
--- End diff --

The `ReduceFunction` is still typed to `T extends Tuple` such that this 
will still fail at runtime. The same is true for all other built-in aggregation 
methods like `sum()` and `min()` on `DataSet` and `UnsortedGrouping`. 

This cannot be resolved without major changes. I don't think we should add 
these features, but rather throw meaningful error messages instead of 
`ClassCastException`. 

Can you try to override the `isTupleType()` method in `RowTypeInfo` and 
return `false`? 
This would prevent `Row` from being used in contexts that are only 
supported for `Tuple`.
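
A sketch of the suggested override (assuming `RowTypeInfo` currently inherits 
the `true` answer from its tuple-based parent class):

```
@Override
public boolean isTupleType() {
    // Row does not extend Tuple; returning false keeps RowTypeInfo out of
    // Tuple-only code paths, so callers can fail early with a meaningful
    // error instead of a ClassCastException at runtime.
    return false;
}
```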


---


[GitHub] flink pull request #5961: [FLINK-8255][DataSet API, DataStream API] key expr...

2018-05-07 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5961#discussion_r186527205
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
 ---
@@ -157,15 +156,15 @@ public T set(T record, F fieldValue) {
 
SimpleTupleFieldAccessor(int pos, TypeInformation typeInfo) {
--- End diff --

Accessing fields in a `Row` will fail because `Row` does not extend 
`Tuple`. For a proper fix, we would need a `RowFieldAccessor` and use that one 
when we deal with a `DataStream`. We would then need to add the 
`RowFieldAccessor` to the `FieldAccessorFactory`.
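
A rough sketch of what such an accessor could look like (the name and the 
wiring are illustrative assumptions, not the actual fix):

```
// Hypothetical accessor for Row; Row exposes getField/setField by position.
static final class RowFieldAccessor<T, F> extends FieldAccessor<T, F> {
    private static final long serialVersionUID = 1L;
    private final int pos;

    @SuppressWarnings("unchecked")
    RowFieldAccessor(int pos, TypeInformation<T> typeInfo) {
        this.pos = pos;
        this.fieldType = (TypeInformation<F>) ((RowTypeInfo) typeInfo).getTypeAt(pos);
    }

    @SuppressWarnings("unchecked")
    @Override
    public F get(T record) {
        return (F) ((Row) record).getField(pos);
    }

    @Override
    public T set(T record, F fieldValue) {
        ((Row) record).setField(pos, fieldValue);
        return record;
    }
}
```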


---


[GitHub] flink pull request #5961: [FLINK-8255][DataSet API, DataStream API] key expr...

2018-05-07 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5961#discussion_r186527277
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
 ---
@@ -197,7 +196,7 @@ public T set(T record, F fieldValue) {
checkNotNull(typeInfo, "typeInfo must not be null.");
checkNotNull(innerAccessor, "innerAccessor must not be 
null.");
 
-   int arity = ((TupleTypeInfo) typeInfo).getArity();
+   int arity = typeInfo.getArity();
--- End diff --

Same as for `SimpleTupleFieldAccessor`.


---


[jira] [Commented] (FLINK-9310) Update default cyphersuites

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466380#comment-16466380
 ] 

ASF GitHub Bot commented on FLINK-9310:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5965
  
@EronWright This might be interesting to you.


> Update default cyphersuites
> ---
>
> Key: FLINK-9310
> URL: https://issues.apache.org/jira/browse/FLINK-9310
> Project: Flink
>  Issue Type: Task
>  Components: Security
>Affects Versions: 1.4.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>
> The current default cipher suite {{TLS_RSA_WITH_AES_128_CBC_SHA}} is no 
> longer recommended.
> RFC 7525 [1] recommends using only the following cipher suites:
> * TLS_DHE_RSA_WITH_AES_128_GCM_SHA256
> * TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
> * TLS_DHE_RSA_WITH_AES_256_GCM_SHA384
> * TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
> [1] https://tools.ietf.org/html/rfc7525



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5965: [FLINK-9310] [security] Update standard cipher suites for...

2018-05-07 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5965
  
@EronWright This might be interesting to you.


---


[jira] [Commented] (FLINK-9310) Update default cyphersuites

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466379#comment-16466379
 ] 

ASF GitHub Bot commented on FLINK-9310:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5965

[FLINK-9310] [security] Update standard cipher suites for secure mode

## What is the purpose of the change

This sets the cipher suites accepted by default to those recommended in
IETF RFC 7525 : https://tools.ietf.org/html/rfc7525

## Brief change log

Updates the default value of the respective config option to
```

TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
```
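
For reference, a sketch of overriding this default on a specific setup; the 
option key `security.ssl.algorithms` is an assumption based on this change, so 
verify it against the configuration docs:

```
// Programmatic equivalent of setting the option in flink-conf.yaml
// (org.apache.flink.configuration.Configuration).
Configuration config = new Configuration();
config.setString(
    "security.ssl.algorithms",
    "TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256");
```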

## Verifying this change

This change is already covered by the existing tests that test SSL setups.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink 
update_cipher_suits

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5965.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5965


commit 9b24574cd437ddbc2d3546c1fa0f73e983c02e31
Author: Stephan Ewen 
Date:   2018-05-07T17:47:00Z

[FLINK-9310] [security] Update standard cipher suites for secure mode

This sets the cipher suites accepted by default to those recommended in
IETF RFC 7525 : https://tools.ietf.org/html/rfc7525




> Update default cyphersuites
> ---
>
> Key: FLINK-9310
> URL: https://issues.apache.org/jira/browse/FLINK-9310
> Project: Flink
>  Issue Type: Task
>  Components: Security
>Affects Versions: 1.4.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>
> The current default cipher suite {{TLS_RSA_WITH_AES_128_CBC_SHA}} is no 
> longer recommended.
> RFC 7525 [1] recommends using only the following cipher suites:
> * TLS_DHE_RSA_WITH_AES_128_GCM_SHA256
> * TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
> * TLS_DHE_RSA_WITH_AES_256_GCM_SHA384
> * TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
> [1] https://tools.ietf.org/html/rfc7525



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5965: [FLINK-9310] [security] Update standard cipher sui...

2018-05-07 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5965

[FLINK-9310] [security] Update standard cipher suites for secure mode

## What is the purpose of the change

This sets the cipher suites accepted by default to those recommended in
IETF RFC 7525 : https://tools.ietf.org/html/rfc7525

## Brief change log

Updates the default value of the respective config option to
```

TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
```

## Verifying this change

This change is already covered by the existing tests that test SSL setups.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink 
update_cipher_suits

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5965.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5965


commit 9b24574cd437ddbc2d3546c1fa0f73e983c02e31
Author: Stephan Ewen 
Date:   2018-05-07T17:47:00Z

[FLINK-9310] [security] Update standard cipher suites for secure mode

This sets the cipher suites accepted by default to those recommended in
IETF RFC 7525 : https://tools.ietf.org/html/rfc7525




---


[jira] [Closed] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.

2018-05-07 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-8690.

   Resolution: Implemented
Fix Version/s: 1.6.0

Implemented for 1.6.0 with 53610c31e88d3c4194990de70fb99d9f935f2e0d

> Support distinct aggregation on group windowed streaming tables.
> 
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
> Fix For: 1.6.0
>
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not 
> allow distinct aggregate.
> We are proposing to reuse distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on 
> datastream as well.
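
For illustration, the kind of query this enables on a group-windowed streaming 
table (a sketch; the table and field names are made up):

{code:java}
// Hypothetical table "clicks" with fields userId, url and a rowtime attribute.
Table result = tableEnv.sqlQuery(
    "SELECT TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS hourEnd, " +
    "       COUNT(DISTINCT userId) AS uniqueUsers " +
    "FROM clicks " +
    "GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR)");
{code}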



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9311) PubSub connector

2018-05-07 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466356#comment-16466356
 ] 

Stephan Ewen commented on FLINK-9311:
-

The RichParallelSourceFunction is a good place to start.

What is the exactly-once model behind PubSub? Is it message ack/commit, like 
for example in RabbitMQ, or is it sequence-number/offset commits, like in 
Kafka/Kinesis?
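
For the ack/commit style, a bare-bones sketch of the usual pattern, similar in 
spirit to what the RabbitMQ connector does via MessageAcknowledgingSourceBase; 
the Message type and the pullMessage/ackMessages calls are placeholders, not a 
real PubSub API:

{code:java}
public class AckingPubSubSource extends RichParallelSourceFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    private final Map<Long, List<String>> ackIdsPerCheckpoint = new HashMap<>();
    private final List<String> pendingAckIds = new ArrayList<>();
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            Message msg = pullMessage(); // placeholder for the PubSub pull
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(msg.getBody());
                pendingAckIds.add(msg.getAckId());
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // Remember which messages belong to this checkpoint.
        ackIdsPerCheckpoint.put(context.getCheckpointId(), new ArrayList<>(pendingAckIds));
        pendingAckIds.clear();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // Nothing to restore: un-acked messages are redelivered by PubSub.
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Only ack once the checkpoint containing these elements is durable.
        ackMessages(ackIdsPerCheckpoint.remove(checkpointId));
    }

    @Override
    public void cancel() {
        running = false;
    }
}
{code}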

> PubSub connector
> 
>
> Key: FLINK-9311
> URL: https://issues.apache.org/jira/browse/FLINK-9311
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Richard Deurwaarder
>Priority: Minor
>
> I would like to start adding some Google Cloud connectors, starting with a 
> PubSub source. I have a basic implementation ready but I want it to be able to:
>  * easily scale up (should I have it extend RichParallelSourceFunction for 
> this?)
>  * Make it easier to provide the Google Cloud credentials. This would require 
> being able to send some JSON string / ServiceAccount to the nodes when 
> starting up this source.
> Could this be something that would be useful for others and added to the 
> Flink connectors repo?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6335) Parse DISTINCT over grouped window in stream SQL

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466345#comment-16466345
 ] 

ASF GitHub Bot commented on FLINK-6335:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3764


> Parse DISTINCT over grouped window in stream SQL
> 
>
> Key: FLINK-6335
> URL: https://issues.apache.org/jira/browse/FLINK-6335
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>Priority: Major
>
> The SQL on the batch side supports the {{DISTINCT}} keyword over aggregation. 
> This jira proposes to support the {{DISTINCT}} keyword on streaming 
> aggregation using the same technique on the batch side.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466348#comment-16466348
 ] 

ASF GitHub Bot commented on FLINK-8690:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5940


> Support distinct aggregation on group windowed streaming tables.
> 
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not 
> allow distinct aggregate.
> We are proposing to reuse distinct aggregate codegen work designed for 
> *FlinkLogicalOverAggregate*, to support unbounded distinct aggregation on 
> datastream as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8237) BucketingSink throws NPE when Writer.duplicate returns null

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466346#comment-16466346
 ] 

ASF GitHub Bot commented on FLINK-8237:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5927


> BucketingSink throws NPE when Writer.duplicate returns null
> ---
>
> Key: FLINK-8237
> URL: https://issues.apache.org/jira/browse/FLINK-8237
> Project: Flink
>  Issue Type: Bug
>Reporter: Gábor Hermann
>Priority: Minor
>
> Users need to look into Flink code to find the cause. We could catch that 
> null before even running the job.
> {code:java}
> java.lang.NullPointerException
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:546)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:441)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> at 
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6373) Add runtime support for distinct aggregation over grouped windows

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466347#comment-16466347
 ] 

ASF GitHub Bot commented on FLINK-6373:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3765


> Add runtime support for distinct aggregation over grouped windows
> -
>
> Key: FLINK-6373
> URL: https://issues.apache.org/jira/browse/FLINK-6373
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>Priority: Major
>
> This is a follow up task for FLINK-6335. FLINK-6335 enables parsing the 
> distinct aggregations over grouped windows. This jira tracks the effort of 
> adding runtime support for the query.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #3764: [FLINK-6335] Parse DISTINCT over grouped windows i...

2018-05-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3764


---


[GitHub] flink pull request #3765: [FLINK-6373] Add runtime support for distinct aggr...

2018-05-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3765


---


[GitHub] flink pull request #5927: [FLINK-8237] [BucketingSink] Better error message ...

2018-05-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5927


---


[GitHub] flink pull request #5940: [FLINK-8690][table]Support group window distinct a...

2018-05-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5940


---


[jira] [Created] (FLINK-9311) PubSub connector

2018-05-07 Thread Richard Deurwaarder (JIRA)
Richard Deurwaarder created FLINK-9311:
--

 Summary: PubSub connector
 Key: FLINK-9311
 URL: https://issues.apache.org/jira/browse/FLINK-9311
 Project: Flink
  Issue Type: New Feature
  Components: Streaming Connectors
Reporter: Richard Deurwaarder


I would like to start adding some Google Cloud connectors, starting with a 
PubSub source. I have a basic implementation ready but I want it to be able to:
 * easily scale up (should I have it extend RichParallelSourceFunction for 
this?)
 * Make it easier to provide the Google Cloud credentials. This would require 
being able to send some JSON string / ServiceAccount to the nodes when starting 
up this source.

Could this be something that would be useful for others and added to the Flink 
connectors repo?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9302) Checkpoints continues to fail when using filesystem state backend with CIRCULAR REFERENCE:java.io.IOException

2018-05-07 Thread Narayanan Arunachalam (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466281#comment-16466281
 ] 

Narayanan Arunachalam commented on FLINK-9302:
--

Thanks [~srichter]. I ran some tests over the weekend and found that though the 
error is from S3, it was actually caused by the size of the checkpoint. In one 
of my tests, after the job recovered from the last good checkpoint, the state 
size continued to grow at ~10G every 15 mins.

This was because too much data was read into the pipeline with 
`maxOutOfOrderness` set to 60 secs. The windows won't fire soon enough, 
resulting in large states. One option is to scale the cluster to deal with many 
open windows. But I realize `AscendingTimestampExtractor` might be enough for 
my use case, and I am running some tests with this setting.

In any case, this particular error is a side effect of the way my windows are 
set up and no direct evidence of any bug. I thought I would post this comment 
anyway to keep you all in sync.
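
For reference, the switch being tested, sketched on a hypothetical `Event` type 
with a `timestamp` field:

{code:java}
// Before: tolerate 60s of out-of-orderness; watermarks lag by that amount
// and session windows stay open correspondingly longer.
stream.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(60)) {
        @Override
        public long extractTimestamp(Event e) {
            return e.timestamp;
        }
    });

// After: assume strictly ascending timestamps, so watermarks advance
// immediately and windows can fire as soon as the session gap elapses.
stream.assignTimestampsAndWatermarks(
    new AscendingTimestampExtractor<Event>() {
        @Override
        public long extractAscendingTimestamp(Event e) {
            return e.timestamp;
        }
    });
{code}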

 

> Checkpoints continues to fail when using filesystem state backend with 
> CIRCULAR REFERENCE:java.io.IOException
> -
>
> Key: FLINK-9302
> URL: https://issues.apache.org/jira/browse/FLINK-9302
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Narayanan Arunachalam
>Priority: Major
>
> *state backend: filesystem*
> *checkpoint.mode:EXACTLY_ONCE*
> +dag:+
> val streams = sEnv
> .addSource(makeKafkaSource(config))
> .map(makeEvent)
> .keyBy(_.get(EVENT_GROUP_ID))
> .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
> .trigger(PurgingTrigger.of(EventTimeTrigger.create()))
> .apply(makeEventsList)
> .addSink(makeNoOpSink)
>  * The job runs fine and checkpoints succeed for few hours. 
>  * Later it fails because of the following checkpoint error.
>  * Once the job is recovered from the last successful checkpoint, it 
> continues to fail with the same checkpoint error.
>  * This persists until the job is restarted with no checkpoint state or using 
> the checkpoint previous to the last good one.
> AsynchronousException\{java.lang.Exception: Could not materialize checkpoint 
> 42 for operator makeSalpTrace -> countTraces -> (countLateEvents -> Sink: 
> NoOpSink, makeZipkinTrace -> (Map -> Sink: bs, Sink: es)) (110/120).}
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 42 for 
> operator makeSalpTrace -> countTraces -> (countLateEvents -> Sink: NoOpSink, 
> makeZipkinTrace -> (Map -> Sink: bs, Sink: es)) (110/120).
> ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not flush and close the file system output stream to 
> s3://traces-bucket/checkpoints/4f73/5e63-1525488620066/iep_dt_sp_nfflink_backend-main/f77f4b95292be85417ce81deeb35be4c/chk-42/5ff8dece-6678-4582-955c-f249daa0f3d0
>  in order to obtain the stream state handle
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
> ... 5 more
> Suppressed: java.lang.Exception: Could not properly cancel managed keyed 
> state future.
> at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
> ... 5 more
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not flush and close the file system output stream to 
> s3://traces-bucket/checkpoints/4f73/5e63-1525488620066/iep_dt_sp_nfflink_backend-main/f77f4b95292be85417ce81deeb35be4c/chk-42/5ff8dece-6678-4582-955c-f249daa0f3d0
>  in order to obtain the stream state handle
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
> at 
> 

[jira] [Created] (FLINK-9310) Update default cyphersuites

2018-05-07 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-9310:
---

 Summary: Update default cyphersuites
 Key: FLINK-9310
 URL: https://issues.apache.org/jira/browse/FLINK-9310
 Project: Flink
  Issue Type: Task
  Components: Security
Affects Versions: 1.4.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen


The current default cipher suite {{TLS_RSA_WITH_AES_128_CBC_SHA}} is no longer 
recommended.

RFC 7525 [1] recommends using only the following cipher suites:
* TLS_DHE_RSA_WITH_AES_128_GCM_SHA256
* TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
* TLS_DHE_RSA_WITH_AES_256_GCM_SHA384
* TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384

[1] https://tools.ietf.org/html/rfc7525



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9265) Upgrade Prometheus version

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466236#comment-16466236
 ] 

ASF GitHub Bot commented on FLINK-9265:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5936
  
Yes, before giving +1 to this commit, we need to check that it introduces 
no new transitive dependencies, or make sure that the dependency is not an 
issue.


> Upgrade Prometheus version
> --
>
> Key: FLINK-9265
> URL: https://issues.apache.org/jira/browse/FLINK-9265
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> We're using 0.0.26
> Latest release is 2.2.1
> This issue is for upgrading the Prometheus version



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5936: [FLINK-9265] Upgrade Prometheus version

2018-05-07 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5936
  
Yes, before giving +1 to this commit, we need to check that it introduces 
no new transitive dependencies, or make sure that the dependency is not an 
issue.


---


[jira] [Commented] (FLINK-9281) LogBack not working

2018-05-07 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466229#comment-16466229
 ] 

Stephan Ewen commented on FLINK-9281:
-

Do you have any insights into why it does not work with newer logback versions?
Is the slf4j version in Flink outdated?

> LogBack not working
> ---
>
> Key: FLINK-9281
> URL: https://issues.apache.org/jira/browse/FLINK-9281
> Project: Flink
>  Issue Type: Bug
>  Components: Logging
>Affects Versions: 1.4.2
>Reporter: Tim
>Priority: Major
>
> I am trying to get Flink to work with Logback instead of Log4J.   However, it 
> is not working. 
> My setup is as follows the advice on this page: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/best_practices.html#use-logback-when-running-flink-on-a-cluster
>  * Flink v1.4.2 running a stand-alone cluster. 
>  * Started JobManager as a foreground process (bin/jobmanager.sh 
> start-foreground cluster).  I updated bin/flink-console.sh to reference 
> logback.xml via -Dlogback.configurationFile=file:/path/to/logfile.
>  * Removed log4j jars under libs/  (log4j-1.2.xx.jar and 
> sfl4j-log4j12-xxx.jar)
>  * Added logback jars under libs/   (logback-classic, logback-core, 
> log4j-over-slf4j.jar) 
> However, I am not getting any file created.   Also, as a dumb test I 
> referenced a non-existent logback.xml file (changed path to a non-existent 
> folder) just to see if any errors appear on stdout, but nothing.
> Thanks
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9300) Improve error message when in-memory state is too large

2018-05-07 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466227#comment-16466227
 ] 

Stephan Ewen commented on FLINK-9300:
-

This should actually not happen with the FsStateBackend. The message occurs 
when "in-line" state (i.e. state stored in the state handle, persisted with 
the metadata) grows too large.

The FsStateBackend writes state on checkpoints directly to the file system and 
communicates only the paths in the state handles.

This looks like there is a case where either the configuration is not properly 
propagated (and the MemoryStateBackend is used after all), or some code path 
uses the wrong type of state handle.

Can you share the stack trace of the original problem?
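
One way to rule out the configuration path is to set the backend explicitly in 
the job (a minimal sketch; the checkpoint URI is made up):

{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// An explicit FsStateBackend rules out a cluster-side configuration that
// silently falls back to the MemoryStateBackend.
env.setStateBackend(new FsStateBackend("s3://my-bucket/flink/checkpoints"));
env.enableCheckpointing(60_000);
{code}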

> Improve error message when in-memory state is too large
> ---
>
> Key: FLINK-9300
> URL: https://issues.apache.org/jira/browse/FLINK-9300
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: Ken Krugler
>Assignee: vinoyang
>Priority: Minor
>
> Currently in the {{MemCheckpointStreamFactory.checkSize()}} method, it can 
> throw an {{IOException}} via:
> {code:java}
> throw new IOException(
> "Size of the state is larger than the maximum permitted memory-backed state. 
> Size="
> + size + " , maxSize=" + maxSize
> + " . Consider using a different state backend, like the File System State 
> backend.");{code}
> But this will happen even if you’re using the File System State backend.
> This came up here: 
> [https://stackoverflow.com/questions/50149005/ioexception-size-of-the-state-is-larger-than-the-maximum-permitted-memory-backe]
> We could change the message to be:
> {quote}Please consider increasing the maximum permitted memory size, 
> increasing the task manager parallelism, or using a non-memory-based state 
> backend such as RocksDB.
> {quote}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9301) NotSoMiniClusterIterations job fails on travis

2018-05-07 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466224#comment-16466224
 ] 

Stephan Ewen commented on FLINK-9301:
-

Okay, so apparently the required memory per TaskManager is now a bit higher 
than it was.
Can we fix that by simply reducing the number of TaskManagers?

The test was initially a "manual test", not executed automatically as part 
of a build; it was intended to observe and debug execution of a reasonably 
complex job locally, to assess whether deployment and ramp-up happen fast.

> NotSoMiniClusterIterations job fails on travis
> --
>
> Key: FLINK-9301
> URL: https://issues.apache.org/jira/browse/FLINK-9301
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Critical
> Fix For: 1.5.0
>
>
> The high-parallelism-iterations-test fails on Travis. After starting ~55 
> TaskManagers, all memory is used and further memory allocations fail.
> I'm currently letting it run another time, if it fails again I will disable 
> the test temporarily.
> https://travis-ci.org/zentol/flink-ci/builds/375189790



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9305) Register flink-s3-fs-hadoop for the s3a:// scheme as well

2018-05-07 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466222#comment-16466222
 ] 

Stephan Ewen commented on FLINK-9305:
-

I am not 100% sure about this issue.

Flink's shaded s3a currently does not work with the bucketing sink and will 
never work with Hadoop Input and Output formats, because it does not look like 
a Hadoop FS any more.
This would prevent using vanilla s3a in addition.

One could argue, though, that in that particular case, users should only use 
vanilla s3a in the first place.


> Register flink-s3-fs-hadoop for the s3a:// scheme as well
> -
>
> Key: FLINK-9305
> URL: https://issues.apache.org/jira/browse/FLINK-9305
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.5.0, 1.4.1, 1.4.2, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>
> For enhanced user experience, we should also register our Hadoop S3A-based 
> shaded S3 file system implementation for the {{s3a://}} file system scheme, 
> not just {{s3://}}. This way, the user can easily switch from the manual S3 
> integration to the shaded one.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9308) The method enableCheckpointing with low values like 10 are forming DoS on Kafka Clusters

2018-05-07 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466209#comment-16466209
 ] 

Stephan Ewen commented on FLINK-9308:
-

The rate at which Kafka can handle offset commits should not really be the 
biggest concern here.
Flink checkpoints do not need to commit to Kafka, this is optional.
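
For illustration, a sketch of turning the optional commits off on the consumer 
(the broker address and topic are made up):

{code:java}
Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");
FlinkKafkaConsumer010<String> consumer =
    new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), props);
// Offsets committed back to Kafka are informational only for Flink's
// exactly-once guarantees; disabling them takes load off the brokers.
consumer.setCommitOffsetsOnCheckpoints(false);
{code}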

> The method enableCheckpointing with low values like 10 are forming DoS on 
> Kafka Clusters
> 
>
> Key: FLINK-9308
> URL: https://issues.apache.org/jira/browse/FLINK-9308
> Project: Flink
>  Issue Type: Bug
>Reporter: Seweryn Habdank-Wojewodzki
>Assignee: vinoyang
>Priority: Major
>
> Hi,
> Docus about Checkpoints in Flink contains such an example:
> {code}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> // start a checkpoint every 1000 ms
> env.enableCheckpointing(1000);
> {code}
> Nice. There is one catch: enableCheckpointing(parameter /* in [ms] */), 
> when used with e.g. 1 or 10, will kill the Kafka server with continuous 
> offset commits.
> Every creative developer who would like to protect the software from 
> duplication of messages in case of a crash will decrease this parameter to 
> the minimum. He will protect his app, but on the Kafka broker/server side he 
> will cause a DoS.
> Can you have a look at limiting the minimum value in the case of a Kafka 
> streaming environment?
> I am not sure if 100 ms as the minimum is enough, but 1000 ms as the minimum 
> would be nice.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466166#comment-16466166
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user FredTing commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186480680
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * The consumer record meta info contains, besides the actual message, 
some meta information, such as
+ * the key, topic, partition, offset and timestamp, for Apache Kafka.
+ *
+ * Note: The timestamp is only valid for Kafka clients 0.10+; for 
older versions the timestamp is `Long.MinValue` and
+ * the timestampType is `NO_TIMESTAMP_TYPE`.
+ */
+@Public
+public interface ConsumerRecordMetaInfo {
+   /**
+* The TimestampType is introduced in the kafka clients 0.10+. This 
interface is also used for the Kafka connector 0.9
+* so a local enumeration is needed.
+*/
+   enum TimestampType {
+   NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME
+   }
+
+   /**
+* @return the key as a byte array (null if no key has been set).
+*/
+   byte[] getKey();
+
+   /**
+* @return The message, as a byte array (null if the message was empty 
or deleted).
+*/
+   byte[] getMessage();
+
+   /**
+* @return The topic the message has originated from (for example the 
Kafka topic).
+*/
+   String getTopic();
+
+   /**
+* @return The partition the message has originated from (for example 
the Kafka partition).
+*/
+   int getPartition();
+
+   /**
+* @return the offset of the message in the original source (for 
example the Kafka offset).
+*/
+   long getOffset();
+
+   /**
+* @return the timestamp of the consumer record
--- End diff --

I've added some more comments
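
For illustration, a sketch of how a schema could consume this interface as 
proposed (written against the diff above, not a released Flink API; the 
`getTimestamp()` accessor is assumed from the javadoc):

{code:java}
public class TimestampedStringSchema
        implements DeserializationSchema<Tuple2<Long, String>> {

    @Override
    public Tuple2<Long, String> deserialize(ConsumerRecordMetaInfo record) {
        // Long.MinValue / NO_TIMESTAMP_TYPE for pre-0.10 Kafka clients.
        return Tuple2.of(record.getTimestamp(),
                new String(record.getMessage(), StandardCharsets.UTF_8));
    }

    @Override
    public Tuple2<Long, String> deserialize(byte[] message) {
        // Old-style entry point: no meta info available.
        return Tuple2.of(Long.MIN_VALUE, new String(message, StandardCharsets.UTF_8));
    }

    @Override
    public boolean isEndOfStream(Tuple2<Long, String> nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Tuple2<Long, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {});
    }
}
{code}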


> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'Kafka message 
> timestamp' parameter (from ConsumerRecord). In some business scenarios, this 
> is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

2018-05-07 Thread FredTing
Github user FredTing commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186480680
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * The consumer record meta info contains, besides the actual message, 
some meta information, such as
+ * the key, topic, partition, offset and timestamp, for Apache Kafka.
+ *
+ * Note: The timestamp is only valid for Kafka clients 0.10+; for 
older versions the timestamp is `Long.MinValue` and
+ * the timestampType is `NO_TIMESTAMP_TYPE`.
+ */
+@Public
+public interface ConsumerRecordMetaInfo {
+   /**
+* The TimestampType is introduced in the kafka clients 0.10+. This 
interface is also used for the Kafka connector 0.9
+* so a local enumeration is needed.
+*/
+   enum TimestampType {
+   NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME
+   }
+
+   /**
+* @return the key as a byte array (null if no key has been set).
+*/
+   byte[] getKey();
+
+   /**
+* @return The message, as a byte array (null if the message was empty 
or deleted).
+*/
+   byte[] getMessage();
+
+   /**
+* @return The topic the message has originated from (for example the 
Kafka topic).
+*/
+   String getTopic();
+
+   /**
+* @return The partition the message has originated from (for example 
the Kafka partition).
+*/
+   int getPartition();
+
+   /**
+* @return the offset of the message in the original source (for 
example the Kafka offset).
+*/
+   long getOffset();
+
+   /**
+* @return the timestamp of the consumer record
--- End diff --

I've added some more comments


---


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466157#comment-16466157
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user FredTing commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186479633
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
 ---
@@ -42,14 +42,22 @@
 @Public
 public interface DeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
 
+   /**
+* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)}.
--- End diff --

I added some more text to the javadoc explaining that implementing the 
other `deserialize` method has more benefits.


> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'Kafka message 
> timestamp' parameter (from ConsumerRecord). In some business scenarios, this 
> is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

2018-05-07 Thread FredTing
Github user FredTing commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186479633
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
 ---
@@ -42,14 +42,22 @@
 @Public
 public interface DeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
 
+   /**
+* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)}.
--- End diff --

I added some more text to the javadoc explaining that implementing the 
other `deserialize` method has more benefits.


---


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466141#comment-16466141
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user FredTing commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186479026
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
 ---
@@ -42,14 +42,22 @@
 @Public
 public interface DeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
 
+   /**
+* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)}.
+*/
+   @Deprecated
+   T deserialize(byte[] message) throws IOException;
+
/**
 * Deserializes the byte message.
 *
-* @param message The message, as a byte array.
+* @param consumerRecordMetaInfo The message, as a {@link 
ConsumerRecordMetaInfo}.
 *
 * @return The deserialized message as an object (null if the message 
cannot be deserialized).
 */
-   T deserialize(byte[] message) throws IOException;
+   default T deserialize(ConsumerRecordMetaInfo 
consumerRecordMetaInfo) throws IOException {
--- End diff --

I agree that it's probably better to make separate `DeserializationSchema` 
classes for each connector type. But for now I think this is a relatively easy 
fix without breaking the Flink API for the custom deserializers. There is 
already some discussion about redesigning the connectors (see issue 5479) with 
a `common connector framework` in mind. I think that would be a good place to 
decide what to do with a shared `DeserializationSchema`.
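
A small sketch of the backwards compatibility this buys (the default method 
from this diff presumably forwards `getMessage()` to the old byte-array 
variant, so an old-style schema keeps working unchanged):

{code:java}
static String readOldStyle(DeserializationSchema<String> schema,
        ConsumerRecordMetaInfo record) throws IOException {
    // An old-style implementation such as SimpleStringSchema only overrides
    // deserialize(byte[]); the new default method bridges to it.
    return schema.deserialize(record);
}
{code}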


> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 'Kafka message 
> timestamp' parameter (from ConsumerRecord). In some business scenarios, this 
> is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

2018-05-07 Thread FredTing
Github user FredTing commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186479026
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
 ---
@@ -42,14 +42,22 @@
 @Public
 public interface DeserializationSchema<T> extends Serializable, 
ResultTypeQueryable<T> {
 
+   /**
+* @deprecated Use {@link #deserialize(ConsumerRecordMetaInfo)}.
+*/
+   @Deprecated
+   T deserialize(byte[] message) throws IOException;
+
/**
 * Deserializes the byte message.
 *
-* @param message The message, as a byte array.
+* @param consumerRecordMetaInfo The message, as a {@link 
ConsumerRecordMetaInfo}.
 *
 * @return The deserialized message as an object (null if the message 
cannot be deserialized).
 */
-   T deserialize(byte[] message) throws IOException;
+   default T deserialize(ConsumerRecordMetaInfo 
consumerRecordMetaInfo) throws IOException {
--- End diff --

I agree that it's probably better to make separate `DeserializationSchema` 
classes for each connector type. But for now I think this is a relatively easy 
fix without breaking the Flink API for the custom deserializers. There is 
already some discussion about redesigning the connectors (see issue 5479) with 
a `common connector framework` in mind. I think that would be a good place to 
decide what to do with a shared `DeserializationSchema`.


---


[jira] [Commented] (FLINK-9253) Make buffer count per InputGate always #channels*buffersPerChannel + ExclusiveBuffers

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466138#comment-16466138
 ] 

ASF GitHub Bot commented on FLINK-9253:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/5923
  
Apparently, there's a problem with unknown input channels that surfaced 
with the newly-merged extra tests: they don't get exclusive buffers (naturally) 
but the floating buffers are calculated and would need to be updated when the 
channels' types are known.
I'll take a look in the next few days.


> Make buffer count per InputGate always #channels*buffersPerChannel + 
> ExclusiveBuffers
> -
>
> Key: FLINK-9253
> URL: https://issues.apache.org/jira/browse/FLINK-9253
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
> Fix For: 1.5.0
>
>
> The credit-based flow control path assigns exclusive buffers only to remote 
> channels (which makes sense since local channels don't use any own buffers). 
> However, this is a bit intransparent with respect to how much data may be in 
> buffers since this depends on the actual schedule of the job and not the job 
> graph.
> By adapting the floating buffers to use a maximum of 
> {{#channels*buffersPerChannel + floatingBuffersPerGate - #exclusiveBuffers}}, 
> we would be channel-type agnostic and keep the old behaviour.
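
As a worked example (numbers made up): with 4 input channels, 2 exclusive 
buffers per channel and 8 floating buffers per gate, a gate where only 1 
channel is remote gets 1*2 = 2 exclusive buffers and, with this change, at most 
4*2 + 8 - 2 = 14 floating buffers, so the per-gate total stays at 16 no matter 
how many channels happen to be local.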



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5923: [FLINK-9253][network] make the maximum floating buffers c...

2018-05-07 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/5923
  
Apparently, there's a problem with unknown input channels that surfaced 
with the newly-merged extra tests: they don't get exclusive buffers (naturally) 
but the floating buffers are calculated and would need to be updated when the 
channels' types are known.
I'll take a look in the next few days.


---


[jira] [Commented] (FLINK-8655) Add a default keyspace to CassandraSink

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466112#comment-16466112
 ] 

ASF GitHub Bot commented on FLINK-8655:
---

GitHub user ctamisier opened a pull request:

https://github.com/apache/flink/pull/5964

[FLINK-8655] [Cassandra Connector] add keyspace in cassandra sink builder

## What is the purpose of the change
This PR is an alternative to https://github.com/apache/flink/pull/5538.

## Brief change log
A discussion started on https://github.com/apache/flink/pull/5538 that I'm 
quoting here:
> What about using the Configuration that is provided in 
RichFunction.open(Configuration parameters) for the 
CassandraSinkBase.open(Configuration configuration) {...} implementation?
> 
> I saw in old docs that Configuration can be used in the open(...) method 
but today (1.4+) it might not be a good practice anymore.
> 
> What about adding keyspace attribute in CassandraPojoSink and 
CassandraSinkBuilder (throwing exception when not using a 
CassandraPojoSinkBuilder for the moment).
> And create a new Configuration() with this keyspace in CassandraPojoSink.
> And finally do a cluster.connect(keyspace);
> 
> I've done this here if you can have a look.
> I've updated CassandraConnectorITCase with a new test.
> I would like to run CassandraPojoSinkExample.main() to cover the 
CassandraSink.addSink() mechanism, but it doesn't work for me (even on the 
Flink master branch).
> 
> Can this be a candidate for a PR? I am new to Flink, so it might break 
Flink's good-practice principles...
> Let me know!

The reply from zentol (Chesnay) (**I will check this point**):

> We would have to pass the keyspace via the constructor as the 
Configuration approach doesn't work for streaming.
> 
> Generally speaking it isn't a problem to set the keyspace when creating 
the connection. But I would like to know what happens if a POJO comes along 
that explicitly sets the keyspace; is it ignored, respected or will it cause an 
exception?

## Verifying this change

This change added tests and can be verified as follows:
- `testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink()` in 
`CassandraConnectorITCase.java`
- Need to run CassandraPojoSinkExample.main() to cover the 
CassandraSink.addSink() mechanism, but it doesn't work for me (even on the 
Flink master branch, **I will investigate**)

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **don't know**
  - The runtime per-record code paths (performance sensitive): **don't 
know**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **yes**
  - If yes, how is the feature documented? **JavaDocs**

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ctamisier/flink pojo-keyspace

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5964.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5964


commit 1ed3518a5f80c5e6bbbf70ab0b3c3d20b60f2e2f
Author: Clément Tamisier 
Date:   2018-05-06T15:04:18Z

[FLINK-8655] add keyspace in cassandra sink builder




> Add a default keyspace to CassandraSink
> ---
>
> Key: FLINK-8655
> URL: https://issues.apache.org/jira/browse/FLINK-8655
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Affects Versions: 1.4.0
>Reporter: Christopher Hughes
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Currently, to use the CassandraPojoSink, it is necessary for a user to 
> provide keyspace information on the desired POJOs using DataStax annotations. 
>  This allows various POJOs to be written to multiple keyspaces while sinking 
> messages, but prevents runtime flexibility.
> For many developers, non-production environments may all share a single 
> Cassandra instance differentiated by keyspace names.  I propose adding a 
> `defaultKeyspace(String keyspace)` to the ClusterBuilder.  POJOs lacking a 
> definitive keyspace would attempt to be loaded to the provided default.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...

2018-05-07 Thread ctamisier
GitHub user ctamisier opened a pull request:

https://github.com/apache/flink/pull/5964

[FLINK-8655] [Cassandra Connector] add keyspace in cassandra sink builder

## What is the purpose of the change
This PR is an alternative to https://github.com/apache/flink/pull/5538.

## Brief change log
A discussion started on https://github.com/apache/flink/pull/5538 that I'm 
quoting here:
> What about using the Configuration that is provided in 
RichFunction.open(Configuration parameters) for the 
CassandraSinkBase.open(Configuration configuration) {...} implementation?
> 
> I saw in old docs that Configuration can be used in the open(...) method 
but today (1.4+) it might not be a good practice anymore.
> 
> What about adding keyspace attribute in CassandraPojoSink and 
CassandraSinkBuilder (throwing exception when not using a 
CassandraPojoSinkBuilder for the moment).
> And create a new Configuration() with this keyspace in CassandraPojoSink.
> And finally do a cluster.connect(keyspace);
> 
> I've done this here if you can have a look.
> I've updated CassandraConnectorITCase with a new test.
> I would like to run CassandraPojoSinkExample.main() to cover the 
CassandraSink.addSink() mechanism, but it doesn't work for me (even on flink 
master branch).
> 
> Can this be a candidate for a PR, I am new to flink so it might break the 
flink good practice principles...
> Let me know!

The reply from zentol (Chesnay) (**I will check this point**):

> We would have to pass the keyspace via the constructor as the 
Configuration approach doesn't work for streaming.
> 
> Generally speaking it isn't a problem to set the keyspace when creating 
the connection. But I would like to know what happens if a POJO comes along 
that explicitly sets the keyspace; is it ignored, respected or will it cause an 
exception?
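
A minimal sketch of that constructor-based variant, assuming a simplified 
field-holding sink class (all names illustrative, not the actual 
CassandraSinkBase code):

{code:java}
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

// Sketch: the keyspace travels through the constructor and is applied when
// the connection is created, instead of coming from open()'s Configuration.
class KeyspaceAwareSink {
    private final Cluster cluster;
    private final String keyspace; // null => POJOs must carry their own keyspace

    KeyspaceAwareSink(Cluster cluster, String keyspace) {
        this.cluster = cluster;
        this.keyspace = keyspace;
    }

    Session open() {
        // connect(keyspace) makes it the session's default keyspace
        return keyspace != null ? cluster.connect(keyspace) : cluster.connect();
    }
}
{code}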

## Verifying this change

This change added tests and can be verified as follows:
- `testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink()` in 
`CassandraConnectorITCase.java`
- Need to run CassandraPojoSinkExample.main() to cover the 
CassandraSink.addSink() mechanism, but it doesn't work for me (even on the Flink 
master branch; **I will investigate**)

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **don't know**
  - The runtime per-record code paths (performance sensitive): **don't 
know**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **yes**
  - If yes, how is the feature documented? **JavaDocs**

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ctamisier/flink pojo-keyspace

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5964.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5964


commit 1ed3518a5f80c5e6bbbf70ab0b3c3d20b60f2e2f
Author: Clément Tamisier 
Date:   2018-05-06T15:04:18Z

[FLINK-8655] add keyspace in cassandra sink builder




---


[jira] [Commented] (FLINK-6373) Add runtime support for distinct aggregation over grouped windows

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466068#comment-16466068
 ] 

ASF GitHub Bot commented on FLINK-6373:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3765
  
The feature that this PR was going to implement has been resolved by PR #. 
I will close it.


> Add runtime support for distinct aggregation over grouped windows
> -
>
> Key: FLINK-6373
> URL: https://issues.apache.org/jira/browse/FLINK-6373
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>Priority: Major
>
> This is a follow-up task for FLINK-6335. FLINK-6335 enables parsing 
> distinct aggregations over grouped windows. This JIRA tracks the effort of 
> adding runtime support for such queries.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #3765: [FLINK-6373] Add runtime support for distinct aggregation...

2018-05-07 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3765
  
The feature that this PR was going to implement has been resolved by PR #. 
I will close it.


---


[GitHub] flink issue #3764: [FLINK-6335] Parse DISTINCT over grouped windows in strea...

2018-05-07 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3764
  
This PR has been integrated into #5940. 
I'll close it.


---


[jira] [Commented] (FLINK-6335) Parse DISTINCT over grouped window in stream SQL

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466067#comment-16466067
 ] 

ASF GitHub Bot commented on FLINK-6335:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3764
  
This PR has been integrated into #5940. 
I'll close it.


> Parse DISTINCT over grouped window in stream SQL
> 
>
> Key: FLINK-6335
> URL: https://issues.apache.org/jira/browse/FLINK-6335
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Haohui Mai
>Assignee: Haohui Mai
>Priority: Major
>
> The SQL on the batch side supports the {{DISTINCT}} keyword over aggregation. 
> This JIRA proposes to support the {{DISTINCT}} keyword on streaming 
> aggregation using the same technique as on the batch side.
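
For reference, the kind of query this enables, assuming a StreamTableEnvironment 
`tableEnv` and an illustrative `Clicks` table with `userId`, `url`, and a 
`rowtime` attribute (all names made up):

{code:java}
// Sketch: a DISTINCT aggregate over a grouped (tumbling) window in stream SQL.
Table result = tableEnv.sqlQuery(
    "SELECT userId, " +
    "       COUNT(DISTINCT url) AS distinctUrls, " +
    "       TUMBLE_END(rowtime, INTERVAL '1' HOUR) AS windowEnd " +
    "FROM Clicks " +
    "GROUP BY userId, TUMBLE(rowtime, INTERVAL '1' HOUR)");
{code}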



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466066#comment-16466066
 ] 

ASF GitHub Bot commented on FLINK-8690:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5940
  
merging


> Support distinct aggregation on group windowed streaming tables.
> 
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not 
> allow distinct aggregates.
> We are proposing to reuse the distinct-aggregate codegen work designed for 
> *FlinkLogicalOverAggregate* to support unbounded distinct aggregation on 
> DataStream as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8237) BucketingSink throws NPE when Writer.duplicate returns null

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466065#comment-16466065
 ] 

ASF GitHub Bot commented on FLINK-8237:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5927
  
merging


> BucketingSink throws NPE when Writer.duplicate returns null
> ---
>
> Key: FLINK-8237
> URL: https://issues.apache.org/jira/browse/FLINK-8237
> Project: Flink
>  Issue Type: Bug
>Reporter: Gábor Hermann
>Priority: Minor
>
> Users need to look into Flink code to find the cause. We could catch that 
> null before even running the job.
> {code:java}
> java.lang.NullPointerException
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:546)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:441)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> at 
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> {code}
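
For illustration, the kind of eager check that would replace the bare NPE with a 
descriptive error (the helper method and its placement are assumptions, not the 
actual patch):

{code:java}
// Sketch: validate the duplicate once, with a descriptive message, instead of
// letting a null flow into openNewPartFile() and surface as an NPE.
private Writer<T> duplicateWriter(Writer<T> template) {
    Writer<T> duplicated = template.duplicate();
    if (duplicated == null) {
        throw new RuntimeException(
            "Writer.duplicate() returned null for " + template.getClass().getName());
    }
    return duplicated;
}
{code}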



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5940: [FLINK-8690][table]Support group window distinct aggregat...

2018-05-07 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5940
  
merging


---


[GitHub] flink issue #5927: [FLINK-8237] [BucketingSink] Better error message added

2018-05-07 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5927
  
merging


---


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466062#comment-16466062
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user FredTing commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186465223
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * The consumer record meta info contains, besides the actual message, some
+ * meta information such as the key, topic, partition, offset and timestamp
+ * for Apache Kafka.
+ *
+ * Note: The timestamp is only valid for Kafka clients 0.10+; for older
+ * versions the value is `Long.MIN_VALUE` and the timestampType has the
+ * value `NO_TIMESTAMP_TYPE`.
+ */
+@Public
+public interface ConsumerRecordMetaInfo {
+   /**
+* The TimestampType was introduced in the Kafka clients 0.10+. This
+* interface is also used for the Kafka 0.9 connector, so a local
+* enumeration is needed.
+*/
+   enum TimestampType {
+   NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME
--- End diff --

I'll rename them both. 


> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a parameter for 
> the 'kafka message timestamp' (from ConsumerRecord). In some business 
> scenarios, this is useful!
>  
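
For illustration, how a deserialization schema could consume the proposed meta 
info, including the timestamp (the schema class and output type below are made 
up; only the ConsumerRecordMetaInfo accessors come from this PR):

{code:java}
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.ConsumerRecordMetaInfo;
import org.apache.flink.api.java.tuple.Tuple2;

// Sketch: build (timestamp, payload) pairs from the record meta info.
public class TimestampedStringSchema {
    public Tuple2<Long, String> deserialize(ConsumerRecordMetaInfo record) {
        long kafkaTimestamp = record.getTimestamp(); // Long.MIN_VALUE on pre-0.10 clients
        String value = new String(record.getMessage(), StandardCharsets.UTF_8);
        return Tuple2.of(kafkaTimestamp, value);
    }
}
{code}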



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

2018-05-07 Thread FredTing
Github user FredTing commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186465223
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/serialization/ConsumerRecordMetaInfo.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * The consumer record meta info contains, besides the actual message, some
+ * meta information such as the key, topic, partition, offset and timestamp
+ * for Apache Kafka.
+ *
+ * Note: The timestamp is only valid for Kafka clients 0.10+; for older
+ * versions the value is `Long.MIN_VALUE` and the timestampType has the
+ * value `NO_TIMESTAMP_TYPE`.
+ */
+@Public
+public interface ConsumerRecordMetaInfo {
+   /**
+* The TimestampType was introduced in the Kafka clients 0.10+. This
+* interface is also used for the Kafka 0.9 connector, so a local
+* enumeration is needed.
+*/
+   enum TimestampType {
+   NO_TIMESTAMP_TYPE, CREATE_TIME, INGEST_TIME
--- End diff --

I'll rename them both. 


---


[jira] [Commented] (FLINK-8237) BucketingSink throws NPE when Writer.duplicate returns null

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466051#comment-16466051
 ] 

ASF GitHub Bot commented on FLINK-8237:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5927
  
Thanks for the update @pavel-shvetsov-git.
+1 to merge


> BucketingSink throws NPE when Writer.duplicate returns null
> ---
>
> Key: FLINK-8237
> URL: https://issues.apache.org/jira/browse/FLINK-8237
> Project: Flink
>  Issue Type: Bug
>Reporter: Gábor Hermann
>Priority: Minor
>
> Users need to look into Flink code to find the cause. We could catch that 
> null before even running the job.
> {code:java}
> java.lang.NullPointerException
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:546)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:441)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> at 
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5927: [FLINK-8237] [BucketingSink] Better error message added

2018-05-07 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5927
  
Thanks for the update @pavel-shvetsov-git.
+1 to merge


---


[jira] [Commented] (FLINK-8690) Support distinct aggregation on group windowed streaming tables.

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466048#comment-16466048
 ] 

ASF GitHub Bot commented on FLINK-8690:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5940
  
Thanks for the update @walterddr.
The PR is good to merge.


> Support distinct aggregation on group windowed streaming tables.
> 
>
> Key: FLINK-8690
> URL: https://issues.apache.org/jira/browse/FLINK-8690
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, *FlinkLogicalAggregate / FlinkLogicalWindowAggregate* does not 
> allow distinct aggregates.
> We are proposing to reuse the distinct-aggregate codegen work designed for 
> *FlinkLogicalOverAggregate* to support unbounded distinct aggregation on 
> DataStream as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466047#comment-16466047
 ] 

ASF GitHub Bot commented on FLINK-8500:
---

Github user FredTing commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186462871
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
 ---
@@ -78,6 +79,69 @@ public Kafka010Fetcher(
useMetrics);
}
 
+   private class KafkaConsumerRecordWrapper10 implements 
ConsumerRecordMetaInfo {
+   private static final long serialVersionUID = 
2651665280744549935L;
+
+   private final ConsumerRecord<byte[], byte[]> consumerRecord;
+
+   public KafkaConsumerRecordWrapper10(ConsumerRecord<byte[], byte[]> consumerRecord) {
+   this.consumerRecord = consumerRecord;
+   }
+
+   @Override
+   public byte[] getKey() {
+   return consumerRecord.key();
+   }
+
+   @Override
+   public byte[] getMessage() {
+   return consumerRecord.value();
+   }
+
+   @Override
+   public String getTopic() {
+   return consumerRecord.topic();
+   }
+
+   @Override
+   public int getPartition() {
+   return consumerRecord.partition();
+   }
+
+   @Override
+   public long getOffset() {
+   return consumerRecord.offset();
+   }
+
+   @Override
+   public long getTimestamp() {
+   return Long.MIN_VALUE;
--- End diff --

Yes, it certainly does; it should return `consumerRecord.timestamp()`. 
I'll fix it.
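
The corrected override would presumably look like:

{code:java}
@Override
public long getTimestamp() {
    // delegate to the Kafka 0.10+ record instead of returning a constant
    return consumerRecord.timestamp();
}
{code}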


> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a parameter for 
> the 'kafka message timestamp' (from ConsumerRecord). In some business 
> scenarios, this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5940: [FLINK-8690][table]Support group window distinct aggregat...

2018-05-07 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5940
  
Thanks for the update @walterddr.
The PR is good to merge.


---


[GitHub] flink pull request #5958: [FLINK-8500] Get the timestamp of the Kafka messag...

2018-05-07 Thread FredTing
Github user FredTing commented on a diff in the pull request:

https://github.com/apache/flink/pull/5958#discussion_r186462871
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
 ---
@@ -78,6 +79,69 @@ public Kafka010Fetcher(
useMetrics);
}
 
+   private class KafkaConsumerRecordWrapper10 implements 
ConsumerRecordMetaInfo {
+   private static final long serialVersionUID = 
2651665280744549935L;
+
+   private final ConsumerRecord<byte[], byte[]> consumerRecord;
+
+   public KafkaConsumerRecordWrapper10(ConsumerRecord<byte[], byte[]> consumerRecord) {
+   this.consumerRecord = consumerRecord;
+   }
+
+   @Override
+   public byte[] getKey() {
+   return consumerRecord.key();
+   }
+
+   @Override
+   public byte[] getMessage() {
+   return consumerRecord.value();
+   }
+
+   @Override
+   public String getTopic() {
+   return consumerRecord.topic();
+   }
+
+   @Override
+   public int getPartition() {
+   return consumerRecord.partition();
+   }
+
+   @Override
+   public long getOffset() {
+   return consumerRecord.offset();
+   }
+
+   @Override
+   public long getTimestamp() {
+   return Long.MIN_VALUE;
--- End diff --

Yes, it certainly does; it should return `consumerRecord.timestamp()`. 
I'll fix it.


---


[jira] [Commented] (FLINK-9303) Unassign partitions from Kafka client if partitions become unavailable

2018-05-07 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466037#comment-16466037
 ] 

Ted Yu commented on FLINK-9303:
---

{code}
+   if (!removedPartitions.isEmpty()) {
+       log.info("Removing " + removedPartitions.size() + " partition(s) from consumer.");
+       partitionsToBeRemoved.removeAll(removedPartitions);
{code}
The {{removeAll}} call can be avoided by removing entries through the iterator of 
partitionsToBeRemoved in the preceding loop.
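
A sketch of the suggested shape (the availability predicate is assumed; only the 
variable names follow the diff above):

{code:java}
// Sketch: remove dead partitions while iterating, so no removedPartitions
// collection and no trailing removeAll() call are needed.
int removedCount = 0;
Iterator<KafkaTopicPartition> iterator = partitionsToBeRemoved.iterator();
while (iterator.hasNext()) {
    KafkaTopicPartition partition = iterator.next();
    if (!stillAvailable(partition)) { // assumed predicate
        iterator.remove(); // removes from partitionsToBeRemoved in place
        removedCount++;
    }
}
if (removedCount > 0) {
    log.info("Removing " + removedCount + " partition(s) from consumer.");
}
{code}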

> Unassign partitions from Kafka client if partitions become unavailable
> --
>
> Key: FLINK-9303
> URL: https://issues.apache.org/jira/browse/FLINK-9303
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.6.0
>
>
> Originally reported in ML:
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamically-deleting-kafka-topics-does-not-remove-partitions-from-kafkaConsumer-td19946.html]
> The problem is that the Kafka consumer has no notion of "closed" partitions 
> at the moment, so partitions statically assigned to the Kafka client are never 
> removed and are continuously requested for records.
> This causes log noise, as reported in the linked mail thread.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9064) Add Scaladocs link to documentation

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466020#comment-16466020
 ] 

ASF GitHub Bot commented on FLINK-9064:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5773
  
cc @zentol @GJL @tzulitai please review thanks~


> Add Scaladocs link to documentation
> ---
>
> Key: FLINK-9064
> URL: https://issues.apache.org/jira/browse/FLINK-9064
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.5.0
>Reporter: Matt Hagen
>Assignee: vinoyang
>Priority: Minor
>   Original Estimate: 5m
>  Remaining Estimate: 5m
>
> Browse to the [Apache Flink 
> Documentation|https://ci.apache.org/projects/flink/flink-docs-master/] page.
> On the sidebar, under the Javadocs link, I recommend that you add a 
> [Scaladocs|https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/scala/index.html#org.apache.flink.api.scala.package]
>  link.
> Thanks!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5773: [FLINK-9064] Add Scaladocs link to documentation

2018-05-07 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5773
  
cc @zentol @GJL @tzulitai please review thanks~


---


[jira] [Commented] (FLINK-7001) Improve performance of Sliding Time Window with pane optimization

2018-05-07 Thread Rong Rong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466014#comment-16466014
 ] 

Rong Rong commented on FLINK-7001:
--

Thanks [~pgrulich]. This is definitely a great solution for handling 
high-frequency, long sliding windows.

I briefly went over the paper and have a few questions regarding the use cases 
and compatibility. 

* The non-overlapping slice separator + slice manager approach is very elegant 
for reducing memory buffer usage, and having a sole slice manager handle 
out-of-order messages by updating the slices in the store is definitely great. 
I share [~StephanEwen]'s concern here, especially regarding backward & RocksDB 
compatibility. 

* Another point is the complexity of partial vs. final aggregates. The paper 
says little about the "window manager", and the assumption seems to be that the 
final aggregate over the partial results has the same time/space complexity as 
the partial aggregates. Most of the built-in aggregate functions we currently 
have in Flink satisfy this assumption; however, there are some complex 
aggregate functions whose "merge" method can be much more costly than their 
"accumulate" method. Would we have to consider the trade-off between these two 
approaches? 

* https://issues.apache.org/jira/browse/FLINK-5387 seems to suggest there are 
trade-offs when using aligned-window approaches. We can probably extend those 
discussions here. 

Thanks,
Rong

> Improve performance of Sliding Time Window with pane optimization
> -
>
> Key: FLINK-7001
> URL: https://issues.apache.org/jira/browse/FLINK-7001
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>
> Currently, the implementation of time-based sliding windows treats each 
> window individually and replicates records to each window. For a window of 10 
> minute size that slides by 1 second, the data is replicated 600 fold (10 
> minutes / 1 second). We can optimize sliding windows by dividing them into 
> panes (aligned with the slide), so that we can avoid record duplication and 
> leverage the checkpoint.
> I will attach a more detailed design doc to the issue.
> The following issues are similar to this issue: FLINK-5387, FLINK-6990
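
To make the replication factor concrete (pure arithmetic, values taken from the 
description; variable names are illustrative):

{code:java}
long windowSize = 10 * 60 * 1000L;           // 10 minutes, in milliseconds
long slide = 1000L;                          // 1 second, in milliseconds

// Today: every record is copied into each window that contains it.
long windowsPerRecord = windowSize / slide;  // = 600

// With panes aligned to the slide: each record lands in exactly one pane, and
// a window result is assembled from windowSize / slide pane pre-aggregates.
long timestamp = 1525651200000L;              // example event time
long paneStart = (timestamp / slide) * slide; // the single pane for this record
{code}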



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7897) Consider using nio.Files for file deletion in TransientBlobCleanupTask

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466011#comment-16466011
 ] 

ASF GitHub Bot commented on FLINK-7897:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5777
  
cc @zentol @GJL @tzulitai this PR has been open for a long time; please 
review it, thanks.


> Consider using nio.Files for file deletion in TransientBlobCleanupTask
> --
>
> Key: FLINK-7897
> URL: https://issues.apache.org/jira/browse/FLINK-7897
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> nio.Files#delete() provides a better clue as to why the deletion may fail:
> https://docs.oracle.com/javase/7/docs/api/java/nio/file/Files.html#delete(java.nio.file.Path)
> Depending on the potential exception (FileNotFound), the call to 
> localFile.exists() may be skipped.
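
A sketch of the suggested pattern, assuming a java.io.File `localFile` and an 
slf4j `log` in scope:

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;

// Sketch: Files.delete() throws a descriptive exception instead of returning
// false, so the separate localFile.exists() pre-check can be dropped.
try {
    Files.delete(localFile.toPath());
} catch (NoSuchFileException e) {
    // file already gone; nothing to clean up
} catch (IOException e) {
    log.warn("Failed to delete locally stored blob file " + localFile, e);
}
{code}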



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5777: [FLINK-7897] Consider using nio.Files for file deletion i...

2018-05-07 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5777
  
cc @zentol @GJL @tzulitai this PR has been open for a long time; please 
review it, thanks.


---


[jira] [Assigned] (FLINK-8999) Ensure the job has an operator with operator state.

2018-05-07 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang reassigned FLINK-8999:
---

Assignee: (was: mingleizhang)

> Ensure the job has an operator with operator state.
> ---
>
> Key: FLINK-8999
> URL: https://issues.apache.org/jira/browse/FLINK-8999
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7917) The return of taskInformationOrBlobKey should be placed inside synchronized in ExecutionJobVertex

2018-05-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466010#comment-16466010
 ] 

ASF GitHub Bot commented on FLINK-7917:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5798
  
cc @zentol @GJL @tzulitai this PR has been open for a long time; please review it, thanks~


> The return of taskInformationOrBlobKey should be placed inside synchronized 
> in ExecutionJobVertex
> -
>
> Key: FLINK-7917
> URL: https://issues.apache.org/jira/browse/FLINK-7917
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> Currently in ExecutionJobVertex#getTaskInformationOrBlobKey:
> {code}
> }
> return taskInformationOrBlobKey;
> {code}
> The return should be placed inside the synchronized block.
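
A minimal, self-contained illustration of the requested pattern (not the actual 
ExecutionJobVertex code; types are simplified):

{code:java}
class CachedTaskInformation {
    private final Object stateMonitor = new Object();
    private String taskInformationOrBlobKey; // stand-in for the real type

    String getTaskInformationOrBlobKey() {
        synchronized (stateMonitor) {
            if (taskInformationOrBlobKey == null) {
                taskInformationOrBlobKey = "computed";
            }
            // returning inside the block keeps the read under the lock
            return taskInformationOrBlobKey;
        }
    }
}
{code}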



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5798: [FLINK-7917] The return of taskInformationOrBlobKey shoul...

2018-05-07 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5798
  
cc @zentol @GJL @tzulitai this PR has been open for a long time; please review it, thanks~


---

