Verifying watermarks in integration test

2018-02-20 Thread Thomas Weise
Hi,

I have a streaming integration test with two operators. A source that emits
records and watermarks, and a sink that collects the records. The topology
runs in embedded mode and the results are collected in memory. Now, in
addition to the records, I also want to verify that watermarks have been
emitted. What's the recommended way of doing that?
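For concreteness, the kind of check I have in mind is something like the sketch
below (names like WatermarkCollector are made up, and I'm not sure this is the
intended way, hence the question): a pass-through operator that overrides
processWatermark and records every watermark into a static list the test can
assert on afterwards.

```
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** Pass-through operator that remembers every watermark it sees. */
public class WatermarkCollector<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    // Static so the embedded (same-JVM) test can inspect the watermarks afterwards.
    public static final List<Long> WATERMARKS = new CopyOnWriteArrayList<>();

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        output.collect(element);              // forward records unchanged
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        WATERMARKS.add(mark.getTimestamp());  // record the watermark for assertions
        super.processWatermark(mark);         // keep forwarding it downstream
    }
}

// Wiring it between the source and the collecting sink:
// source.transform("watermark-collector", source.getType(), new WatermarkCollector<>())
//       .addSink(collectingSink);
```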

Thanks


[jira] [Created] (FLINK-8720) Logging exception

2018-02-20 Thread dejan miljkovic (JIRA)
dejan miljkovic created FLINK-8720:
--

 Summary: Logging exception 
 Key: FLINK-8720
 URL: https://issues.apache.org/jira/browse/FLINK-8720
 Project: Flink
  Issue Type: Bug
  Components: Streaming Connectors
Affects Versions: 1.4.1
Reporter: dejan miljkovic


Trying to stream data to S3. The code works from IntelliJ. When the job is submitted 
through the UI on my machine (a single-node cluster started by the start-cluster.sh 
script), the stack trace below is produced.

 

Below is a link to a simple test app that streams data to S3: 
[https://github.com/dmiljkovic/test-flink-bucketingsink-s3]

The behavior is a bit different, but the same error is produced. The job works only once. 
If the job is submitted a second time, the stack trace below appears. If I restart the 
cluster, the job works again, but only the first time.
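For reference, a minimal sketch of the kind of job involved (the linked repository 
contains the actual code; the source, bucket path, and settings below are made up):

```
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class S3SinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // any string source will do; a socket source is used here as a placeholder
        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        // hypothetical bucket path; the real app writes to its own bucket
        BucketingSink<String> sink = new BucketingSink<>("s3a://my-bucket/flink-output");
        sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HH"));
        sink.setWriter(new StringWriter<String>());
        sink.setBatchSize(1024 * 1024); // roll part files at ~1 MB

        lines.addSink(sink);
        env.execute("BucketingSink to S3");
    }
}
```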

 

 
org.apache.commons.logging.LogConfigurationException: java.lang.IllegalAccessError: org/apache/commons/logging/impl/LogFactoryImpl$3 (Caused by java.lang.IllegalAccessError: org/apache/commons/logging/impl/LogFactoryImpl$3)
    at org.apache.commons.logging.impl.LogFactoryImpl.newInstance(LogFactoryImpl.java:637)
    at org.apache.commons.logging.impl.LogFactoryImpl.getInstance(LogFactoryImpl.java:336)
    at org.apache.commons.logging.impl.LogFactoryImpl.getInstance(LogFactoryImpl.java:310)
    at org.apache.commons.logging.LogFactory.getLog(LogFactory.java:685)
    at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:76)
    at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:102)
    at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:88)
    at org.apache.http.impl.conn.PoolingClientConnectionManager.<init>(PoolingClientConnectionManager.java:96)
    at com.amazonaws.http.ConnectionManagerFactory.createPoolingClientConnManager(ConnectionManagerFactory.java:26)
    at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:96)
    at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:158)
    at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:119)
    at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:389)
    at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:371)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:235)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1206)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalAccessError: org/apache/commons/logging/impl/LogFactoryImpl$3
    at org.apache.commons.logging.impl.LogFactoryImpl.getParentClassLoader(LogFactoryImpl.java:700)
    at org.apache.commons.logging.impl.LogFactoryImpl.createLogFromClass(LogFactoryImpl.java:1187)
    at org.apache.commons.logging.impl.LogFactoryImpl.discoverLogImplementation(LogFactoryImpl.java:914)
    at org.apache.commons.logging.impl.LogFactoryImpl.newInstance(LogFactoryImpl.java:604)
    ... 26 more
 





[jira] [Created] (FLINK-8719) add module description for flink-contrib to clarify its purpose

2018-02-20 Thread Bowen Li (JIRA)
Bowen Li created FLINK-8719:
---

 Summary: add module description for flink-contrib to clarify its 
purpose
 Key: FLINK-8719
 URL: https://issues.apache.org/jira/browse/FLINK-8719
 Project: Flink
  Issue Type: Improvement
  Components: flink-contrib
Affects Versions: 1.5.0
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.5.0


{{flink-contrib}} currently doesn't have any clarification or description of 
its purpose, which confuses lots of developers. Add a module description that 
clarifies its purpose.





[jira] [Created] (FLINK-8718) Non-parallel DataStreamSource does not set max parallelism

2018-02-20 Thread Gary Yao (JIRA)
Gary Yao created FLINK-8718:
---

 Summary: Non-parallel DataStreamSource does not set max parallelism
 Key: FLINK-8718
 URL: https://issues.apache.org/jira/browse/FLINK-8718
 Project: Flink
  Issue Type: Bug
  Components: Core
Affects Versions: 1.5.0
Reporter: Gary Yao
 Fix For: 1.5.0


{{org.apache.flink.streaming.api.datastream.DataStreamSource}} does not set 
{{maxParallelism}} to 1 if it is non-parallel.
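
A hypothetical snippet (not from the report) illustrating the behavior:

```
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxParallelismCheck {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // a non-parallel source: its parallelism is forced to 1 ...
        DataStreamSource<Long> source = env.fromElements(1L, 2L, 3L);
        System.out.println(source.getParallelism());                        // 1

        // ... but the max parallelism is not set to 1 along with it
        System.out.println(source.getTransformation().getMaxParallelism()); // still the default, not 1
    }
}
```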







Re: [VOTE] Release flink-shaded 3.0, release candidate #2

2018-02-20 Thread Aljoscha Krettek
+1

 - verified hashes and signatures
 - verified that we didn't add dependencies with incompatible licenses

> On 19. Feb 2018, at 14:40, Chesnay Schepler  wrote:
> 
> +1
> 
> * compiles without error
> * every jar is self-contained
> * all classes are properly relocated
> * jackson services are properly relocated
> * all added dependencies use the Apache License 2.0
> o com.fasterxml.jackson.module:jackson-module-jsonSchema
> + javax.validation:validation-api
> o com.fasterxml.jackson.dataformat:jackson-dataformat-yaml
> + org.yaml:snakeyaml
> * verified compatibility with Flink
> o force-shading can be replaced
> o REST docs are correctly generated
> o flink-sql-client tests ran successfully
> 
> On 19.02.2018 14:13, Chesnay Schepler wrote:
>> Hi everyone,
>> Please review and vote on the release candidate #2 for the version 3.0 of 
>> flink-shaded, as follows:
>> [ ] +1, Approve the release
>> [ ] -1, Do not approve the release (please provide specific comments)
>> The complete staging area is available for your review, which includes:
>> * GitHub release notes [1],
>> * the official Apache source release to be deployed to dist.apache.org [2], 
>> which is signed with the key with 
>> fingerprint 19F2195E1B4816D765A2C324C2EED7B111D464BA [3],
>> * all artifacts to be deployed to the Maven Central Repository [4],
>> * source code tag “release-3.0-rc2” [5].
>> * A complete list of all new commits in release-3.0-rc2, since release-2.0 
>> [6]
>> 
>> 
>> The vote will be open for at least 72 hours. It is adopted by majority 
>> approval, with at least 3 PMC affirmative votes.
>> 
>> Thanks,
>> Chesnay
>> [1] https://github.com/apache/flink-shaded/milestone/3?closed=1
>> [2] http://people.apache.org/~chesnay/flink-shaded-3.0-rc2/ 
>> 
>> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
>> [4] https://repository.apache.org/content/repositories/orgapacheflink-1147/
>> [5] 
>> https://gitbox.apache.org/repos/asf?p=flink-shaded.git;a=commit;h=fd0f8cc5b555bf17e0af7599c0976585da24cec3
>> [6]
>> fd0f8cc5b555bf17e0af7599c0976585da24cec3 (#38) Include missing javax 
>> validation-api dependency
>> 7b4fe16f8a8217771b39495cd8f14af86041c8f5 (#37) Include 
>> jackson-dataformat-yaml and snakeyaml
>> 1233f1bb0e2b9fafa4260603aa130b7eb9995a7a (#35) Bump netty to 4.0.56
>> eb064f6e24e98f3ae91bdfd91fc96a32c821620d (#31) Hard-code jackson-parent 
>> version
>> f7f3eca859b417a1dcb2aa4c53b1bd9076fce43e (#31) Add extended 
>> flink-shadaed-jackson-module-jsonSchema module
>> 36dc4840b3624fd83a0d00295013a933297801b7 (#32) Add flink-shaded-force-shading
>> 2b69aa648a7d5c689c1e3c2709d7e55cb765b0a2 (#28) Bump maven-shade-plugin 
>> version to 3.0.0
>> 0c4e83f87d36060c0a5c6c139e00b9afec4f5d19 Increment version to 3.0
>> 
>> 
>> 
>> 
> 



[jira] [Created] (FLINK-8717) Flink seems to deadlock due to buffer starvation when iterating

2018-02-20 Thread Romain Revol (JIRA)
Romain Revol created FLINK-8717:
---

 Summary: Flink seems to deadlock due to buffer starvation when 
iterating
 Key: FLINK-8717
 URL: https://issues.apache.org/jira/browse/FLINK-8717
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.4.0
 Environment: Windows 10 Pro 64-bit

Core i7-6820HQ @ 2.7 GHz

16GB RAM

Flink 1.4

Scala client

Scala 2.11.7

 
Reporter: Romain Revol
 Attachments: threadDump.txt

We are encountering what looks like a deadlock of Flink in one of our jobs with 
an "iterate" in it. 

I've reduced the job use case to the example in this gist : 
[https://gist.github.com/rrevol/06ddfecd5f5ac7cbc67785b5d3a84dd4]
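
For readers who don't open the gist, the general shape of the job is an iteration 
with a feedback edge, roughly like this sketch (names and values are made up; the 
actual reproduction is in the gist):

```
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IterationSketch {
    public static void main(String[] args) throws Exception {
        final int MAX_LOOP_NB = 100; // made-up value
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> input = env.fromElements(0L);

        IterativeStream<Long> iteration = input.iterate(5000L);
        DataStream<Long> body = iteration.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) {
                return value + 1L;
            }
        });

        // records that have not looped MAX_LOOP_NB times yet are fed back into the loop
        iteration.closeWith(body.filter(v -> v < MAX_LOOP_NB));
        // the rest leaves the loop
        body.filter(v -> v >= MAX_LOOP_NB).print();

        env.execute("iteration sketch");
    }
}
```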

Note that:
 * varying the parallelism affects how quickly the deadlock occurs, but it always occurs
 * varying MAX_LOOP_NB also affects the deadlock: the higher it is, the sooner we encounter 
the deadlock. If MAX_LOOP_NB == 1, there is no deadlock. This suggests that it happens when 
the number of iterations reaches some threshold.

From the [^threadDump.txt], it looks like some starvation over buffer allocation, but I may 
be mistaken since I don't know the Flink internals well.





[jira] [Created] (FLINK-8716) AvroSerializer does not use schema of snapshot

2018-02-20 Thread Arvid Heise (JIRA)
Arvid Heise created FLINK-8716:
--

 Summary: AvroSerializer does not use schema of snapshot
 Key: FLINK-8716
 URL: https://issues.apache.org/jira/browse/FLINK-8716
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.4.0
Reporter: Arvid Heise


The new AvroSerializer stores the schema in the snapshot and uses it to 
validate compatibility.

However, it does not use the schema of the snapshot while reading the data. 
This version will fail for any change of the data layout (so it currently 
supports more or less only renaming).

[https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L265]
needs to use the schema from
[https://github.com/apache/flink/blob/f3a2197a23524048200ae2b4712d6ed833208124/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java#L188]
as the first parameter. Accordingly, a readSchema field needs to be set in 
#ensureCompatibility and relayed in #duplicate. Note that the readSchema is 
passed as the write schema parameter to the DatumReader, as it is the schema 
that was used to write the data.





[jira] [Created] (FLINK-8715) RocksDB does not propagate reconfiguration of serializer to the states

2018-02-20 Thread Arvid Heise (JIRA)
Arvid Heise created FLINK-8715:
--

 Summary: RocksDB does not propagate reconfiguration of serializer to 
the states
 Key: FLINK-8715
 URL: https://issues.apache.org/jira/browse/FLINK-8715
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.3.2
Reporter: Arvid Heise


Any changes to the serializer done in #ensureCompatibility are lost during 
state creation.

In particular, 
[https://github.com/apache/flink/blob/2f7392d77d85823d3db1f1e5a8d4f6c94358d773/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java#L68]
 always uses a fresh copy of the StateDescriptor.

An easy fix is to pass the reconfigured serializer as an additional parameter in 
[https://github.com/apache/flink/blob/2f7392d77d85823d3db1f1e5a8d4f6c94358d773/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L1681],
which can be retrieved through the side output of getColumnFamily:

```
kvStateInformation.get(stateDesc.getName()).f1.getStateSerializer()
```

I encountered it in 1.3.2 but the code in the master seems unchanged (hence the 
pointer into master).





[jira] [Created] (FLINK-8714) Suggest new users to use env.readTextFile method with 2 arguments (using the charset), not to rely on system charset (which varies across environments)

2018-02-20 Thread Michal Klempa (JIRA)
Michal Klempa created FLINK-8714:


 Summary: Suggest new users to use env.readTextFile method with 2 
arguments (using the charset), not to rely on system charset (which varies 
across environments)
 Key: FLINK-8714
 URL: https://issues.apache.org/jira/browse/FLINK-8714
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.4.0
Reporter: Michal Klempa


When a newcomer (like me) goes through the docs, there are several places 
where examples encourage reading the input data using the `env.readTextFile()` 
method.

 

This method variant does not take a second argument for the character set (see 
[https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html#readTextFile-java.lang.String-]).
This version relies (according to the Javadoc) on the system default: "The file will 
be read with the system's default character set."

 

This is also the default behavior in Java, as in the `java.lang.String.getBytes()` 
method, where not supplying a charset means using the system locale, or the one the 
JVM was started with (see 
[https://stackoverflow.com/questions/64038/setting-java-locale-settings]). 
There are two ways to set the locale prior to JVM start: -D arguments or the LC_ALL 
environment variable.

 

Given that this is something a new Flink user may not know about, nor want to spend 
hours tracking down as an environment-related bug (it works on localhost, but in 
production the locale is different), I would kindly suggest a change in the 
documentation: let's migrate the examples to use the two-argument version, 
`readTextFile(filePath, charsetName)`.
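
Concretely, the difference looks like this (a sketch, assuming env is a 
StreamExecutionEnvironment and the path is a placeholder; the DataSet API offers 
the same two-argument variant):

```
// One-argument variant: decodes the file with the system default charset,
// which can differ between a developer laptop and the production cluster.
DataStream<String> implicitCharset = env.readTextFile("file:///path/to/file");

// Two-argument variant: the charset is pinned explicitly, so the job behaves
// the same regardless of the JVM's locale settings.
DataStream<String> explicitCharset = env.readTextFile("file:///path/to/file", "UTF-8");
```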

 

I am open to criticism and suggestions. The readTextFile occurrences I was able 
to grep in the docs are:

```

./dev/datastream_api.md:- `readTextFile(path)` - Reads text files, i.e. files that respect the `TextInputFormat` specification, line-by-line and returns them as Strings.
./dev/datastream_api.md:- `readTextFile(path)` - Reads text files, i.e. files that respect the `TextInputFormat` specification, line-by-line and returns them as Strings.
./dev/libs/storm_compatibility.md:DataStream<String> text = env.readTextFile(localFilePath);
./dev/cluster_execution.md:    DataSet<String> data = env.readTextFile("hdfs://path/to/file");
./dev/batch/index.md:- `readTextFile(path)` / `TextInputFormat` - Reads files line wise and returns them as Strings.
./dev/batch/index.md:- `readTextFileWithValue(path)` / `TextValueInputFormat` - Reads files line wise and returns them as
./dev/batch/index.md:DataSet<String> localLines = env.readTextFile("file:///path/to/my/textfile");
./dev/batch/index.md:DataSet<String> hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile");
./dev/batch/index.md:DataSet<String> logs = env.readTextFile("file:///path/with.nested/files")
./dev/batch/index.md:- `readTextFile(path)` / `TextInputFormat` - Reads files line wise and returns them as Strings.
./dev/batch/index.md:- `readTextFileWithValue(path)` / `TextValueInputFormat` - Reads files line wise and returns them as
./dev/batch/index.md:val localLines = env.readTextFile("file:///path/to/my/textfile")
./dev/batch/index.md:val hdfsLines = env.readTextFile("hdfs://nnHost:nnPort/path/to/my/textfile")
./dev/batch/index.md:env.readTextFile("file:///path/with.nested/files").withParameters(parameters)
./dev/batch/index.md:DataSet<String> lines = env.readTextFile(pathToTextFile);
./dev/batch/index.md:val lines = env.readTextFile(pathToTextFile)
./dev/batch/examples.md:DataSet<String> text = env.readTextFile("/path/to/file");
./dev/batch/examples.md:val text = env.readTextFile("/path/to/file")
./dev/api_concepts.md:DataStream<String> text = env.readTextFile("file:///path/to/file");
./dev/api_concepts.md:val text: DataStream[String] = env.readTextFile("file:///path/to/file")
./dev/local_execution.md:    DataSet<String> data = env.readTextFile("file:///path/to/file");
./ops/deployment/aws.md:env.readTextFile("s3:///");

```





[jira] [Created] (FLINK-8713) FileInputFormatTest.testGetStatisticsMultipleOneFileWithCachedVersion unstable on Travis

2018-02-20 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8713:


 Summary: 
FileInputFormatTest.testGetStatisticsMultipleOneFileWithCachedVersion unstable 
on Travis
 Key: FLINK-8713
 URL: https://issues.apache.org/jira/browse/FLINK-8713
 Project: Flink
  Issue Type: Bug
  Components: Batch Connectors and Input/Output Formats, Tests
Affects Versions: 1.5.0
Reporter: Till Rohrmann
 Fix For: 1.5.0


{{FileInputFormatTest.testGetStatisticsMultipleOneFileWithCachedVersion}} seems 
to be unstable on Travis.

 

https://api.travis-ci.org/v3/job/343717426/log.txt





[jira] [Created] (FLINK-8712) Cannot execute job with multiple slot sharing groups on LocalExecutor

2018-02-20 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8712:


 Summary: Cannot execute job with multiple slot sharing groups on 
LocalExecutor
 Key: FLINK-8712
 URL: https://issues.apache.org/jira/browse/FLINK-8712
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Affects Versions: 1.5.0
Reporter: Till Rohrmann


Currently, it is not possible to run a job with multiple slot sharing groups on 
the LocalExecutor. The problem is that we determine the number of required 
slots simply by looking at the max parallelism of the job, but do not consider 
slot sharing groups.

 
{code:java}
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

final DataStreamSource<Integer> input = env.addSource(new InfinitySource());

final SingleOutputStreamOperator<Integer> different = input.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer integer) throws Exception {
        return integer;
    }
}).slotSharingGroup("Different");

different.print();

// execute program
env.execute("Flink Streaming Java API Skeleton");{code}


