[jira] [Commented] (FLINK-9456) Let ResourceManager notify JobManager about failed/killed TaskManagers

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504268#comment-16504268
 ] 

ASF GitHub Bot commented on FLINK-9456:
---

GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/6132

[FLINK-9456][Distributed Coordination]Let ResourceManager notify JobManager 
about failed/killed TaskManagers.

## What is the purpose of the change

*Often, the ResourceManager learns faster about TaskManager 
failures/killings because it directly communicates with the underlying resource 
management framework. Instead of only relying on the JobManager's heartbeat to 
figure out that a TaskManager has died, we should additionally send a signal 
from the ResourceManager to the JobManager if a TaskManager has died. That way, 
we can react faster to TaskManager failures and recover our running job/s.*

## Brief change log

  - *Add `JobMasterGateway#taskManagerTerminated()` to notify the task 
manager terminated and do the disconnection there.*
  - *Let the `ResourceManager` to notify JobMaster when the task manager 
terminated*

## Verifying this change

- once this approach is verified in general, I will add tests for it.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

No

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink FLINK-9456

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6132.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6132


commit 652ac037ef3edc75cea0abd4966c2154d6e5fbc0
Author: sihuazhou 
Date:   2018-05-10T06:36:27Z

Let ResourceManager notify JobManager about failed/killed TaskManagers.




> Let ResourceManager notify JobManager about failed/killed TaskManagers
> --
>
> Key: FLINK-9456
> URL: https://issues.apache.org/jira/browse/FLINK-9456
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.6.0, 1.5.1
>
>
> Often, the {{ResourceManager}} learns faster about TaskManager 
> failures/killings because it directly communicates with the underlying 
> resource management framework. Instead of only relying on the 
> {{JobManager}}'s heartbeat to figure out that a {{TaskManager}} has died, we 
> should additionally send a signal from the {{ResourceManager}} to the 
> {{JobManager}} if a {{TaskManager}} has died. That way, we can react faster 
> to {{TaskManager}} failures and recover our running job/s.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6132: [FLINK-9456][Distributed Coordination]Let Resource...

2018-06-06 Thread sihuazhou
GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/6132

[FLINK-9456][Distributed Coordination]Let ResourceManager notify JobManager 
about failed/killed TaskManagers.

## What is the purpose of the change

*Often, the ResourceManager learns faster about TaskManager 
failures/killings because it directly communicates with the underlying resource 
management framework. Instead of only relying on the JobManager's heartbeat to 
figure out that a TaskManager has died, we should additionally send a signal 
from the ResourceManager to the JobManager if a TaskManager has died. That way, 
we can react faster to TaskManager failures and recover our running job/s.*

## Brief change log

  - *Add `JobMasterGateway#taskManagerTerminated()` to notify the task 
manager terminated and do the disconnection there.*
  - *Let the `ResourceManager` to notify JobMaster when the task manager 
terminated*

## Verifying this change

- once this approach is verified in general, I will add tests for it.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

No

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink FLINK-9456

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6132.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6132


commit 652ac037ef3edc75cea0abd4966c2154d6e5fbc0
Author: sihuazhou 
Date:   2018-05-10T06:36:27Z

Let ResourceManager notify JobManager about failed/killed TaskManagers.




---


[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504249#comment-16504249
 ] 

ASF GitHub Bot commented on FLINK-9444:
---

Github user tragicjun commented on the issue:

https://github.com/apache/flink/pull/6082
  
@fhueske Great, let me take a look and commit another version later. 


> KafkaAvroTableSource failed to work for map and array fields
> 
>
> Key: FLINK-9444
> URL: https://issues.apache.org/jira/browse/FLINK-9444
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API  SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Priority: Blocker
>  Labels: patch
> Fix For: 1.6.0
>
> Attachments: flink-9444.patch
>
>
> When some Avro schema has map/array fields and the corresponding TableSchema 
> declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be 
> thrown when registering the *KafkaAvroTableSource*, complaining like:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Type Map of table field 'event' does not match with type 
> GenericType of the field 'event' of the TableSource return 
> type.
>  at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71)
>  at 
> org.apache.flink.table.plan.schema.StreamTableSourceTable.(StreamTableSourceTable.scala:33)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124)
>  at 
> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...

2018-06-06 Thread tragicjun
Github user tragicjun commented on the issue:

https://github.com/apache/flink/pull/6082
  
@fhueske Great, let me take a look and commit another version later. 


---


[GitHub] flink pull request #6076: [hotfix][docs] Specify operators behaviour on proc...

2018-06-06 Thread eliaslevy
Github user eliaslevy commented on a diff in the pull request:

https://github.com/apache/flink/pull/6076#discussion_r193619225
  
--- Diff: docs/dev/event_time.md ---
@@ -213,10 +213,33 @@ arrive after the system's event time clock (as 
signaled by the watermarks) has a
 timestamp. See [Allowed Lateness]({{ site.baseurl 
}}/dev/stream/operators/windows.html#allowed-lateness) for more information on 
how to work
 with late elements in event time windows.
 
+## Idling sources
--- End diff --

Good point.  Probably a good idea to include the here the description in 
[StreamStatus](https://ci.apache.org/projects/flink/flink-docs-release-1.5/api/java/org/apache/flink/streaming/runtime/streamstatus/StreamStatus.html).


---


[GitHub] flink issue #6129: [FLINK-9503] Migrate integration tests for iterative aggr...

2018-06-06 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6129
  
cc @zentol please review again, the reason of CI error is other problem


---


[jira] [Commented] (FLINK-9503) Migrate integration tests for iterative aggregators

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504199#comment-16504199
 ] 

ASF GitHub Bot commented on FLINK-9503:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6129
  
cc @zentol please review again, the reason of CI error is other problem


> Migrate integration tests for iterative aggregators
> ---
>
> Key: FLINK-9503
> URL: https://issues.apache.org/jira/browse/FLINK-9503
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Deepak Sharma
>Assignee: vinoyang
>Priority: Minor
>
> Migrate integration tests in org.apache.flink.test.iterative.aggregators to 
> use collect() instead of temp files. Related to parent jira.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6131: [hotfix][docs] Fix Table API scala example code

2018-06-06 Thread zjffdu
GitHub user zjffdu opened a pull request:

https://github.com/apache/flink/pull/6131

[hotfix][docs] Fix Table API scala example code

This change only fix the documentation. This is a straightforward fix for 
the Table API of scala example code.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjffdu/flink doc_fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6131.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6131


commit c0e2b7818094f4c1cc79981ad6dd3cc7dffe2bdc
Author: Jeff Zhang 
Date:   2018-06-07T00:45:43Z

[hotfix][docs] Fix Table API scala example code




---


[jira] [Commented] (FLINK-9545) Support read a file multiple times in Flink DataStream

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504073#comment-16504073
 ] 

ASF GitHub Bot commented on FLINK-9545:
---

GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/6130

[FLINK-9545] Support read a file multiple times in Flink DataStream

## What is the purpose of the change

we need `StreamExecutionEnvironment.readFile/readTextFile` to read each 
file for N times, but currently it only supports reading file once.

add support for the feature.

## Brief change log

- add a new processing mode as PROCESSING_N_TIMES
- add additional parameter numTimes for 
StreamExecutionEnvironment.readFile/readTextFile

## Verifying this change

This change is already covered by existing tests, such as *(please describe 
tests)*.

## Does this pull request potentially affect one of the following parts:

  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs / JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-9545

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6130.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6130


commit d51fd25ca0ff8e38aaf84d2076c9c979cd136c9d
Author: Bowen Li 
Date:   2018-06-07T00:12:59Z

[FLINK-9545] Support read a file multiple times in Flink DataStream




> Support read a file multiple times in Flink DataStream 
> ---
>
> Key: FLINK-9545
> URL: https://issues.apache.org/jira/browse/FLINK-9545
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.6.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.6.0
>
>
> we need {{StreamExecutionEnvironment.readFile/readTextFile}} to read each 
> file for N times, but currently it only supports reading file once.
> add support for the feature.
> Plan:
> add a new processing mode as PROCESSING_N_TIMES, and add additional parameter 
> {{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6130: [FLINK-9545] Support read a file multiple times in...

2018-06-06 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/6130

[FLINK-9545] Support read a file multiple times in Flink DataStream

## What is the purpose of the change

we need `StreamExecutionEnvironment.readFile/readTextFile` to read each 
file for N times, but currently it only supports reading file once.

add support for the feature.

## Brief change log

- add a new processing mode as PROCESSING_N_TIMES
- add additional parameter numTimes for 
StreamExecutionEnvironment.readFile/readTextFile

## Verifying this change

This change is already covered by existing tests, such as *(please describe 
tests)*.

## Does this pull request potentially affect one of the following parts:

  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (docs / JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-9545

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6130.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6130


commit d51fd25ca0ff8e38aaf84d2076c9c979cd136c9d
Author: Bowen Li 
Date:   2018-06-07T00:12:59Z

[FLINK-9545] Support read a file multiple times in Flink DataStream




---


[jira] [Updated] (FLINK-9545) Support read a file multiple times in Flink DataStream

2018-06-06 Thread Bowen Li (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-9545:

Summary: Support read a file multiple times in Flink DataStream   (was: 
Support read file for N times in Flink stream)

> Support read a file multiple times in Flink DataStream 
> ---
>
> Key: FLINK-9545
> URL: https://issues.apache.org/jira/browse/FLINK-9545
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.6.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.6.0
>
>
> we need {{StreamExecutionEnvironment.readFile/readTextFile}} to read each 
> file for N times, but currently it only supports reading file once.
> add support for the feature.
> Plan:
> add a new processing mode as PROCESSING_N_TIMES, and add additional parameter 
> {{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9262) KeyedOneInputStreamOperatorTestHarness throws NPE creating snapshot

2018-06-06 Thread Chris Schneider (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504052#comment-16504052
 ] 

Chris Schneider commented on FLINK-9262:


Hi Aljoscha,

I'm still able to reproduce this problem myself using Flink 1.4-SNAPSHOT (at 
90acd78), but I'm not certain that the method I chose for doing so was correct:
 # I pulled the 1.4 branch to my local flink source directory (at 90acd78).
 # I deleted the entire flink directory from my local Maven repo.
 # I ran {{mvn install -DskipTests}} from my flink source directory (and 
checked the output to make sure it wasn't downloading any Flink artifacts with 
version 1.4-SNAPSHOT).
 # I modified the pom.xml in my own source project to depend on the 
1.4-SNAPSHOT version of all Flink artifacts (and to avoid using 
apache.snapshots repository).
 # I ran {{FlinkIssueTest}} within my own source project.

Unless you see an error in the approach I've described above, I'm inclined to 
re-open this issue.

Please let me know what you think,

Chris

> KeyedOneInputStreamOperatorTestHarness throws NPE creating snapshot
> ---
>
> Key: FLINK-9262
> URL: https://issues.apache.org/jira/browse/FLINK-9262
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming, Tests
>Affects Versions: 1.4.0
> Environment: macOS X High Sierra 10.13.4
> (ancient) Eclipse Luna v.4.4.1 
> JRE System Library [Java SE 8 [1.8.0_131]]
> Java 8 Update 171 build 11
>Reporter: Chris Schneider
>Priority: Blocker
>
> Although KeyedOneInputStreamOperatorTestHarness and other 
> AbstractStreamOperatorTestHarness subclasses are not yet part of the public 
> Flink API, we have been trying to make use of them for unit testing our map 
> functions. The following code throws NPE from the attempt to collect a 
> snapshot on Flink 1.4.0 (even after applying [the 
> fix|https://github.com/apache/flink/pull/5193/commits/ba676d7de5536e32e0c48c3db511bec1758f4e80]
>  for FLINK-8268), but appears to work properly on Flink 1.5-SNAPSHOT:
> {code:java}
> package com.scaleunlimited.flinkcrawler.functions;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.streaming.api.operators.StreamFlatMap;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
> import org.apache.flink.util.Collector;
> import org.junit.Test;
> public class FlinkIssueTest {
>     
>     @SuppressWarnings("serial")
>     private static class MyProcessFunction extends 
> RichFlatMapFunction {
>     @Override
>     public void flatMap(String input, Collector collector) throws 
> Exception {
>     collector.collect(input);
>     }
>     }
>     
>     @SuppressWarnings({
>     "serial", "hiding"
>     })
>     private static class MyKeySelector implements KeySelector String> {
>     @Override
>     public String getKey(String input) throws Exception {
>     return input;
>     }
>     }
>     @Test
>     public void test() throws Throwable {
>     KeyedOneInputStreamOperatorTestHarness 
> testHarness =
>     new KeyedOneInputStreamOperatorTestHarness String>(
>     new StreamFlatMap<>(new MyProcessFunction()),
>     new MyKeySelector(),
>     BasicTypeInfo.STRING_TYPE_INFO,
>     1,
>     1,
>     0);
>     testHarness.setup();
>     testHarness.open();
>     
>     for (int i = 0; i < 10; i++) {
>     String urlString = String.format("https://domain-%d.com/page1;, 
> i);
>     testHarness.processElement(new StreamRecord<>(urlString));
>     }
>     testHarness.snapshot(0L, 0L);
>     }
> }
> {code}
> Output:
> {noformat}
> java.lang.Exception: Could not complete snapshot 0 for operator MockTask 
> (1/1).
>     at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:379)
>     at 
> org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.snapshot(AbstractStreamOperatorTestHarness.java:459)
>     at 
> com.scaleunlimited.flinkcrawler.functions.FlinkIssueTest.test(FlinkIssueTest.java:51)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at 
> 

[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...

2018-06-06 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6082
  
I think we have to return an typed array here. A `List` won't be supported 
by the built-in SQL functions. 

There are a few tricks on can play to create typed arrays, even in static 
code like

```
Object[] array = (Object[]) Array.newInstance(clazz, length);
```

Have a look at the code of the ORC InputFormat that had to solve a similar 
challenge: 
[OrcBatchReader.java](https://github.com/apache/flink/blob/master/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcBatchReader.java).


---


[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504037#comment-16504037
 ] 

ASF GitHub Bot commented on FLINK-9444:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6082
  
I think we have to return an typed array here. A `List` won't be supported 
by the built-in SQL functions. 

There are a few tricks on can play to create typed arrays, even in static 
code like

```
Object[] array = (Object[]) Array.newInstance(clazz, length);
```

Have a look at the code of the ORC InputFormat that had to solve a similar 
challenge: 
[OrcBatchReader.java](https://github.com/apache/flink/blob/master/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcBatchReader.java).


> KafkaAvroTableSource failed to work for map and array fields
> 
>
> Key: FLINK-9444
> URL: https://issues.apache.org/jira/browse/FLINK-9444
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API  SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Priority: Blocker
>  Labels: patch
> Fix For: 1.6.0
>
> Attachments: flink-9444.patch
>
>
> When some Avro schema has map/array fields and the corresponding TableSchema 
> declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be 
> thrown when registering the *KafkaAvroTableSource*, complaining like:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Type Map of table field 'event' does not match with type 
> GenericType of the field 'event' of the TableSource return 
> type.
>  at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71)
>  at 
> org.apache.flink.table.plan.schema.StreamTableSourceTable.(StreamTableSourceTable.scala:33)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124)
>  at 
> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9545) Support read file for N times in Flink stream

2018-06-06 Thread Bowen Li (JIRA)
Bowen Li created FLINK-9545:
---

 Summary: Support read file for N times in Flink stream
 Key: FLINK-9545
 URL: https://issues.apache.org/jira/browse/FLINK-9545
 Project: Flink
  Issue Type: Improvement
  Components: DataStream API
Affects Versions: 1.6.0
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.6.0


we need {{StreamExecutionEnvironment.readFile/readTextFile}} to read each file 
for N times, but currently it only supports reading file once.

add support for the feature.

Plan:

add a new processing mode as PROCESSING_N_TIMES, and add additional parameter 
{{numTimes}} for {{StreamExecutionEnvironment.readFile/readTextFile}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8983) End-to-end test: Confluent schema registry

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504023#comment-16504023
 ] 

ASF GitHub Bot commented on FLINK-8983:
---

Github user medcv commented on the issue:

https://github.com/apache/flink/pull/6083
  
@tillrohrmann Thanks a lot for doing the review!


> End-to-end test: Confluent schema registry
> --
>
> Key: FLINK-8983
> URL: https://issues.apache.org/jira/browse/FLINK-8983
> Project: Flink
>  Issue Type: Sub-task
>  Components: Kafka Connector, Tests
>Reporter: Till Rohrmann
>Assignee: Yazdan Shirvany
>Priority: Critical
>
> It would be good to add an end-to-end test which verifies that Flink is able 
> to work together with the Confluent schema registry. In order to do that we 
> have to setup a Kafka cluster and write a Flink job which reads from the 
> Confluent schema registry producing an Avro type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6083: [FLINK-8983] End-to-end test: Confluent schema registry

2018-06-06 Thread medcv
Github user medcv commented on the issue:

https://github.com/apache/flink/pull/6083
  
@tillrohrmann Thanks a lot for doing the review!


---


[jira] [Commented] (FLINK-8654) Extend quickstart docs on how to submit jobs

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504021#comment-16504021
 ] 

ASF GitHub Bot commented on FLINK-8654:
---

Github user medcv commented on the issue:

https://github.com/apache/flink/pull/6084
  
@zentol yes, that also works. Do you still need me to change it and extend 
`Next Steps` or we can keep this to provide a bit more info for different ways 
to submitting jobs.


> Extend quickstart docs on how to submit jobs
> 
>
> Key: FLINK-8654
> URL: https://issues.apache.org/jira/browse/FLINK-8654
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Quickstarts
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Yazdan Shirvany
>Priority: Major
>
> The quickstart documentation explains how to setup the project, build the jar 
> and run things in the IDE, but neither explains how to submit the jar to a 
> cluster nor guides the user to where he could find this information (like the 
> CLI docs).
> Additionally, the quickstart poms should also contain the commands for 
> submitting the jar to a cluster, in particular how to select a main-class if 
> it wasn't set in the pom. (-c CLI flag)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6084: [FLINK-8654][Docs] Extend quickstart docs on how to submi...

2018-06-06 Thread medcv
Github user medcv commented on the issue:

https://github.com/apache/flink/pull/6084
  
@zentol yes, that also works. Do you still need me to change it and extend 
`Next Steps` or we can keep this to provide a bit more info for different ways 
to submitting jobs.


---


[GitHub] flink issue #6089: [FLINK-9451]End-to-end test: Scala Quickstarts

2018-06-06 Thread medcv
Github user medcv commented on the issue:

https://github.com/apache/flink/pull/6089
  
@zentol Thanks for the review. I made the clean up and did some changes to 
get the ES dependency from flink-quickstart-test/pom.xml 


---


[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504017#comment-16504017
 ] 

ASF GitHub Bot commented on FLINK-9451:
---

Github user medcv commented on the issue:

https://github.com/apache/flink/pull/6089
  
@zentol Thanks for the review. I made the clean up and did some changes to 
get the ES dependency from flink-quickstart-test/pom.xml 


> End-to-end test: Scala Quickstarts
> --
>
> Key: FLINK-9451
> URL: https://issues.apache.org/jira/browse/FLINK-9451
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts
>Affects Versions: 1.5.0, 1.4.1, 1.4.2
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Blocker
>
> We could add an end-to-end test which verifies Flink's quickstarts scala. It 
> should do the following:
>  # create a new Flink project using the quickstarts archetype
>  # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library)
>  # run {{mvn clean package -Pbuild-jar}}
>  # verify that no core dependencies are contained in the jar file
>  # Run the program



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504013#comment-16504013
 ] 

ASF GitHub Bot commented on FLINK-9451:
---

Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/6089#discussion_r193581841
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -51,12 +59,12 @@ sed -i -e ''"$(($position + 1))"'i\
 ${flink.version}\
--- End diff --

I did some code change to get the ES dependency from 
flink-quickstart-test/pom.xml and removed the hardcoded values


> End-to-end test: Scala Quickstarts
> --
>
> Key: FLINK-9451
> URL: https://issues.apache.org/jira/browse/FLINK-9451
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts
>Affects Versions: 1.5.0, 1.4.1, 1.4.2
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Blocker
>
> We could add an end-to-end test which verifies Flink's quickstarts scala. It 
> should do the following:
>  # create a new Flink project using the quickstarts archetype
>  # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library)
>  # run {{mvn clean package -Pbuild-jar}}
>  # verify that no core dependencies are contained in the jar file
>  # Run the program



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504014#comment-16504014
 ] 

ASF GitHub Bot commented on FLINK-9451:
---

Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/6089#discussion_r193581372
  
--- Diff: flink-end-to-end-tests/flink-quickstart-test/pom.xml ---
@@ -0,0 +1,98 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+   
+   org.apache.flink
+   flink-end-to-end-tests
+   1.6-SNAPSHOT
+   
+   4.0.0
+
+   flink-quickstart-test
+   flink-quickstart-test
+   jar
+
+   
+   2.11
+   
+
+   
+   
+   org.apache.flink
+   
flink-streaming-java_${scala.binary.version}
+   ${project.version}
+   provided
+   
+   
+   org.apache.flink
+   
flink-streaming-scala_${scala.binary.version}
+   ${project.version}
--- End diff --

Done!


> End-to-end test: Scala Quickstarts
> --
>
> Key: FLINK-9451
> URL: https://issues.apache.org/jira/browse/FLINK-9451
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts
>Affects Versions: 1.5.0, 1.4.1, 1.4.2
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Blocker
>
> We could add an end-to-end test which verifies Flink's quickstarts scala. It 
> should do the following:
>  # create a new Flink project using the quickstarts archetype
>  # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library)
>  # run {{mvn clean package -Pbuild-jar}}
>  # verify that no core dependencies are contained in the jar file
>  # Run the program



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504012#comment-16504012
 ] 

ASF GitHub Bot commented on FLINK-9451:
---

Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/6089#discussion_r193581567
  
--- Diff: flink-end-to-end-tests/flink-quickstart-test/pom.xml ---
@@ -0,0 +1,98 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+   
+   org.apache.flink
+   flink-end-to-end-tests
+   1.6-SNAPSHOT
+   
+   4.0.0
+
+   flink-quickstart-test
+   flink-quickstart-test
+   jar
+
+   
+   2.11
--- End diff --

Done!


> End-to-end test: Scala Quickstarts
> --
>
> Key: FLINK-9451
> URL: https://issues.apache.org/jira/browse/FLINK-9451
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts
>Affects Versions: 1.5.0, 1.4.1, 1.4.2
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Blocker
>
> We could add an end-to-end test which verifies Flink's quickstarts scala. It 
> should do the following:
>  # create a new Flink project using the quickstarts archetype
>  # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library)
>  # run {{mvn clean package -Pbuild-jar}}
>  # verify that no core dependencies are contained in the jar file
>  # Run the program



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6089: [FLINK-9451]End-to-end test: Scala Quickstarts

2018-06-06 Thread medcv
Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/6089#discussion_r193581372
  
--- Diff: flink-end-to-end-tests/flink-quickstart-test/pom.xml ---
@@ -0,0 +1,98 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+   
+   org.apache.flink
+   flink-end-to-end-tests
+   1.6-SNAPSHOT
+   
+   4.0.0
+
+   flink-quickstart-test
+   flink-quickstart-test
+   jar
+
+   
+   2.11
+   
+
+   
+   
+   org.apache.flink
+   
flink-streaming-java_${scala.binary.version}
+   ${project.version}
+   provided
+   
+   
+   org.apache.flink
+   
flink-streaming-scala_${scala.binary.version}
+   ${project.version}
--- End diff --

Done!


---


[GitHub] flink pull request #6089: [FLINK-9451]End-to-end test: Scala Quickstarts

2018-06-06 Thread medcv
Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/6089#discussion_r193581841
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -51,12 +59,12 @@ sed -i -e ''"$(($position + 1))"'i\
 ${flink.version}\
--- End diff --

I did some code change to get the ES dependency from 
flink-quickstart-test/pom.xml and removed the hardcoded values


---


[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504015#comment-16504015
 ] 

ASF GitHub Bot commented on FLINK-9451:
---

Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/6089#discussion_r193581591
  
--- Diff: flink-end-to-end-tests/flink-quickstart-test/pom.xml ---
@@ -0,0 +1,98 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+   
+   org.apache.flink
+   flink-end-to-end-tests
+   1.6-SNAPSHOT
+   
+   4.0.0
+
+   flink-quickstart-test
+   flink-quickstart-test
+   jar
+
+   
+   2.11
+   
+
+   
+   
+   org.apache.flink
+   
flink-streaming-java_${scala.binary.version}
+   ${project.version}
+   provided
+   
+   
+   org.apache.flink
+   
flink-streaming-scala_${scala.binary.version}
+   ${project.version}
+   
+   
+   org.apache.flink
+   
flink-connector-elasticsearch5_${scala.binary.version}
+   ${project.version}
+   
+   
+
+   
--- End diff --

Removed!


> End-to-end test: Scala Quickstarts
> --
>
> Key: FLINK-9451
> URL: https://issues.apache.org/jira/browse/FLINK-9451
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts
>Affects Versions: 1.5.0, 1.4.1, 1.4.2
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Blocker
>
> We could add an end-to-end test which verifies Flink's quickstarts scala. It 
> should do the following:
>  # create a new Flink project using the quickstarts archetype
>  # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library)
>  # run {{mvn clean package -Pbuild-jar}}
>  # verify that no core dependencies are contained in the jar file
>  # Run the program



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6089: [FLINK-9451]End-to-end test: Scala Quickstarts

2018-06-06 Thread medcv
Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/6089#discussion_r193581567
  
--- Diff: flink-end-to-end-tests/flink-quickstart-test/pom.xml ---
@@ -0,0 +1,98 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+   
+   org.apache.flink
+   flink-end-to-end-tests
+   1.6-SNAPSHOT
+   
+   4.0.0
+
+   flink-quickstart-test
+   flink-quickstart-test
+   jar
+
+   
+   2.11
--- End diff --

Done!


---


[GitHub] flink pull request #6089: [FLINK-9451]End-to-end test: Scala Quickstarts

2018-06-06 Thread medcv
Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/6089#discussion_r193581591
  
--- Diff: flink-end-to-end-tests/flink-quickstart-test/pom.xml ---
@@ -0,0 +1,98 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+   
+   org.apache.flink
+   flink-end-to-end-tests
+   1.6-SNAPSHOT
+   
+   4.0.0
+
+   flink-quickstart-test
+   flink-quickstart-test
+   jar
+
+   
+   2.11
+   
+
+   
+   
+   org.apache.flink
+   
flink-streaming-java_${scala.binary.version}
+   ${project.version}
+   provided
+   
+   
+   org.apache.flink
+   
flink-streaming-scala_${scala.binary.version}
+   ${project.version}
+   
+   
+   org.apache.flink
+   
flink-connector-elasticsearch5_${scala.binary.version}
+   ${project.version}
+   
+   
+
+   
--- End diff --

Removed!


---


[jira] [Commented] (FLINK-9503) Migrate integration tests for iterative aggregators

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503739#comment-16503739
 ] 

ASF GitHub Bot commented on FLINK-9503:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6129#discussion_r193519270
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
 ---
@@ -199,12 +201,17 @@ public void 
testConvergenceCriterionWithParameterForIterate() throws Exception {
new 
NegativeElementsConvergenceCriterionWithParam(3));
 
DataSet updatedDs = iteration.map(new 
SubtractOneMap());
-   iteration.closeWith(updatedDs).writeAsText(resultPath);
-   env.execute();
+   List result = iteration.closeWith(updatedDs).collect();
+   Collections.sort(result);
+
+   List expected = new ArrayList() {{
+   add(-3); add(-2); add(-2); add(-1); add(-1);
+   add(-1); add(0); add(0); add(0); add(0);
+   add(1); add(1); add(1); add(1); add(1);
+   }};
+   Collections.sort(expected);
--- End diff --

let's remove this so the port remains straight-forward. I don't wanna worry 
about these sorts changing the semantics of the test.


> Migrate integration tests for iterative aggregators
> ---
>
> Key: FLINK-9503
> URL: https://issues.apache.org/jira/browse/FLINK-9503
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Deepak Sharma
>Assignee: vinoyang
>Priority: Minor
>
> Migrate integration tests in org.apache.flink.test.iterative.aggregators to 
> use collect() instead of temp files. Related to parent jira.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6129: [FLINK-9503] Migrate integration tests for iterati...

2018-06-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6129#discussion_r193519270
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
 ---
@@ -199,12 +201,17 @@ public void 
testConvergenceCriterionWithParameterForIterate() throws Exception {
new 
NegativeElementsConvergenceCriterionWithParam(3));
 
DataSet updatedDs = iteration.map(new 
SubtractOneMap());
-   iteration.closeWith(updatedDs).writeAsText(resultPath);
-   env.execute();
+   List result = iteration.closeWith(updatedDs).collect();
+   Collections.sort(result);
+
+   List expected = new ArrayList() {{
+   add(-3); add(-2); add(-2); add(-1); add(-1);
+   add(-1); add(0); add(0); add(0); add(0);
+   add(1); add(1); add(1); add(1); add(1);
+   }};
+   Collections.sort(expected);
--- End diff --

let's remove this so the port remains straight-forward. I don't wanna worry 
about these sorts changing the semantics of the test.


---


[jira] [Commented] (FLINK-9503) Migrate integration tests for iterative aggregators

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503735#comment-16503735
 ] 

ASF GitHub Bot commented on FLINK-9503:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6129#discussion_r193518671
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
 ---
@@ -141,12 +133,17 @@ public void 
testAggregatorWithoutParameterForIterate() throws Exception {
new NegativeElementsConvergenceCriterion());
 
DataSet updatedDs = iteration.map(new 
SubtractOneMap());
-   iteration.closeWith(updatedDs).writeAsText(resultPath);
-   env.execute();
+   List result = iteration.closeWith(updatedDs).collect();
+   Collections.sort(result);
+
+   List expected = new ArrayList() {{
+   add(-3); add(-2); add(-2); add(-1); add(-1);
--- End diff --

use `Arrays.asList` instead


> Migrate integration tests for iterative aggregators
> ---
>
> Key: FLINK-9503
> URL: https://issues.apache.org/jira/browse/FLINK-9503
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Deepak Sharma
>Assignee: vinoyang
>Priority: Minor
>
> Migrate integration tests in org.apache.flink.test.iterative.aggregators to 
> use collect() instead of temp files. Related to parent jira.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6129: [FLINK-9503] Migrate integration tests for iterati...

2018-06-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6129#discussion_r193518671
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
 ---
@@ -141,12 +133,17 @@ public void 
testAggregatorWithoutParameterForIterate() throws Exception {
new NegativeElementsConvergenceCriterion());
 
DataSet updatedDs = iteration.map(new 
SubtractOneMap());
-   iteration.closeWith(updatedDs).writeAsText(resultPath);
-   env.execute();
+   List result = iteration.closeWith(updatedDs).collect();
+   Collections.sort(result);
+
+   List expected = new ArrayList() {{
+   add(-3); add(-2); add(-2); add(-1); add(-1);
--- End diff --

use `Arrays.asList` instead


---


[jira] [Commented] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2018-06-06 Thread Gary Yao (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503691#comment-16503691
 ] 

Gary Yao commented on FLINK-5486:
-

I don't see why this is needed. Flink callbacks are generally not invoked 
concurrently.

> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Major
> Fix For: 1.3.4
>
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-5486) Lack of synchronization in BucketingSink#handleRestoredBucketState()

2018-06-06 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16351986#comment-16351986
 ] 

Ted Yu edited comment on FLINK-5486 at 6/6/18 5:51 PM:
---

Can this get more review, please?


was (Author: yuzhih...@gmail.com):
Can this get more review, please ?

> Lack of synchronization in BucketingSink#handleRestoredBucketState()
> 
>
> Key: FLINK-5486
> URL: https://issues.apache.org/jira/browse/FLINK-5486
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Major
> Fix For: 1.3.4
>
>
> Here is related code:
> {code}
>   
> handlePendingFilesForPreviousCheckpoints(bucketState.pendingFilesPerCheckpoint);
>   synchronized (bucketState.pendingFilesPerCheckpoint) {
> bucketState.pendingFilesPerCheckpoint.clear();
>   }
> {code}
> The handlePendingFilesForPreviousCheckpoints() call should be enclosed inside 
> the synchronization block. Otherwise during the processing of 
> handlePendingFilesForPreviousCheckpoints(), some entries of the map may be 
> cleared.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-7795) Utilize error-prone to discover common coding mistakes

2018-06-06 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16345955#comment-16345955
 ] 

Ted Yu edited comment on FLINK-7795 at 6/6/18 5:50 PM:
---

error-prone has JDK 8 dependency.


was (Author: yuzhih...@gmail.com):
error-prone has JDK 8 dependency .

> Utilize error-prone to discover common coding mistakes
> --
>
> Key: FLINK-7795
> URL: https://issues.apache.org/jira/browse/FLINK-7795
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>Priority: Major
>
> http://errorprone.info/ is a tool which detects common coding mistakes.
> We should incorporate into Flink build process.
> Here are the dependencies:
> {code}
> 
>   com.google.errorprone
>   error_prone_annotation
>   ${error-prone.version}
>   provided
> 
> 
>   
>   com.google.auto.service
>   auto-service
>   1.0-rc3
>   true
> 
> 
>   com.google.errorprone
>   error_prone_check_api
>   ${error-prone.version}
>   provided
>   
> 
>   com.google.code.findbugs
>   jsr305
> 
>   
> 
> 
>   com.google.errorprone
>   javac
>   9-dev-r4023-3
>   provided
> 
>   
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9540) Apache Flink 1.4.2 S3 Hadoop library for Hadoop 2.7 is built for Hadoop 2.8 and fails

2018-06-06 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503613#comment-16503613
 ] 

Aljoscha Krettek commented on FLINK-9540:
-

{{flink-s3-fs-hadoop-*.jar}} and {{flink-s3-fs-presto-*.jar}} are completely 
self-contained jars and in the case of the hadoop version come with a fixed 
hadoop version. If you want to use these you should not follow the instructions 
under 
[https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html#hadoop-provided-s3-file-systems---manual-setup].

Did you encounter any error message with this?

> Apache Flink 1.4.2 S3 Hadoop library for Hadoop 2.7 is built for Hadoop 2.8 
> and fails
> -
>
> Key: FLINK-9540
> URL: https://issues.apache.org/jira/browse/FLINK-9540
> Project: Flink
>  Issue Type: Bug
>Reporter: Razvan
>Priority: Blocker
>
> Download Apache Flink 1.4.2 for Hadoop 2.7 or earlier. Go to opt/ inside 
> flink-s3-fs-hadoop-1.4.2.jar, open common version info .properties inside you 
> get:
>  
> #
>  # Licensed to the Apache Software Foundation (ASF) under one
>  # or more contributor license agreements. See the NOTICE file
>  # distributed with this work for additional information
>  # regarding copyright ownership. The ASF licenses this file
>  # to you under the Apache License, Version 2.0 (the
>  # "License"); you may not use this file except in compliance
>  # with the License. You may obtain a copy of the License at
>  #
>  # [http://www.apache.org/licenses/LICENSE-2.0]
>  #
>  # Unless required by applicable law or agreed to in writing, software
>  # distributed under the License is distributed on an "AS IS" BASIS,
>  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  # See the License for the specific language governing permissions and
>  # limitations under the License.
>  #
> *version=2.8.1*
>  revision=1e6296df38f9cd3d9581c8af58a2a03a6e4312be
>  *branch=branch-2.8.1-private*
>  user=vinodkv
>  date=2017-06-07T21:22Z
>  *url=[https://git-wip-us.apache.org/repos/asf/hadoop.git*]
>  srcChecksum=60125541c2b3e266cbf3becc5bda666
>  protocVersion=2.5.0
>  
> This will fail to work with dependencies described in 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html)]
>  
> "Depending on which file system you use, please add the following 
> dependencies. You can find these as part of the Hadoop binaries in 
> {{hadoop-2.7/share/hadoop/tools/lib}}:
>  * {{S3AFileSystem}}:
>  ** {{hadoop-aws-2.7.3.jar}}
>  ** {{aws-java-sdk-s3-1.11.183.jar}} and its dependencies:
>  *** {{aws-java-sdk-core-1.11.183.jar}}
>  *** {{aws-java-sdk-kms-1.11.183.jar}}
>  *** {{jackson-annotations-2.6.7.jar}}
>  *** {{jackson-core-2.6.7.jar}}
>  *** {{jackson-databind-2.6.7.jar}}
>  *** {{joda-time-2.8.1.jar}}
>  *** {{httpcore-4.4.4.jar}}
>  *** {{httpclient-4.5.3.jar}}
>  * {{NativeS3FileSystem}}:
>  ** {{hadoop-aws-2.7.3.jar}}
>  ** {{guava-11.0.2.jar"}}{{}}
>  
> {{  I presume the build task is flawed, it should be built for Apache Hadoop 
> 2.7 can someone please check it?}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9540) Apache Flink 1.4.2 S3 Hadoop library for Hadoop 2.7 is built for Hadoop 2.8 and fails

2018-06-06 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-9540.
---
Resolution: Not A Problem

Please reopen if you disagree.

> Apache Flink 1.4.2 S3 Hadoop library for Hadoop 2.7 is built for Hadoop 2.8 
> and fails
> -
>
> Key: FLINK-9540
> URL: https://issues.apache.org/jira/browse/FLINK-9540
> Project: Flink
>  Issue Type: Bug
>Reporter: Razvan
>Priority: Blocker
>
> Download Apache Flink 1.4.2 for Hadoop 2.7 or earlier. Go to opt/ inside 
> flink-s3-fs-hadoop-1.4.2.jar, open common version info .properties inside you 
> get:
>  
> #
>  # Licensed to the Apache Software Foundation (ASF) under one
>  # or more contributor license agreements. See the NOTICE file
>  # distributed with this work for additional information
>  # regarding copyright ownership. The ASF licenses this file
>  # to you under the Apache License, Version 2.0 (the
>  # "License"); you may not use this file except in compliance
>  # with the License. You may obtain a copy of the License at
>  #
>  # [http://www.apache.org/licenses/LICENSE-2.0]
>  #
>  # Unless required by applicable law or agreed to in writing, software
>  # distributed under the License is distributed on an "AS IS" BASIS,
>  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  # See the License for the specific language governing permissions and
>  # limitations under the License.
>  #
> *version=2.8.1*
>  revision=1e6296df38f9cd3d9581c8af58a2a03a6e4312be
>  *branch=branch-2.8.1-private*
>  user=vinodkv
>  date=2017-06-07T21:22Z
>  *url=[https://git-wip-us.apache.org/repos/asf/hadoop.git*]
>  srcChecksum=60125541c2b3e266cbf3becc5bda666
>  protocVersion=2.5.0
>  
> This will fail to work with dependencies described in 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html)]
>  
> "Depending on which file system you use, please add the following 
> dependencies. You can find these as part of the Hadoop binaries in 
> {{hadoop-2.7/share/hadoop/tools/lib}}:
>  * {{S3AFileSystem}}:
>  ** {{hadoop-aws-2.7.3.jar}}
>  ** {{aws-java-sdk-s3-1.11.183.jar}} and its dependencies:
>  *** {{aws-java-sdk-core-1.11.183.jar}}
>  *** {{aws-java-sdk-kms-1.11.183.jar}}
>  *** {{jackson-annotations-2.6.7.jar}}
>  *** {{jackson-core-2.6.7.jar}}
>  *** {{jackson-databind-2.6.7.jar}}
>  *** {{joda-time-2.8.1.jar}}
>  *** {{httpcore-4.4.4.jar}}
>  *** {{httpclient-4.5.3.jar}}
>  * {{NativeS3FileSystem}}:
>  ** {{hadoop-aws-2.7.3.jar}}
>  ** {{guava-11.0.2.jar"}}{{}}
>  
> {{  I presume the build task is flawed, it should be built for Apache Hadoop 
> 2.7 can someone please check it?}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503559#comment-16503559
 ] 

ASF GitHub Bot commented on FLINK-9444:
---

Github user tragicjun commented on the issue:

https://github.com/apache/flink/pull/6082
  
Hi @fhueske , 

Avro array type actually is mapped to Java List type, specifically the 
class **org.apache.avro.generic.GenericData.Array** extends 
**java.util.AbstractList**. I tried to convert the List  to an Array in 
**AvroRowDeserializationSchema**, but to make it generic an **Object []** must 
be returned, which would then lead to a cast problem when passing the **Object 
[]** to TypeSerializer.copy().

I tried using **ListTypeInfo** to declare corresponding Avro array type, it 
was just working fine, as we already have **ListSerializer** to support it.


> KafkaAvroTableSource failed to work for map and array fields
> 
>
> Key: FLINK-9444
> URL: https://issues.apache.org/jira/browse/FLINK-9444
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API  SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Priority: Blocker
>  Labels: patch
> Fix For: 1.6.0
>
> Attachments: flink-9444.patch
>
>
> When some Avro schema has map/array fields and the corresponding TableSchema 
> declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be 
> thrown when registering the *KafkaAvroTableSource*, complaining like:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Type Map of table field 'event' does not match with type 
> GenericType of the field 'event' of the TableSource return 
> type.
>  at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71)
>  at 
> org.apache.flink.table.plan.schema.StreamTableSourceTable.(StreamTableSourceTable.scala:33)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124)
>  at 
> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...

2018-06-06 Thread tragicjun
Github user tragicjun commented on the issue:

https://github.com/apache/flink/pull/6082
  
Hi @fhueske , 

Avro array type actually is mapped to Java List type, specifically the 
class **org.apache.avro.generic.GenericData.Array** extends 
**java.util.AbstractList**. I tried to convert the List  to an Array in 
**AvroRowDeserializationSchema**, but to make it generic an **Object []** must 
be returned, which would then lead to a cast problem when passing the **Object 
[]** to TypeSerializer.copy().

I tried using **ListTypeInfo** to declare corresponding Avro array type, it 
was just working fine, as we already have **ListSerializer** to support it.


---


[jira] [Commented] (FLINK-9503) Migrate integration tests for iterative aggregators

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503542#comment-16503542
 ] 

ASF GitHub Bot commented on FLINK-9503:
---

GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/6129

[FLINK-9503] Migrate integration tests for iterative aggregators

## What is the purpose of the change

*This pull request migrate integration tests for iterative aggregators*


## Brief change log

  - *Migrate integration tests for iterative aggregators use `collect` api 
to replace temp file*


## Verifying this change

This change is already covered by existing tests, such as 
*AggregatorsITCase*.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-9503

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6129.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6129


commit f3d0d8f46ccea32c827b11d35a41cbbdad8ff4f3
Author: yanghua 
Date:   2018-06-06T16:23:25Z

[FLINK-9503] Migrate integration tests for iterative aggregators




> Migrate integration tests for iterative aggregators
> ---
>
> Key: FLINK-9503
> URL: https://issues.apache.org/jira/browse/FLINK-9503
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Deepak Sharma
>Assignee: vinoyang
>Priority: Minor
>
> Migrate integration tests in org.apache.flink.test.iterative.aggregators to 
> use collect() instead of temp files. Related to parent jira.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6129: [FLINK-9503] Migrate integration tests for iterati...

2018-06-06 Thread yanghua
GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/6129

[FLINK-9503] Migrate integration tests for iterative aggregators

## What is the purpose of the change

*This pull request migrate integration tests for iterative aggregators*


## Brief change log

  - *Migrate integration tests for iterative aggregators use `collect` api 
to replace temp file*


## Verifying this change

This change is already covered by existing tests, such as 
*AggregatorsITCase*.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-9503

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6129.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6129


commit f3d0d8f46ccea32c827b11d35a41cbbdad8ff4f3
Author: yanghua 
Date:   2018-06-06T16:23:25Z

[FLINK-9503] Migrate integration tests for iterative aggregators




---


[jira] [Commented] (FLINK-8886) Job isolation via scheduling in shared cluster

2018-06-06 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503536#comment-16503536
 ] 

Elias Levy commented on FLINK-8886:
---

I am ok with having a separate issue for per job JVM isolation, but I am 
primarily interested on the per job TM isolation via scheduling to matching 
TMs.  In practice that will give us JVM isolation without having to wait for 
anything else.

> Job isolation via scheduling in shared cluster
> --
>
> Key: FLINK-8886
> URL: https://issues.apache.org/jira/browse/FLINK-8886
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, Local Runtime, Scheduler
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Assignee: Renjie Liu
>Priority: Major
>
> Flink's TaskManager executes tasks from different jobs within the same JVM as 
> threads.  We prefer to isolate different jobs on their own JVM.  Thus, we 
> must use different TMs for different jobs.  As currently the scheduler will 
> allocate task slots within a TM to tasks from different jobs, that means we 
> must stand up one cluster per job.  This is wasteful, as it requires at least 
> two JobManagers per cluster for high-availability, and the JMs have low 
> utilization.
> Additionally, different jobs may require different resources.  Some jobs are 
> compute heavy.  Some are IO heavy (lots of state in RocksDB).  At the moment 
> the scheduler threats all TMs are equivalent, except possibly in their number 
> of available task slots.  Thus, one is required to stand up multiple cluster 
> if there is a need for different types of TMs.
> It would be useful if one could specify requirements on job, such that they 
> are only scheduled on a subset of TMs.  Properly configured, that would 
> permit isolation of jobs in a shared cluster and scheduling of jobs with 
> specific resource needs.
> One possible implementation is to specify a set of tags on the TM config file 
> which the TMs used when registering with the JM, and another set of tags 
> configured within the job or supplied when submitting the job.  The scheduler 
> could then match the tags in the job with the tags in the TMs.  In a 
> restrictive mode the scheduler would assign a job task to a TM only if all 
> tags match.  In a relaxed mode the scheduler could assign a job task to a TM 
> if there is a partial match, while giving preference to a more accurate match.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9543) Expose JobMaster IDs to metric system

2018-06-06 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9543:
---

Assignee: vinoyang

> Expose JobMaster IDs to metric system
> -
>
> Key: FLINK-9543
> URL: https://issues.apache.org/jira/browse/FLINK-9543
> Project: Flink
>  Issue Type: New Feature
>  Components: Local Runtime, Metrics
>Reporter: Chesnay Schepler
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.6.0
>
>
> To be able to differentiate between metrics from different taskmanagers we 
> should expose the Jobmanager ID (i.e. the resourceID) to the metric system, 
> like we do for TaskManagers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-06 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503505#comment-16503505
 ] 

Sihua Zhou commented on FLINK-9506:
---

[~yow] Maybe there is one more optimization that could have a try, I see you 
are using the ReduceState in your code just to accumulate the 
`record.getInt("I_PRIMARY_UNITS")` and collect the result in `onTimer()`. For 
the ReduceState it works as follows:

- get the "old result" from RocksDB.
- reduce the "old result" with the input, and put the "new result" back to 
RocksDB.

that means for input record in processElement(), it needs to do a `get` and a 
`put` to RocksDB. And the `get` cost much more then `put`. I would suggest to 
use the ListState instead. With using ListState, what you need to do are:

- Performing {{ListState.add(record)}} in {{processElement()}}, since the 
`ListState.add()` is cheap as it not put the record into Rocks.
- Performing reducing in {{OnTimer()}}, the reducing might look as follow:
{code:java}
List< JSONObject> records = listState.get();
for (JSonObject jsonObj : records) {
// do accumulation
}
out.collect(result);
{code}

In this way, for every key very seconds, you only need to do one read operation 
of RocksDB.




> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9544) Downgrade kinesis protocol from CBOR to JSON not possible as required by kinesalite

2018-06-06 Thread Ph.Duveau (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ph.Duveau updated FLINK-9544:
-
Summary: Downgrade kinesis protocol from CBOR to JSON not possible as 
required by kinesalite  (was: Downgrade kinesis protocole from CBOR to JSON not 
possible as required by kinesalite)

> Downgrade kinesis protocol from CBOR to JSON not possible as required by 
> kinesalite
> ---
>
> Key: FLINK-9544
> URL: https://issues.apache.org/jira/browse/FLINK-9544
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.5.0, 1.4.1, 1.4.2
>Reporter: Ph.Duveau
>Priority: Critical
>
> The amazon client do not downgrade from CBOR to JSON while setting env 
> AWS_CBOR_DISABLE to true (or 1) and/or defining 
> com.amazonaws.sdk.disableCbor=true via JVM options. This bug is due to maven 
> shade relocation of com.amazon.* classes. As soon as you cancel this 
> relocation (by removing the relocation in the kinesis connector or by 
> re-relocating in the final jar), it reruns again.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9544) Downgrade kinesis protocole from CBOR to JSON not possible as required by kinesalite

2018-06-06 Thread Ph.Duveau (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ph.Duveau updated FLINK-9544:
-
Summary: Downgrade kinesis protocole from CBOR to JSON not possible as 
required by kinesalite  (was: Impossible to downgrade kinesis protocole from 
CBOR to JSON as required by kinesalite)

> Downgrade kinesis protocole from CBOR to JSON not possible as required by 
> kinesalite
> 
>
> Key: FLINK-9544
> URL: https://issues.apache.org/jira/browse/FLINK-9544
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.5.0, 1.4.1, 1.4.2
>Reporter: Ph.Duveau
>Priority: Critical
>
> The amazon client do not downgrade from CBOR to JSON while setting env 
> AWS_CBOR_DISABLE to true (or 1) and/or defining 
> com.amazonaws.sdk.disableCbor=true via JVM options. This bug is due to maven 
> shade relocation of com.amazon.* classes. As soon as you cancel this 
> relocation (by removing the relocation in the kinesis connector or by 
> re-relocating in the final jar), it reruns again.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-06 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503426#comment-16503426
 ] 

swy commented on FLINK-9506:


[~kkrugler] please don't close the ticket yet. Because the performance 
degradation still happen, it is just a bit better in fluctuation after Rocksdb 
is deployed. We need to get Rocksdb setup correctly, hopefully performance drop 
will not happen in busty pattern if statebackend is working properly. Would 
appreciated to get advice here.

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-06 Thread Ken Krugler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503401#comment-16503401
 ] 

Ken Krugler commented on FLINK-9506:


@swy - the questions about setting up RocksDB, and your configuration, are best 
asked on the mailing list versus as a comment on this issue. Also, it looks 
like this issue can be closed as not a problem now, do you agree? Thanks!

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9535) log4j.properties specified in env.java.options doesn't get picked.

2018-06-06 Thread Ken Krugler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503396#comment-16503396
 ] 

Ken Krugler commented on FLINK-9535:


I believe that {{log4j.configuration}} takes a path to a file, but your 
{{log4j.properties}} is a resource inside of your jar, yes?

Also, it's best to ask questions like this on the user mailing list first, 
before opening a bug in Jira, thanks.

> log4j.properties specified in env.java.options doesn't get picked. 
> ---
>
> Key: FLINK-9535
> URL: https://issues.apache.org/jira/browse/FLINK-9535
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.2
>Reporter: SUBRAMANYA SURESH
>Assignee: vinoyang
>Priority: Major
>
> I created a log4j.properties and packaged it in source/main/resources of my 
> Job jar. As per the documentation I added 
> env.java.opts="-Dlog4j.configuration=log4j.properties" to my flink-conf.yaml. 
> When I submit my job to the Flink yarn cluster, it does not pick up this 
> log4j.properties. 
> Observations:
> The JVM options in the JobManager logs seem to have both the below, with the 
> latter overriding what I specified ? I tried altering the flink-daemon.sh 
> from adding the log settings, but it still shows up. 
> -Dlog4j.configuration=log4j.properties
> -Dlog4j.configuration=file:log4j.properties



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9544) Impossible to downgrade kinesis protocole from CBOR to JSON as required by kinesalite

2018-06-06 Thread Ph.Duveau (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ph.Duveau updated FLINK-9544:
-
Description: The amazon client do not downgrade from CBOR to JSON while 
setting env AWS_CBOR_DISABLE to true (or 1) and/or defining 
com.amazonaws.sdk.disableCbor=true via JVM options. This bug is due to maven 
shade relocation of com.amazon.* classes. As soon as you cancel this relocation 
(by removing the relocation in the kinesis connector or by re-relocating in the 
final jar), it reruns again.  (was: The amazon client do not downgrade from 
CBOR to JSON while setting env AWS_CBOR_DISABLE to true (or 1) and/or defining 
com.amazonaws.sdk.disableCbor=true via JVM options. This bug is due to maven 
shade relocation of com.amazon.* classes. As soon as you cancel (by removing 
the relocation in the kinesis connector or by re-relocating in the final jar), 
it reruns again.)

> Impossible to downgrade kinesis protocole from CBOR to JSON as required by 
> kinesalite
> -
>
> Key: FLINK-9544
> URL: https://issues.apache.org/jira/browse/FLINK-9544
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.5.0, 1.4.1, 1.4.2
>Reporter: Ph.Duveau
>Priority: Critical
>
> The amazon client do not downgrade from CBOR to JSON while setting env 
> AWS_CBOR_DISABLE to true (or 1) and/or defining 
> com.amazonaws.sdk.disableCbor=true via JVM options. This bug is due to maven 
> shade relocation of com.amazon.* classes. As soon as you cancel this 
> relocation (by removing the relocation in the kinesis connector or by 
> re-relocating in the final jar), it reruns again.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9544) Impossible to downgrade kinesis protocole from CBOR to JSON as required by kinesalite

2018-06-06 Thread Ph.Duveau (JIRA)
Ph.Duveau created FLINK-9544:


 Summary: Impossible to downgrade kinesis protocole from CBOR to 
JSON as required by kinesalite
 Key: FLINK-9544
 URL: https://issues.apache.org/jira/browse/FLINK-9544
 Project: Flink
  Issue Type: Bug
  Components: Kinesis Connector
Affects Versions: 1.4.2, 1.4.1, 1.5.0, 1.4.0
Reporter: Ph.Duveau


The amazon client do not downgrade from CBOR to JSON while setting env 
AWS_CBOR_DISABLE to true (or 1) and/or defining 
com.amazonaws.sdk.disableCbor=true via JVM options. This bug is due to maven 
shade relocation of com.amazon.* classes. As soon as you cancel (by removing 
the relocation in the kinesis connector or by re-relocating in the final jar), 
it reruns again.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9506) Flink ReducingState.add causing more than 100% performance drop

2018-06-06 Thread swy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503284#comment-16503284
 ] 

swy commented on FLINK-9506:


[~srichter] Thanks for tips, after implement Rocksdb the performance seems much 
more scale-able now, and a little bit less fluctuation. I have few questions 
related to rocksdb. 
1. Just to confirm, RocksDB is needed to setup in every TM machine? Any other 
option?
2. What is the recommendation for RocksDB's statebackend? We are using tmpfs 
with checkpoint now with savepoint persists to hdfs.
3. By source code, rocksdb options like parallelism and certain predefined 
option could be configured, any corresponding parameter in flink_config.yaml?
4. Below is the configuration we are using, could you please comment if 
something not right?
env.getConfig().enableObjectReuse();
env.getConfig().setUseSnapshotCompression(true);
RocksDBStateBackend rocksdb = new RocksDBStateBackend(new 
FsStateBackend("file:///tmp/rocksdb_simple_example/checkpoints"), true);
env.setStateBackend(rocksdb);
//rocksdb.setOptions(new RocksdbOptions());

rocksdb.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);


Or in flink_config.yaml:
state.backend: rocksdb
state.backend.fs.checkpointdir: file:///tmp/rocksdb_simple_example/checkpoints
state.backend.incremental: true
state.backend.async: true
state.checkpoints.num-retained: 5
state.savepoints.dir:  file:///tmp/rocksdb_simple_example/savepoints


Thank you in advance! 

> Flink ReducingState.add causing more than 100% performance drop
> ---
>
> Key: FLINK-9506
> URL: https://issues.apache.org/jira/browse/FLINK-9506
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.2
>Reporter: swy
>Priority: Major
> Attachments: KeyNoHash_VS_KeyHash.png, flink.png
>
>
> Hi, we found out application performance drop more than 100% when 
> ReducingState.add is used in the source code. In the test checkpoint is 
> disable. And filesystem(hdfs) as statebackend.
> It could be easyly reproduce with a simple app, without checkpoint, just 
> simply keep storing record, also with simple reduction function(in fact with 
> empty function would see the same result). Any idea would be appreciated. 
> What an unbelievable obvious issue.
> Basically the app just keep storing record into the state, and we measure how 
> many record per second in "JsonTranslator", which is shown in the graph. The 
> difference between is just 1 line, comment/un-comment "recStore.add(r)".
> {code}
> DataStream stream = env.addSource(new GeneratorSource(loop);
> DataStream convert = stream.map(new JsonTranslator())
>.keyBy()
>.process(new ProcessAggregation())
>.map(new PassthruFunction());  
> public class ProcessAggregation extends ProcessFunction {
> private ReducingState recStore;
> public void processElement(Recordr, Context ctx, Collector out) {
> recStore.add(r); //this line make the difference
> }
> {code}
> Record is POJO class contain 50 String private member.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9543) Expose JobMaster IDs to metric system

2018-06-06 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9543:
---

 Summary: Expose JobMaster IDs to metric system
 Key: FLINK-9543
 URL: https://issues.apache.org/jira/browse/FLINK-9543
 Project: Flink
  Issue Type: New Feature
  Components: Local Runtime, Metrics
Reporter: Chesnay Schepler
 Fix For: 1.6.0


To be able to differentiate between metrics from different taskmanagers we 
should expose the Jobmanager ID (i.e. the resourceID) to the metric system, 
like we do for TaskManagers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6089: [FLINK-9451]End-to-end test: Scala Quickstarts

2018-06-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6089#discussion_r193392673
  
--- Diff: flink-end-to-end-tests/flink-quickstart-test/pom.xml ---
@@ -0,0 +1,98 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+   
+   org.apache.flink
+   flink-end-to-end-tests
+   1.6-SNAPSHOT
+   
+   4.0.0
+
+   flink-quickstart-test
+   flink-quickstart-test
+   jar
+
+   
+   2.11
--- End diff --

can be omitted as it is already defined in the root `pom.xml`


---


[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503232#comment-16503232
 ] 

ASF GitHub Bot commented on FLINK-9451:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6089#discussion_r193394392
  
--- Diff: flink-end-to-end-tests/flink-quickstart-test/pom.xml ---
@@ -0,0 +1,98 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+   
+   org.apache.flink
+   flink-end-to-end-tests
+   1.6-SNAPSHOT
+   
+   4.0.0
+
+   flink-quickstart-test
+   flink-quickstart-test
+   jar
+
+   
+   2.11
+   
+
+   
+   
+   org.apache.flink
+   
flink-streaming-java_${scala.binary.version}
+   ${project.version}
+   provided
+   
+   
+   org.apache.flink
+   
flink-streaming-scala_${scala.binary.version}
+   ${project.version}
+   
+   
+   org.apache.flink
+   
flink-connector-elasticsearch5_${scala.binary.version}
+   ${project.version}
+   
+   
+
+   
--- End diff --

This section should be unnecessary.


> End-to-end test: Scala Quickstarts
> --
>
> Key: FLINK-9451
> URL: https://issues.apache.org/jira/browse/FLINK-9451
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts
>Affects Versions: 1.5.0, 1.4.1, 1.4.2
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Blocker
>
> We could add an end-to-end test which verifies Flink's quickstarts scala. It 
> should do the following:
>  # create a new Flink project using the quickstarts archetype
>  # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library)
>  # run {{mvn clean package -Pbuild-jar}}
>  # verify that no core dependencies are contained in the jar file
>  # Run the program



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503233#comment-16503233
 ] 

ASF GitHub Bot commented on FLINK-9451:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6089#discussion_r193395032
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -51,12 +59,12 @@ sed -i -e ''"$(($position + 1))"'i\
 ${flink.version}\
--- End diff --

It would be _awesome_ if you could replace this hard-coded dependency with 
the one defined in the example project. But I wouldn't block the PR on it.


> End-to-end test: Scala Quickstarts
> --
>
> Key: FLINK-9451
> URL: https://issues.apache.org/jira/browse/FLINK-9451
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts
>Affects Versions: 1.5.0, 1.4.1, 1.4.2
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Blocker
>
> We could add an end-to-end test which verifies Flink's quickstarts scala. It 
> should do the following:
>  # create a new Flink project using the quickstarts archetype
>  # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library)
>  # run {{mvn clean package -Pbuild-jar}}
>  # verify that no core dependencies are contained in the jar file
>  # Run the program



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6089: [FLINK-9451]End-to-end test: Scala Quickstarts

2018-06-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6089#discussion_r193392825
  
--- Diff: flink-end-to-end-tests/flink-quickstart-test/pom.xml ---
@@ -0,0 +1,98 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+   
+   org.apache.flink
+   flink-end-to-end-tests
+   1.6-SNAPSHOT
+   
+   4.0.0
+
+   flink-quickstart-test
+   flink-quickstart-test
+   jar
+
+   
+   2.11
+   
+
+   
+   
+   org.apache.flink
+   
flink-streaming-java_${scala.binary.version}
+   ${project.version}
+   provided
+   
+   
+   org.apache.flink
+   
flink-streaming-scala_${scala.binary.version}
+   ${project.version}
--- End diff --

set `scope` to `provided`


---


[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503231#comment-16503231
 ] 

ASF GitHub Bot commented on FLINK-9451:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6089#discussion_r193392673
  
--- Diff: flink-end-to-end-tests/flink-quickstart-test/pom.xml ---
@@ -0,0 +1,98 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+   
+   org.apache.flink
+   flink-end-to-end-tests
+   1.6-SNAPSHOT
+   
+   4.0.0
+
+   flink-quickstart-test
+   flink-quickstart-test
+   jar
+
+   
+   2.11
--- End diff --

can be omitted as it is already defined in the root `pom.xml`


> End-to-end test: Scala Quickstarts
> --
>
> Key: FLINK-9451
> URL: https://issues.apache.org/jira/browse/FLINK-9451
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts
>Affects Versions: 1.5.0, 1.4.1, 1.4.2
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Blocker
>
> We could add an end-to-end test which verifies Flink's quickstarts scala. It 
> should do the following:
>  # create a new Flink project using the quickstarts archetype
>  # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library)
>  # run {{mvn clean package -Pbuild-jar}}
>  # verify that no core dependencies are contained in the jar file
>  # Run the program



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9451) End-to-end test: Scala Quickstarts

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503234#comment-16503234
 ] 

ASF GitHub Bot commented on FLINK-9451:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6089#discussion_r193392825
  
--- Diff: flink-end-to-end-tests/flink-quickstart-test/pom.xml ---
@@ -0,0 +1,98 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+   
+   org.apache.flink
+   flink-end-to-end-tests
+   1.6-SNAPSHOT
+   
+   4.0.0
+
+   flink-quickstart-test
+   flink-quickstart-test
+   jar
+
+   
+   2.11
+   
+
+   
+   
+   org.apache.flink
+   
flink-streaming-java_${scala.binary.version}
+   ${project.version}
+   provided
+   
+   
+   org.apache.flink
+   
flink-streaming-scala_${scala.binary.version}
+   ${project.version}
--- End diff --

set `scope` to `provided`


> End-to-end test: Scala Quickstarts
> --
>
> Key: FLINK-9451
> URL: https://issues.apache.org/jira/browse/FLINK-9451
> Project: Flink
>  Issue Type: Sub-task
>  Components: Quickstarts
>Affects Versions: 1.5.0, 1.4.1, 1.4.2
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Blocker
>
> We could add an end-to-end test which verifies Flink's quickstarts scala. It 
> should do the following:
>  # create a new Flink project using the quickstarts archetype
>  # add a new Flink dependency to the {{pom.xml}} (e.g. Flink connector or 
> library)
>  # run {{mvn clean package -Pbuild-jar}}
>  # verify that no core dependencies are contained in the jar file
>  # Run the program



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6089: [FLINK-9451]End-to-end test: Scala Quickstarts

2018-06-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6089#discussion_r193395032
  
--- Diff: flink-end-to-end-tests/test-scripts/test_quickstarts.sh ---
@@ -51,12 +59,12 @@ sed -i -e ''"$(($position + 1))"'i\
 ${flink.version}\
--- End diff --

It would be _awesome_ if you could replace this hard-coded dependency with 
the one defined in the example project. But I wouldn't block the PR on it.


---


[GitHub] flink pull request #6089: [FLINK-9451]End-to-end test: Scala Quickstarts

2018-06-06 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6089#discussion_r193394392
  
--- Diff: flink-end-to-end-tests/flink-quickstart-test/pom.xml ---
@@ -0,0 +1,98 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+   
+   org.apache.flink
+   flink-end-to-end-tests
+   1.6-SNAPSHOT
+   
+   4.0.0
+
+   flink-quickstart-test
+   flink-quickstart-test
+   jar
+
+   
+   2.11
+   
+
+   
+   
+   org.apache.flink
+   
flink-streaming-java_${scala.binary.version}
+   ${project.version}
+   provided
+   
+   
+   org.apache.flink
+   
flink-streaming-scala_${scala.binary.version}
+   ${project.version}
+   
+   
+   org.apache.flink
+   
flink-connector-elasticsearch5_${scala.binary.version}
+   ${project.version}
+   
+   
+
+   
--- End diff --

This section should be unnecessary.


---


[jira] [Commented] (FLINK-8430) Implement stream-stream non-window full outer join

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503192#comment-16503192
 ] 

ASF GitHub Bot commented on FLINK-8430:
---

Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/6079
  
@twalthr Thanks a lot for the review and merging. I will take a look at the 
FLINK-9440 and see if we can do any improvement.


> Implement stream-stream non-window full outer join
> --
>
> Key: FLINK-8430
> URL: https://issues.apache.org/jira/browse/FLINK-8430
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
> Fix For: 1.6.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6079: [FLINK-8430] [table] Implement stream-stream non-window f...

2018-06-06 Thread hequn8128
Github user hequn8128 commented on the issue:

https://github.com/apache/flink/pull/6079
  
@twalthr Thanks a lot for the review and merging. I will take a look at the 
FLINK-9440 and see if we can do any improvement.


---


[jira] [Commented] (FLINK-8468) Make the connector to take advantage of AMQP features (routing key, exchange and message properties)

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503160#comment-16503160
 ] 

ASF GitHub Bot commented on FLINK-8468:
---

Github user pduveau commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193377245
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 ---
@@ -105,7 +153,18 @@ public void invoke(IN value) {
try {
byte[] msg = schema.serialize(value);
 
-   channel.basicPublish("", queueName, null, msg);
+   if (messageCompute == null) {
+   channel.basicPublish("", queueName, null, msg);
+   } else {
+   String rk = 
messageCompute.computeRoutingKey(value);
+   String exchange = 
messageCompute.computeExchange(value);
+   channel.basicPublish((exchange != null) ? 
exchange : "",
+   (rk != null) ? rk : "",
+   (returnListener != null) && 
messageCompute.computeMandatory(value),
--- End diff --

For the two behavior are OK, if you think that throwing an exception is a 
better pattern I will change it. And in fact, throwing exception is easier to 
explain in documentation.   


> Make the connector to take advantage of AMQP features (routing key, exchange 
> and message properties)
> 
>
> Key: FLINK-8468
> URL: https://issues.apache.org/jira/browse/FLINK-8468
> Project: Flink
>  Issue Type: Improvement
>  Components: RabbitMQ Connector
>Affects Versions: 1.4.0
>Reporter: Ph.Duveau
>Priority: Major
>
> Make the connector to take advantage of AMQP features by adding a constructor 
> and an interface to implement



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-06 Thread pduveau
Github user pduveau commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193377245
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 ---
@@ -105,7 +153,18 @@ public void invoke(IN value) {
try {
byte[] msg = schema.serialize(value);
 
-   channel.basicPublish("", queueName, null, msg);
+   if (messageCompute == null) {
+   channel.basicPublish("", queueName, null, msg);
+   } else {
+   String rk = 
messageCompute.computeRoutingKey(value);
+   String exchange = 
messageCompute.computeExchange(value);
+   channel.basicPublish((exchange != null) ? 
exchange : "",
+   (rk != null) ? rk : "",
+   (returnListener != null) && 
messageCompute.computeMandatory(value),
--- End diff --

For the two behavior are OK, if you think that throwing an exception is a 
better pattern I will change it. And in fact, throwing exception is easier to 
explain in documentation.   


---


[jira] [Commented] (FLINK-9276) Improve error message when TaskManager fails

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503153#comment-16503153
 ] 

ASF GitHub Bot commented on FLINK-9276:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5954
  
@aljoscha @kl0u 


> Improve error message when TaskManager fails
> 
>
> Key: FLINK-9276
> URL: https://issues.apache.org/jira/browse/FLINK-9276
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Critical
>
> When a TaskManager fails, we frequently get a message
> {code}
> org.apache.flink.util.FlinkException: Releasing TaskManager 
> container_1524853016208_0001_01_000102
> {code}
> This message is misleading in that it sounds like an intended operation, when 
> it really is a failure of a container that the {{ResourceManager}} reports to 
> the {{JobManager}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9528) Incorrect results: Filter does not treat Upsert messages correctly.

2018-06-06 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503155#comment-16503155
 ] 

Fabian Hueske commented on FLINK-9528:
--

Hi [~hequn8128], can you explain in more detail what you mean by:

{quote}However it may bring some extra side effects, for instance, user can not 
filter the data what they don't need(actually), although the redundant messages 
won't affect the correctness.{quote}

I'm not sure about adding state or a special communication mode. From my point 
of view, we should not add additional storage overhead (state) or special code 
paths if the result is correct already. It is true that we would send more 
records than necessary, but they can be swallowed by the next operator. A sink 
that does not want to emit duplicate delete messages could also add state to 
remove duplicates (or a simple in-memory cache to reduce them).

As an optimization, we can also think about adding a GroupByHaving operator 
that does the filtering internally. However, we definitely need a general 
solution for Filters with Upsert inputs.


> Incorrect results: Filter does not treat Upsert messages correctly.
> ---
>
> Key: FLINK-9528
> URL: https://issues.apache.org/jira/browse/FLINK-9528
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.3.3, 1.5.0, 1.4.2
>Reporter: Fabian Hueske
>Assignee: Hequn Cheng
>Priority: Critical
>
> Currently, Filters (i.e., Calcs with predicates) do not distinguish between 
> retraction and upsert mode. A Calc looks at record (regardless of its update 
> semantics) and either discard it (predicate evaluates to false) or pass it on 
> (predicate evaluates to true).
> This works fine for messages with retraction semantics but is not correct for 
> upsert messages.
> The following test case (can be pasted into {{TableSinkITCase}}) shows the 
> problem:
> {code:java}
>   @Test
>   def testUpsertsWithFilter(): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.getConfig.enableObjectReuse()
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val t = StreamTestData.get3TupleDataStream(env)
>   .assignAscendingTimestamps(_._1.toLong)
>   .toTable(tEnv, 'id, 'num, 'text)
> t.select('text.charLength() as 'len)
>   .groupBy('len)
>   .select('len, 'len.count as 'cnt)
>   // .where('cnt < 7)
>   .writeToSink(new TestUpsertSink(Array("len"), false))
> env.execute()
> val results = RowCollector.getAndClearValues
> val retracted = RowCollector.upsertResults(results, Array(0)).sorted
> val expectedWithoutFilter = List(
>   "2,1", "5,1", "9,9", "10,7", "11,1", "14,1", "25,1").sorted
> val expectedWithFilter = List(
> "2,1", "5,1", "11,1", "14,1", "25,1").sorted
> assertEquals(expectedWithoutFilter, retracted)
> // assertEquals(expectedWithFilter, retracted)
>   }
> {code}
> When we add a filter on the aggregation result, we would expect that all rows 
> that do not fulfill the condition are removed from the result. However, the 
> filter only removes the upsert message such that the previous version remains 
> in the result.
> One solution could be to make a filter aware of the update semantics (retract 
> or upsert) and convert the upsert message into a delete message if the 
> predicate evaluates to false.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5954: [FLINK-9276] Improve error message when TaskManager fails

2018-06-06 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5954
  
@aljoscha @kl0u 


---


[jira] [Assigned] (FLINK-9538) Make KeyedStateFunction an interface

2018-06-06 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9538:
---

Assignee: vinoyang

> Make KeyedStateFunction an interface
> 
>
> Key: FLINK-9538
> URL: https://issues.apache.org/jira/browse/FLINK-9538
> Project: Flink
>  Issue Type: Improvement
>Reporter: Dawid Wysakowicz
>Assignee: vinoyang
>Priority: Major
>
> I suggest to change the KeyedStateFunction from abstract class to interface 
> (FunctionalInterface in particular) to enable passing lambdas.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9542) ExEnv#registerCachedFile should accept Path

2018-06-06 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-9542:
---

 Summary: ExEnv#registerCachedFile should accept Path
 Key: FLINK-9542
 URL: https://issues.apache.org/jira/browse/FLINK-9542
 Project: Flink
  Issue Type: Improvement
  Components: DataSet API, DataStream API
Reporter: Chesnay Schepler


Currently, {{registerCachedFile}} accepts {{Strings}} as file-paths.

This is undesirable because invalid paths are detected much later than 
necessary; at the moment this happens when we attempt to upload them to the 
blob store, i.e. during job submission, when ideally it should fail right away.

As an intermediate solution we can modify the {{DistributedCacheEntries}} to 
contain {{Paths}} instead of {{Strings}}. This will not require API changes but 
still allow earlier detection.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9541) Add robots.txt and sitemap.xml to Flink website

2018-06-06 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-9541:


 Summary: Add robots.txt and sitemap.xml to Flink website
 Key: FLINK-9541
 URL: https://issues.apache.org/jira/browse/FLINK-9541
 Project: Flink
  Issue Type: Improvement
  Components: Project Website
Reporter: Fabian Hueske


>From the [dev mailing 
>list|https://lists.apache.org/thread.html/71ce1bfbed1cf5f0069b27a46df1cd4dccbe8abefa75ac85601b088b@%3Cdev.flink.apache.org%3E]:

{quote}
It would help to add a sitemap (and the robots.txt required to reference it) 
for flink.apache.org and ci.apache.org (for /projects/flink)

You can see what Tomcat did along these lines - 
http://tomcat.apache.org/robots.txt references 
http://tomcat.apache.org/sitemap.xml, which is a sitemap index file pointing to 
http://tomcat.apache.org/sitemap-main.xml

By doing this, you can emphasize more recent versions of docs. There are other 
benefits, but reducing poor Google search results (to me) is the biggest win.

E.g.  https://www.google.com/search?q=flink+reducingstate 
 (search on flink reducing 
state) shows the 1.3 Javadocs (hit #1), master (1.6-SNAPSHOT) Javadocs (hit 
#2), and then many pages of other results.

Whereas the Javadocs for 1.5 

 and 1.4 

 are nowhere to be found.
{quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9444) KafkaAvroTableSource failed to work for map and array fields

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9444?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503120#comment-16503120
 ] 

ASF GitHub Bot commented on FLINK-9444:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6082
  
We treat sequences of values as arrays in SQL and the Table API. There are 
no built-in functions to handle lists. So we should return the values as an 
array, and hence don't need a List type.


> KafkaAvroTableSource failed to work for map and array fields
> 
>
> Key: FLINK-9444
> URL: https://issues.apache.org/jira/browse/FLINK-9444
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Table API  SQL
>Affects Versions: 1.6.0
>Reporter: Jun Zhang
>Priority: Blocker
>  Labels: patch
> Fix For: 1.6.0
>
> Attachments: flink-9444.patch
>
>
> When some Avro schema has map/array fields and the corresponding TableSchema 
> declares *MapTypeInfo/ListTypeInfo* for these fields, an exception will be 
> thrown when registering the *KafkaAvroTableSource*, complaining like:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Type Map of table field 'event' does not match with type 
> GenericType of the field 'event' of the TableSource return 
> type.
>  at org.apache.flink.table.api.ValidationException$.apply(exceptions.scala:74)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:92)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$validateTableSource$1.apply(TableSourceUtil.scala:71)
>  at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>  at 
> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:71)
>  at 
> org.apache.flink.table.plan.schema.StreamTableSourceTable.(StreamTableSourceTable.scala:33)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:124)
>  at 
> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:438)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...

2018-06-06 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6082
  
We treat sequences of values as arrays in SQL and the Table API. There are 
no built-in functions to handle lists. So we should return the values as an 
array, and hence don't need a List type.


---


[jira] [Commented] (FLINK-9525) Missing META-INF/services/*FileSystemFactory in flink-hadoop-fs module

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503109#comment-16503109
 ] 

ASF GitHub Bot commented on FLINK-9525:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6118
  
Where in the loading of the factories do you see the error?
My suspicion is still an issue with inverted class loading.

To confirm, can we check the following?
  - Are you running this on Flink 1.4.0 or 1.4.1?
  - Do you have `hadoop-common` in the job's jar, or in the `flink/lib` 
folder?
  - Does the error go away if you set "classloader.resolve-order: 
parent-first" in the config?


> Missing META-INF/services/*FileSystemFactory in flink-hadoop-fs module
> --
>
> Key: FLINK-9525
> URL: https://issues.apache.org/jira/browse/FLINK-9525
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem
>Affects Versions: 1.4.0, 1.5.0, 1.6.0
>Reporter: Hai Zhou
>Assignee: Hai Zhou
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
> Attachments: wx20180605-142...@2x.png
>
>
> if flink job dependencies includes `hadoop-common` and `hadoop-hdfs`, will 
> throw runtime error.
> like this case: 
> [https://stackoverflow.com/questions/47890596/java-util-serviceconfigurationerror-org-apache-hadoop-fs-filesystem-provider-o].
> the root cause: 
> see  {{org.apache.flink.core.fs.FileSystem}}
> This class will load all available file system factories via 
> {{ServiceLoader.load(FileSystemFactory.class)}}. 
> Since  {{ META-INF / services / org.apache.flink.core.fs.FileSystemFactory }} 
> file in the classpath does not have an 
> `org.apache.flink.runtime.fs.hdfs.HadoopFsFactory`, 
> and finaly only loaded one available {{LocalFileSystemFactory}} .
> more error messages see this screenshot.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6118: [FLINK-9525][filesystem] Add missing `META-INF/services/*...

2018-06-06 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6118
  
Where in the loading of the factories do you see the error?
My suspicion is still an issue with inverted class loading.

To confirm, can we check the following?
  - Are you running this on Flink 1.4.0 or 1.4.1?
  - Do you have `hadoop-common` in the job's jar, or in the `flink/lib` 
folder?
  - Does the error go away if you set "classloader.resolve-order: 
parent-first" in the config?


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-06 Thread pduveau
Github user pduveau commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193361587
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
 ---
@@ -83,7 +103,22 @@ public void throwExceptionIfChannelIsNull() throws 
Exception {
}
 
private RMQSink createRMQSink() throws Exception {
-   RMQSink rmqSink = new RMQSink(rmqConnectionConfig, 
QUEUE_NAME, serializationSchema);
+   RMQSink rmqSink = new 
RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema);
+   rmqSink.open(new Configuration());
+   return rmqSink;
+   }
+
+   private RMQSink createRMQSinkFeatured() throws Exception {
--- End diff --

I am using Featured but perhaps Enhanced should be a better choice. What do 
you think ?


---


[jira] [Commented] (FLINK-8468) Make the connector to take advantage of AMQP features (routing key, exchange and message properties)

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503092#comment-16503092
 ] 

ASF GitHub Bot commented on FLINK-8468:
---

Github user pduveau commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193361587
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
 ---
@@ -83,7 +103,22 @@ public void throwExceptionIfChannelIsNull() throws 
Exception {
}
 
private RMQSink createRMQSink() throws Exception {
-   RMQSink rmqSink = new RMQSink(rmqConnectionConfig, 
QUEUE_NAME, serializationSchema);
+   RMQSink rmqSink = new 
RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema);
+   rmqSink.open(new Configuration());
+   return rmqSink;
+   }
+
+   private RMQSink createRMQSinkFeatured() throws Exception {
--- End diff --

I am using Featured but perhaps Enhanced should be a better choice. What do 
you think ?


> Make the connector to take advantage of AMQP features (routing key, exchange 
> and message properties)
> 
>
> Key: FLINK-8468
> URL: https://issues.apache.org/jira/browse/FLINK-8468
> Project: Flink
>  Issue Type: Improvement
>  Components: RabbitMQ Connector
>Affects Versions: 1.4.0
>Reporter: Ph.Duveau
>Priority: Major
>
> Make the connector to take advantage of AMQP features by adding a constructor 
> and an interface to implement



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8468) Make the connector to take advantage of AMQP features (routing key, exchange and message properties)

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503090#comment-16503090
 ] 

ASF GitHub Bot commented on FLINK-8468:
---

Github user pduveau commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193360923
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
 ---
@@ -83,7 +103,22 @@ public void throwExceptionIfChannelIsNull() throws 
Exception {
}
 
private RMQSink createRMQSink() throws Exception {
-   RMQSink rmqSink = new RMQSink(rmqConnectionConfig, 
QUEUE_NAME, serializationSchema);
+   RMQSink rmqSink = new 
RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema);
+   rmqSink.open(new Configuration());
+   return rmqSink;
+   }
+
+   private RMQSink createRMQSinkFeatured() throws Exception {
+   publishOptions = new DummyPublishOptions();
+   RMQSink rmqSink = new 
RMQSink(rmqConnectionConfig, serializationSchema, publishOptions);
+   rmqSink.open(new Configuration());
+   return rmqSink;
+   }
+
+   private RMQSink createRMQSinkFeaturedReturnHandler() throws 
Exception {
--- End diff --

You can see that if you do  not provide a returnedMessage Handler. You must 
not set immediate and/or mandatory to true when you publish even if you set 
them to true in class DummyPublishOptions they are overwritten in basicPublish 
call as you can see in the two test invokeFeaturedPublishBytesToQueue 
(overwritten) and  invokeFeaturedReturnHandlerPublishBytesToQueue (not 
overwritten).


> Make the connector to take advantage of AMQP features (routing key, exchange 
> and message properties)
> 
>
> Key: FLINK-8468
> URL: https://issues.apache.org/jira/browse/FLINK-8468
> Project: Flink
>  Issue Type: Improvement
>  Components: RabbitMQ Connector
>Affects Versions: 1.4.0
>Reporter: Ph.Duveau
>Priority: Major
>
> Make the connector to take advantage of AMQP features by adding a constructor 
> and an interface to implement



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-06 Thread pduveau
Github user pduveau commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193360923
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
 ---
@@ -83,7 +103,22 @@ public void throwExceptionIfChannelIsNull() throws 
Exception {
}
 
private RMQSink createRMQSink() throws Exception {
-   RMQSink rmqSink = new RMQSink(rmqConnectionConfig, 
QUEUE_NAME, serializationSchema);
+   RMQSink rmqSink = new 
RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema);
+   rmqSink.open(new Configuration());
+   return rmqSink;
+   }
+
+   private RMQSink createRMQSinkFeatured() throws Exception {
+   publishOptions = new DummyPublishOptions();
+   RMQSink rmqSink = new 
RMQSink(rmqConnectionConfig, serializationSchema, publishOptions);
+   rmqSink.open(new Configuration());
+   return rmqSink;
+   }
+
+   private RMQSink createRMQSinkFeaturedReturnHandler() throws 
Exception {
--- End diff --

You can see that if you do  not provide a returnedMessage Handler. You must 
not set immediate and/or mandatory to true when you publish even if you set 
them to true in class DummyPublishOptions they are overwritten in basicPublish 
call as you can see in the two test invokeFeaturedPublishBytesToQueue 
(overwritten) and  invokeFeaturedReturnHandlerPublishBytesToQueue (not 
overwritten).


---


[jira] [Commented] (FLINK-8468) Make the connector to take advantage of AMQP features (routing key, exchange and message properties)

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503081#comment-16503081
 ] 

ASF GitHub Bot commented on FLINK-8468:
---

Github user pduveau commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193359253
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
 ---
@@ -83,7 +103,22 @@ public void throwExceptionIfChannelIsNull() throws 
Exception {
}
 
private RMQSink createRMQSink() throws Exception {
-   RMQSink rmqSink = new RMQSink(rmqConnectionConfig, 
QUEUE_NAME, serializationSchema);
+   RMQSink rmqSink = new 
RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema);
+   rmqSink.open(new Configuration());
+   return rmqSink;
+   }
+
+   private RMQSink createRMQSinkFeatured() throws Exception {
+   publishOptions = new DummyPublishOptions();
+   RMQSink rmqSink = new 
RMQSink(rmqConnectionConfig, serializationSchema, publishOptions);
--- End diff --

Done


> Make the connector to take advantage of AMQP features (routing key, exchange 
> and message properties)
> 
>
> Key: FLINK-8468
> URL: https://issues.apache.org/jira/browse/FLINK-8468
> Project: Flink
>  Issue Type: Improvement
>  Components: RabbitMQ Connector
>Affects Versions: 1.4.0
>Reporter: Ph.Duveau
>Priority: Major
>
> Make the connector to take advantage of AMQP features by adding a constructor 
> and an interface to implement



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8468) Make the connector to take advantage of AMQP features (routing key, exchange and message properties)

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503082#comment-16503082
 ] 

ASF GitHub Bot commented on FLINK-8468:
---

Github user pduveau commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193359300
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
 ---
@@ -83,7 +103,22 @@ public void throwExceptionIfChannelIsNull() throws 
Exception {
}
 
private RMQSink createRMQSink() throws Exception {
-   RMQSink rmqSink = new RMQSink(rmqConnectionConfig, 
QUEUE_NAME, serializationSchema);
+   RMQSink rmqSink = new 
RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema);
--- End diff --

Done


> Make the connector to take advantage of AMQP features (routing key, exchange 
> and message properties)
> 
>
> Key: FLINK-8468
> URL: https://issues.apache.org/jira/browse/FLINK-8468
> Project: Flink
>  Issue Type: Improvement
>  Components: RabbitMQ Connector
>Affects Versions: 1.4.0
>Reporter: Ph.Duveau
>Priority: Major
>
> Make the connector to take advantage of AMQP features by adding a constructor 
> and an interface to implement



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-06 Thread pduveau
Github user pduveau commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193359300
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
 ---
@@ -83,7 +103,22 @@ public void throwExceptionIfChannelIsNull() throws 
Exception {
}
 
private RMQSink createRMQSink() throws Exception {
-   RMQSink rmqSink = new RMQSink(rmqConnectionConfig, 
QUEUE_NAME, serializationSchema);
+   RMQSink rmqSink = new 
RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema);
--- End diff --

Done


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-06 Thread pduveau
Github user pduveau commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193359253
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
 ---
@@ -83,7 +103,22 @@ public void throwExceptionIfChannelIsNull() throws 
Exception {
}
 
private RMQSink createRMQSink() throws Exception {
-   RMQSink rmqSink = new RMQSink(rmqConnectionConfig, 
QUEUE_NAME, serializationSchema);
+   RMQSink rmqSink = new 
RMQSink(rmqConnectionConfig, QUEUE_NAME, serializationSchema);
+   rmqSink.open(new Configuration());
+   return rmqSink;
+   }
+
+   private RMQSink createRMQSinkFeatured() throws Exception {
+   publishOptions = new DummyPublishOptions();
+   RMQSink rmqSink = new 
RMQSink(rmqConnectionConfig, serializationSchema, publishOptions);
--- End diff --

Done


---


[jira] [Commented] (FLINK-8468) Make the connector to take advantage of AMQP features (routing key, exchange and message properties)

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503077#comment-16503077
 ] 

ASF GitHub Bot commented on FLINK-8468:
---

Github user pduveau commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193358805
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
 ---
@@ -66,12 +79,19 @@ public void before() throws Exception {
}
 
@Test
-   public void openCallDeclaresQueue() throws Exception {
+   public void openCallDeclaresQueueInStandardMode() throws Exception {
createRMQSink();
 
verify(channel).queueDeclare(QUEUE_NAME, false, false, false, 
null);
}
 
+   @Test
+   public void openCallDontDeclaresQueueInFeaturedMode() throws Exception {
+   doThrow(Exception.class).when(channel).queueDeclare(null, 
false, false, false, null);
--- End diff --

Done


> Make the connector to take advantage of AMQP features (routing key, exchange 
> and message properties)
> 
>
> Key: FLINK-8468
> URL: https://issues.apache.org/jira/browse/FLINK-8468
> Project: Flink
>  Issue Type: Improvement
>  Components: RabbitMQ Connector
>Affects Versions: 1.4.0
>Reporter: Ph.Duveau
>Priority: Major
>
> Make the connector to take advantage of AMQP features by adding a constructor 
> and an interface to implement



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-06 Thread pduveau
Github user pduveau commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193358805
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
 ---
@@ -66,12 +79,19 @@ public void before() throws Exception {
}
 
@Test
-   public void openCallDeclaresQueue() throws Exception {
+   public void openCallDeclaresQueueInStandardMode() throws Exception {
createRMQSink();
 
verify(channel).queueDeclare(QUEUE_NAME, false, false, false, 
null);
}
 
+   @Test
+   public void openCallDontDeclaresQueueInFeaturedMode() throws Exception {
+   doThrow(Exception.class).when(channel).queueDeclare(null, 
false, false, false, null);
--- End diff --

Done


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-06 Thread pduveau
Github user pduveau commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193357238
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 ---
@@ -40,22 +42,63 @@
 
private static final Logger LOG = 
LoggerFactory.getLogger(RMQSink.class);
 
-   protected final String queueName;
+   private final String queueName;
private final RMQConnectionConfig rmqConnectionConfig;
protected transient Connection connection;
protected transient Channel channel;
protected SerializationSchema schema;
-   private boolean logFailuresOnly = false;
+   protected boolean logFailuresOnly = false;
--- End diff --

In fact, now we have a method to change the value.


---


[jira] [Commented] (FLINK-8468) Make the connector to take advantage of AMQP features (routing key, exchange and message properties)

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503071#comment-16503071
 ] 

ASF GitHub Bot commented on FLINK-8468:
---

Github user pduveau commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193357238
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 ---
@@ -40,22 +42,63 @@
 
private static final Logger LOG = 
LoggerFactory.getLogger(RMQSink.class);
 
-   protected final String queueName;
+   private final String queueName;
private final RMQConnectionConfig rmqConnectionConfig;
protected transient Connection connection;
protected transient Channel channel;
protected SerializationSchema schema;
-   private boolean logFailuresOnly = false;
+   protected boolean logFailuresOnly = false;
--- End diff --

In fact, now we have a method to change the value.


> Make the connector to take advantage of AMQP features (routing key, exchange 
> and message properties)
> 
>
> Key: FLINK-8468
> URL: https://issues.apache.org/jira/browse/FLINK-8468
> Project: Flink
>  Issue Type: Improvement
>  Components: RabbitMQ Connector
>Affects Versions: 1.4.0
>Reporter: Ph.Duveau
>Priority: Major
>
> Make the connector to take advantage of AMQP features by adding a constructor 
> and an interface to implement



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8468) Make the connector to take advantage of AMQP features (routing key, exchange and message properties)

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503067#comment-16503067
 ] 

ASF GitHub Bot commented on FLINK-8468:
---

Github user pduveau commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193356675
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
 ---
@@ -124,7 +159,82 @@ public void closeAllResources() throws Exception {
verify(connection).close();
}
 
+   @Test
+   public void invokeFeaturedPublishBytesToQueue() throws Exception {
+   RMQSink rmqSink = createRMQSinkFeatured();
+
+   rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0));
+   verify(serializationSchema).serialize(MESSAGE_STR);
+   verify(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, 
false,
+   publishOptions.computeProperties(""), MESSAGE);
+   }
+
+   @Test
+   public void invokeFeaturedReturnHandlerPublishBytesToQueue() throws 
Exception {
+   RMQSink rmqSink = createRMQSinkFeaturedReturnHandler();
+
+   rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0));
+   verify(serializationSchema).serialize(MESSAGE_STR);
+   verify(channel).basicPublish(EXCHANGE, ROUTING_KEY, true, true,
+   publishOptions.computeProperties(""), MESSAGE);
+   }
+
+   @Test(expected = RuntimeException.class)
+   public void exceptionDuringFeaturedPublishingIsNotIgnored() throws 
Exception {
+   RMQSink rmqSink = createRMQSinkFeatured();
+
+   doThrow(IOException.class).when(channel).basicPublish(EXCHANGE, 
ROUTING_KEY, false, false,
+   publishOptions.computeProperties(""), MESSAGE);
+   rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
+   }
+
+   @Test
+   public void 
exceptionDuringFeaturedPublishingIsIgnoredIfLogFailuresOnly() throws Exception {
+   RMQSink rmqSink = createRMQSinkFeatured();
+   rmqSink.setLogFailuresOnly(true);
+
+   doThrow(IOException.class).when(channel).basicPublish(EXCHANGE, 
ROUTING_KEY, false, false,
+   publishOptions.computeProperties(""), MESSAGE);
+   rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
+   }
+
+   private class DummyPublishOptions implements 
RMQSinkPublishOptions {
--- End diff --

Done


> Make the connector to take advantage of AMQP features (routing key, exchange 
> and message properties)
> 
>
> Key: FLINK-8468
> URL: https://issues.apache.org/jira/browse/FLINK-8468
> Project: Flink
>  Issue Type: Improvement
>  Components: RabbitMQ Connector
>Affects Versions: 1.4.0
>Reporter: Ph.Duveau
>Priority: Major
>
> Make the connector to take advantage of AMQP features by adding a constructor 
> and an interface to implement



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8468) Make the connector to take advantage of AMQP features (routing key, exchange and message properties)

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503069#comment-16503069
 ] 

ASF GitHub Bot commented on FLINK-8468:
---

Github user pduveau commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193356713
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 ---
@@ -40,22 +42,63 @@
 
private static final Logger LOG = 
LoggerFactory.getLogger(RMQSink.class);
 
-   protected final String queueName;
+   private final String queueName;
--- End diff --

OK


> Make the connector to take advantage of AMQP features (routing key, exchange 
> and message properties)
> 
>
> Key: FLINK-8468
> URL: https://issues.apache.org/jira/browse/FLINK-8468
> Project: Flink
>  Issue Type: Improvement
>  Components: RabbitMQ Connector
>Affects Versions: 1.4.0
>Reporter: Ph.Duveau
>Priority: Major
>
> Make the connector to take advantage of AMQP features by adding a constructor 
> and an interface to implement



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-06 Thread pduveau
Github user pduveau commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193356675
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java
 ---
@@ -124,7 +159,82 @@ public void closeAllResources() throws Exception {
verify(connection).close();
}
 
+   @Test
+   public void invokeFeaturedPublishBytesToQueue() throws Exception {
+   RMQSink rmqSink = createRMQSinkFeatured();
+
+   rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0));
+   verify(serializationSchema).serialize(MESSAGE_STR);
+   verify(channel).basicPublish(EXCHANGE, ROUTING_KEY, false, 
false,
+   publishOptions.computeProperties(""), MESSAGE);
+   }
+
+   @Test
+   public void invokeFeaturedReturnHandlerPublishBytesToQueue() throws 
Exception {
+   RMQSink rmqSink = createRMQSinkFeaturedReturnHandler();
+
+   rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0));
+   verify(serializationSchema).serialize(MESSAGE_STR);
+   verify(channel).basicPublish(EXCHANGE, ROUTING_KEY, true, true,
+   publishOptions.computeProperties(""), MESSAGE);
+   }
+
+   @Test(expected = RuntimeException.class)
+   public void exceptionDuringFeaturedPublishingIsNotIgnored() throws 
Exception {
+   RMQSink rmqSink = createRMQSinkFeatured();
+
+   doThrow(IOException.class).when(channel).basicPublish(EXCHANGE, 
ROUTING_KEY, false, false,
+   publishOptions.computeProperties(""), MESSAGE);
+   rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
+   }
+
+   @Test
+   public void 
exceptionDuringFeaturedPublishingIsIgnoredIfLogFailuresOnly() throws Exception {
+   RMQSink rmqSink = createRMQSinkFeatured();
+   rmqSink.setLogFailuresOnly(true);
+
+   doThrow(IOException.class).when(channel).basicPublish(EXCHANGE, 
ROUTING_KEY, false, false,
+   publishOptions.computeProperties(""), MESSAGE);
+   rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
+   }
+
+   private class DummyPublishOptions implements 
RMQSinkPublishOptions {
--- End diff --

Done


---


[jira] [Commented] (FLINK-8468) Make the connector to take advantage of AMQP features (routing key, exchange and message properties)

2018-06-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503066#comment-16503066
 ] 

ASF GitHub Bot commented on FLINK-8468:
---

Github user pduveau commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193356620
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
 ---
@@ -109,6 +109,7 @@ private RMQConnectionConfig(String host, Integer port, 
String virtualHost, Strin
* @param requestedChannelMax requested maximum channel number
* @param requestedFrameMax requested maximum frame size
* @param requestedHeartbeat requested heartbeat interval
+   * @param createQueue enable or diable queue create on setup
--- End diff --

Sorry for this.


> Make the connector to take advantage of AMQP features (routing key, exchange 
> and message properties)
> 
>
> Key: FLINK-8468
> URL: https://issues.apache.org/jira/browse/FLINK-8468
> Project: Flink
>  Issue Type: Improvement
>  Components: RabbitMQ Connector
>Affects Versions: 1.4.0
>Reporter: Ph.Duveau
>Priority: Major
>
> Make the connector to take advantage of AMQP features by adding a constructor 
> and an interface to implement



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-06 Thread pduveau
Github user pduveau commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193356713
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 ---
@@ -40,22 +42,63 @@
 
private static final Logger LOG = 
LoggerFactory.getLogger(RMQSink.class);
 
-   protected final String queueName;
+   private final String queueName;
--- End diff --

OK


---


[GitHub] flink pull request #5410: [FLINK-8468] [RabbitMQ Connector] Take advantage o...

2018-06-06 Thread pduveau
Github user pduveau commented on a diff in the pull request:

https://github.com/apache/flink/pull/5410#discussion_r193356620
  
--- Diff: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
 ---
@@ -109,6 +109,7 @@ private RMQConnectionConfig(String host, Integer port, 
String virtualHost, Strin
* @param requestedChannelMax requested maximum channel number
* @param requestedFrameMax requested maximum frame size
* @param requestedHeartbeat requested heartbeat interval
+   * @param createQueue enable or diable queue create on setup
--- End diff --

Sorry for this.


---


[jira] [Updated] (FLINK-9540) Apache Flink 1.4.2 S3 Hadoop library for Hadoop 2.7 is built for Hadoop 2.8 and fails

2018-06-06 Thread Razvan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Razvan updated FLINK-9540:
--
Description: 
Download Apache Flink 1.4.2 for Hadoop 2.7 or earlier. Go to opt/ inside 
flink-s3-fs-hadoop-1.4.2.jar, open common version info .properties inside you 
get:

 

#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements. See the NOTICE file
 # distributed with this work for additional information
 # regarding copyright ownership. The ASF licenses this file
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License. You may obtain a copy of the License at
 #
 # [http://www.apache.org/licenses/LICENSE-2.0]
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #

*version=2.8.1*
 revision=1e6296df38f9cd3d9581c8af58a2a03a6e4312be
 *branch=branch-2.8.1-private*
 user=vinodkv
 date=2017-06-07T21:22Z
 *url=[https://git-wip-us.apache.org/repos/asf/hadoop.git*]
 srcChecksum=60125541c2b3e266cbf3becc5bda666
 protocVersion=2.5.0

 

This will fail to work with dependencies described in 
([https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html)]

 

"Depending on which file system you use, please add the following dependencies. 
You can find these as part of the Hadoop binaries in 
{{hadoop-2.7/share/hadoop/tools/lib}}:
 * {{S3AFileSystem}}:
 ** {{hadoop-aws-2.7.3.jar}}
 ** {{aws-java-sdk-s3-1.11.183.jar}} and its dependencies:
 *** {{aws-java-sdk-core-1.11.183.jar}}
 *** {{aws-java-sdk-kms-1.11.183.jar}}
 *** {{jackson-annotations-2.6.7.jar}}
 *** {{jackson-core-2.6.7.jar}}
 *** {{jackson-databind-2.6.7.jar}}
 *** {{joda-time-2.8.1.jar}}
 *** {{httpcore-4.4.4.jar}}
 *** {{httpclient-4.5.3.jar}}
 * {{NativeS3FileSystem}}:
 ** {{hadoop-aws-2.7.3.jar}}
 ** {{guava-11.0.2.jar"}}{{}}

 

{{  I presume the build task is flawed, it should be built for Apache Hadoop 
2.7 can someone please check it?}}

  was:
Download Apache Flink 1.4.2 for Hadoop 2.7 or earlier. Go to opt/ inside 
flink-s3-fs-hadoop-1.4.2.jar, open common version info .properties inside you 
get:

 

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

*version=2.8.1*
revision=1e6296df38f9cd3d9581c8af58a2a03a6e4312be
*branch=branch-2.8.1-private*
user=vinodkv
date=2017-06-07T21:22Z
*url=https://git-wip-us.apache.org/repos/asf/hadoop.git*
srcChecksum=60125541c2b3e266cbf3becc5bda666
protocVersion=2.5.0

 

This will fail to work with dependencies described in 
([https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html)]

 

"Depending on which file system you use, please add the following dependencies. 
You can find these as part of the Hadoop binaries in 
{{hadoop-2.7/share/hadoop/tools/lib}}:
 * {{S3AFileSystem}}:
 ** {{hadoop-aws-2.7.3.jar}}
 ** {{aws-java-sdk-s3-1.11.183.jar}} and its dependencies:
 *** {{aws-java-sdk-core-1.11.183.jar}}
 *** {{aws-java-sdk-kms-1.11.183.jar}}
 *** {{jackson-annotations-2.6.7.jar}}
 *** {{jackson-core-2.6.7.jar}}
 *** {{jackson-databind-2.6.7.jar}}
 *** {{joda-time-2.8.1.jar}}
 *** {{httpcore-4.4.4.jar}}
 *** {{httpclient-4.5.3.jar}}
 * {{NativeS3FileSystem}}:
 ** {{hadoop-aws-2.7.3.jar}}
 ** {{guava-11.0.2.jar"}}{{}}

 

{{  I presume the build task is flawed, it should be built for Apache Hadoop 
2.7}}


> Apache Flink 1.4.2 S3 Hadoop library for Hadoop 2.7 is built for Hadoop 2.8 
> and fails
> -
>
> Key: FLINK-9540
> URL: https://issues.apache.org/jira/browse/FLINK-9540
> Project: Flink
>  Issue Type: Bug
>Reporter: Razvan
>Priority: Blocker
>
> Download Apache Flink 1.4.2 for Hadoop 2.7 or earlier. Go to opt/ inside 
> flink-s3-fs-hadoop-1.4.2.jar, open common version info .properties inside you 
> get:
>  

[jira] [Assigned] (FLINK-8886) Job isolation via scheduling in shared cluster

2018-06-06 Thread Renjie Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Renjie Liu reassigned FLINK-8886:
-

Assignee: Renjie Liu

> Job isolation via scheduling in shared cluster
> --
>
> Key: FLINK-8886
> URL: https://issues.apache.org/jira/browse/FLINK-8886
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination, Local Runtime, Scheduler
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Assignee: Renjie Liu
>Priority: Major
>
> Flink's TaskManager executes tasks from different jobs within the same JVM as 
> threads.  We prefer to isolate different jobs on their own JVM.  Thus, we 
> must use different TMs for different jobs.  As currently the scheduler will 
> allocate task slots within a TM to tasks from different jobs, that means we 
> must stand up one cluster per job.  This is wasteful, as it requires at least 
> two JobManagers per cluster for high-availability, and the JMs have low 
> utilization.
> Additionally, different jobs may require different resources.  Some jobs are 
> compute heavy.  Some are IO heavy (lots of state in RocksDB).  At the moment 
> the scheduler threats all TMs are equivalent, except possibly in their number 
> of available task slots.  Thus, one is required to stand up multiple cluster 
> if there is a need for different types of TMs.
> It would be useful if one could specify requirements on job, such that they 
> are only scheduled on a subset of TMs.  Properly configured, that would 
> permit isolation of jobs in a shared cluster and scheduling of jobs with 
> specific resource needs.
> One possible implementation is to specify a set of tags on the TM config file 
> which the TMs used when registering with the JM, and another set of tags 
> configured within the job or supplied when submitting the job.  The scheduler 
> could then match the tags in the job with the tags in the TMs.  In a 
> restrictive mode the scheduler would assign a job task to a TM only if all 
> tags match.  In a relaxed mode the scheduler could assign a job task to a TM 
> if there is a partial match, while giving preference to a more accurate match.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9540) Apache Flink 1.4.2 S3 Hadoop library for Hadoop 2.7 is built for Hadoop 2.8 and fails

2018-06-06 Thread Razvan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Razvan updated FLINK-9540:
--
Summary: Apache Flink 1.4.2 S3 Hadoop library for Hadoop 2.7 is built for 
Hadoop 2.8 and fails  (was: Apache Flink 1.4.2 S3 Hadooplibrary for Hadoop 2.7 
is built for Hadoop 2.8 and fails)

> Apache Flink 1.4.2 S3 Hadoop library for Hadoop 2.7 is built for Hadoop 2.8 
> and fails
> -
>
> Key: FLINK-9540
> URL: https://issues.apache.org/jira/browse/FLINK-9540
> Project: Flink
>  Issue Type: Bug
>Reporter: Razvan
>Priority: Blocker
>
> Download Apache Flink 1.4.2 for Hadoop 2.7 or earlier. Go to opt/ inside 
> flink-s3-fs-hadoop-1.4.2.jar, open common version info .properties inside you 
> get:
>  
> #
> # Licensed to the Apache Software Foundation (ASF) under one
> # or more contributor license agreements. See the NOTICE file
> # distributed with this work for additional information
> # regarding copyright ownership. The ASF licenses this file
> # to you under the Apache License, Version 2.0 (the
> # "License"); you may not use this file except in compliance
> # with the License. You may obtain a copy of the License at
> #
> # http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> #
> *version=2.8.1*
> revision=1e6296df38f9cd3d9581c8af58a2a03a6e4312be
> *branch=branch-2.8.1-private*
> user=vinodkv
> date=2017-06-07T21:22Z
> *url=https://git-wip-us.apache.org/repos/asf/hadoop.git*
> srcChecksum=60125541c2b3e266cbf3becc5bda666
> protocVersion=2.5.0
>  
> This will fail to work with dependencies described in 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html)]
>  
> "Depending on which file system you use, please add the following 
> dependencies. You can find these as part of the Hadoop binaries in 
> {{hadoop-2.7/share/hadoop/tools/lib}}:
>  * {{S3AFileSystem}}:
>  ** {{hadoop-aws-2.7.3.jar}}
>  ** {{aws-java-sdk-s3-1.11.183.jar}} and its dependencies:
>  *** {{aws-java-sdk-core-1.11.183.jar}}
>  *** {{aws-java-sdk-kms-1.11.183.jar}}
>  *** {{jackson-annotations-2.6.7.jar}}
>  *** {{jackson-core-2.6.7.jar}}
>  *** {{jackson-databind-2.6.7.jar}}
>  *** {{joda-time-2.8.1.jar}}
>  *** {{httpcore-4.4.4.jar}}
>  *** {{httpclient-4.5.3.jar}}
>  * {{NativeS3FileSystem}}:
>  ** {{hadoop-aws-2.7.3.jar}}
>  ** {{guava-11.0.2.jar"}}{{}}
>  
> {{  I presume the build task is flawed, it should be built for Apache Hadoop 
> 2.7}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9540) Apache Flink 1.4.2 S3 Hadooplibrary for Hadoop 2.7 is built for Hadoop 2.8 and fails

2018-06-06 Thread Razvan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Razvan updated FLINK-9540:
--
Description: 
Download Apache Flink 1.4.2 for Hadoop 2.7 or earlier. Go to opt/ inside 
flink-s3-fs-hadoop-1.4.2.jar, open common version info .properties inside you 
get:

 

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

*version=2.8.1*
revision=1e6296df38f9cd3d9581c8af58a2a03a6e4312be
*branch=branch-2.8.1-private*
user=vinodkv
date=2017-06-07T21:22Z
*url=https://git-wip-us.apache.org/repos/asf/hadoop.git*
srcChecksum=60125541c2b3e266cbf3becc5bda666
protocVersion=2.5.0

 

This will fail to work with dependencies described in 
([https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html)]

 

"Depending on which file system you use, please add the following dependencies. 
You can find these as part of the Hadoop binaries in 
{{hadoop-2.7/share/hadoop/tools/lib}}:
 * {{S3AFileSystem}}:
 ** {{hadoop-aws-2.7.3.jar}}
 ** {{aws-java-sdk-s3-1.11.183.jar}} and its dependencies:
 *** {{aws-java-sdk-core-1.11.183.jar}}
 *** {{aws-java-sdk-kms-1.11.183.jar}}
 *** {{jackson-annotations-2.6.7.jar}}
 *** {{jackson-core-2.6.7.jar}}
 *** {{jackson-databind-2.6.7.jar}}
 *** {{joda-time-2.8.1.jar}}
 *** {{httpcore-4.4.4.jar}}
 *** {{httpclient-4.5.3.jar}}
 * {{NativeS3FileSystem}}:
 ** {{hadoop-aws-2.7.3.jar}}
 ** {{guava-11.0.2.jar"}}{{}}

 

{{  I presume the build task is flawed, it should be built for Apache Hadoop 
2.7}}

  was:
Download Apache Flink 1.4.2 for Hadoop 2.7 or earlier. Go to opt/ inside 
flink-s3-fs-hadoop-1.4.2.jar, open common version info .properties inside you 
get:

 

 


> Apache Flink 1.4.2 S3 Hadooplibrary for Hadoop 2.7 is built for Hadoop 2.8 
> and fails
> 
>
> Key: FLINK-9540
> URL: https://issues.apache.org/jira/browse/FLINK-9540
> Project: Flink
>  Issue Type: Bug
>Reporter: Razvan
>Priority: Blocker
>
> Download Apache Flink 1.4.2 for Hadoop 2.7 or earlier. Go to opt/ inside 
> flink-s3-fs-hadoop-1.4.2.jar, open common version info .properties inside you 
> get:
>  
> #
> # Licensed to the Apache Software Foundation (ASF) under one
> # or more contributor license agreements. See the NOTICE file
> # distributed with this work for additional information
> # regarding copyright ownership. The ASF licenses this file
> # to you under the Apache License, Version 2.0 (the
> # "License"); you may not use this file except in compliance
> # with the License. You may obtain a copy of the License at
> #
> # http://www.apache.org/licenses/LICENSE-2.0
> #
> # Unless required by applicable law or agreed to in writing, software
> # distributed under the License is distributed on an "AS IS" BASIS,
> # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> # See the License for the specific language governing permissions and
> # limitations under the License.
> #
> *version=2.8.1*
> revision=1e6296df38f9cd3d9581c8af58a2a03a6e4312be
> *branch=branch-2.8.1-private*
> user=vinodkv
> date=2017-06-07T21:22Z
> *url=https://git-wip-us.apache.org/repos/asf/hadoop.git*
> srcChecksum=60125541c2b3e266cbf3becc5bda666
> protocVersion=2.5.0
>  
> This will fail to work with dependencies described in 
> ([https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html)]
>  
> "Depending on which file system you use, please add the following 
> dependencies. You can find these as part of the Hadoop binaries in 
> {{hadoop-2.7/share/hadoop/tools/lib}}:
>  * {{S3AFileSystem}}:
>  ** {{hadoop-aws-2.7.3.jar}}
>  ** {{aws-java-sdk-s3-1.11.183.jar}} and its dependencies:
>  *** {{aws-java-sdk-core-1.11.183.jar}}
>  *** {{aws-java-sdk-kms-1.11.183.jar}}
>  *** {{jackson-annotations-2.6.7.jar}}
>  *** {{jackson-core-2.6.7.jar}}
>  *** {{jackson-databind-2.6.7.jar}}
>  *** {{joda-time-2.8.1.jar}}
>  *** {{httpcore-4.4.4.jar}}
>  *** {{httpclient-4.5.3.jar}}
>  * {{NativeS3FileSystem}}:
>  ** {{hadoop-aws-2.7.3.jar}}
>  ** {{guava-11.0.2.jar"}}{{}}
>  
> {{  I presume the build task is flawed, it should 

[jira] [Updated] (FLINK-9540) Apache Flink 1.4.2 S3 Hadooplibrary for Hadoop 2.7 is built for Hadoop 2.8 and fails

2018-06-06 Thread Razvan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Razvan updated FLINK-9540:
--
Description: 
Download Apache Flink 1.4.2 for Hadoop 2.7 or earlier. Go to opt/ inside 
flink-s3-fs-hadoop-1.4.2.jar, open common version info .properties inside you 
get:

 

 

  was:++Download Apache Flink 1.4.2 for Hadoop 2.7 or earlier. 


> Apache Flink 1.4.2 S3 Hadooplibrary for Hadoop 2.7 is built for Hadoop 2.8 
> and fails
> 
>
> Key: FLINK-9540
> URL: https://issues.apache.org/jira/browse/FLINK-9540
> Project: Flink
>  Issue Type: Bug
>Reporter: Razvan
>Priority: Blocker
>
> Download Apache Flink 1.4.2 for Hadoop 2.7 or earlier. Go to opt/ inside 
> flink-s3-fs-hadoop-1.4.2.jar, open common version info .properties inside you 
> get:
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9540) Apache Flink 1.4.2 S3 Hadooplibrary for Hadoop 2.7 is built for Hadoop 2.8 and fails

2018-06-06 Thread Razvan (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Razvan updated FLINK-9540:
--
Description: ++Download Apache Flink 1.4.2 for Hadoop 2.7 or earlier. 

> Apache Flink 1.4.2 S3 Hadooplibrary for Hadoop 2.7 is built for Hadoop 2.8 
> and fails
> 
>
> Key: FLINK-9540
> URL: https://issues.apache.org/jira/browse/FLINK-9540
> Project: Flink
>  Issue Type: Bug
>Reporter: Razvan
>Priority: Blocker
>
> ++Download Apache Flink 1.4.2 for Hadoop 2.7 or earlier. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


  1   2   >