[GitHub] flink issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink wr...

2018-07-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6075
  
@zhangminglei  For your interest - there is a new bucketing sink in the 
Flink master (called `StreamingFileSink`) with a different design: it manages 
all state in Flink state (so it is consistent), comes with a new file system writer 
abstraction to generalize across HDFS, POSIX, and S3 (S3 still WIP), and offers a 
more pluggable way to add encoders, like Parquet and ORC.

As an example, we added a Parquet writer, which is quite straightforward 
and flexible with the new interface.
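
For illustration, using the new sink with a bulk encoder could look roughly like this (a sketch only - `ParquetAvroWriters` comes from the `flink-parquet` module, and the exact builder methods may differ in the current master):

```java
// Sketch: wiring a Parquet bulk writer into the new StreamingFileSink.
// MyRecord is a placeholder POJO; factory/builder names are assumptions.
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class ParquetSinkSketch {

    public static void attachSink(DataStream<MyRecord> stream) {
        StreamingFileSink<MyRecord> sink = StreamingFileSink
                .forBulkFormat(
                        new Path("hdfs:///data/output"),
                        ParquetAvroWriters.forReflectRecord(MyRecord.class))
                .build();

        stream.addSink(sink);
    }

    /** Placeholder record type used only for this sketch. */
    public static class MyRecord {
        public String value;
    }
}
```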

It would be great to get your opinion on that and to see whether your ORC writer 
code also works with it.
If it works out, the new StreamingFileSink could replace the current 
BucketingSink.


---


[GitHub] flink issue #6353: [FLINK-9875][runtime] Add concurrent creation of executio...

2018-07-22 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6353
  
Maybe we can solve this more simply? Avoiding concurrency in the execution 
graph creation keeps the code simpler and more robust - very desirable for an 
already fairly complex construct.

The issue here is the time it takes to create the splits, so we could see 
if we parallelize that, rather than parallelizing the job vertex creation.

I would think in the direction of having a Future that supplies the input 
splits and computing that future in the I/O executor. That would parallelize the 
core problem and leave the ExecutionGraph as it is.
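
A minimal sketch of that direction (names like `createInputSplits` and the executor wiring are placeholders, not the actual ExecutionGraph code):

```java
// Sketch: compute the input splits asynchronously on the I/O executor and keep
// the execution graph construction itself single-threaded.
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

public class InputSplitFutureSketch {

    public static CompletableFuture<List<Object>> computeSplitsAsync(Executor ioExecutor) {
        return CompletableFuture.supplyAsync(InputSplitFutureSketch::createInputSplits, ioExecutor);
    }

    private static List<Object> createInputSplits() {
        // the expensive split computation would happen here
        return java.util.Collections.emptyList();
    }
}
```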


---


[GitHub] flink pull request #:

2018-07-22 Thread StephanEwen
Github user StephanEwen commented on the pull request:


https://github.com/apache/flink/commit/8231b62ff42aae53ca3a7b552980838ccab824ab#commitcomment-29792609
  
In 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationGroup.java
 on line 81:
Looks redundant, I agree.


---


[GitHub] flink issue #6300: [FLINK-9692][Kinesis Connector] Adaptive reads from Kines...

2018-07-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6300
  
Nice feature, thanks a lot.

Merged this into the 1.6 and 1.7 branches


---


[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r202556032
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
 ---
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.Writer;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.ResumableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.Bucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.DateTimeBucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.writers.StringWriter;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Sink that emits its input elements to {@link FileSystem} files within 
buckets. This is
+ * integrated with the checkpointing mechanism to provide exactly once 
semantics.
+ *
+ *
+ * When creating the sink a {@code basePath} must be specified. The 
base directory contains
+ * one directory for every bucket. The bucket directories themselves 
contain several part files,
+ * with at least one for each parallel subtask of the sink which is 
writing data to that bucket.
+ * These part files contain the actual output data.
+ *
+ *
+ * The sink uses a {@link Bucketer} to determine in which bucket 
directory each element should
+ * be written to inside the base directory. The {@code Bucketer} can, for 
example, use time or
+ * a property of the element to determine the bucket directory. The 
default {@code Bucketer} is a
+ * {@link DateTimeBucketer} which will create one new bucket every hour. 
You can specify
+ * a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}.
+ *
+ *
+ * The filenames of the part files contain the part prefix, "part-", 
the parallel subtask index of the sink
+ * and a rolling counter. For example the file {@code "part-1-17"} 
contains the data from
+ * {@code subtask 1} of the sink and is the {@code 17th} bucket created by 
that subtask.
+ * When a part file becomes bigger than the user-specified part size or 
when the part file becomes older
+ * than the user-specified roll over interval the current part file is 
closed, the part counter is increased
+ * and a new part file is created. The batch size defaults to {@code 
384MB}, this can be configured
+ * using {@link #setPartFileSize(long)}. The roll over interval defaults 
to {@code Long.MAX_VALUE} and


---

[GitHub] flink issue #6326: Mutual authentication for internal communication

2018-07-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6326
  
Thanks for the review and for merging. @NicoK already has a PR with an 
end-to-end test for SSL (#6327), which would be great to rebase on top of this change.


---


[GitHub] flink issue #6326: Mutual authentication for internal communication

2018-07-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6326
  
Pushed another commit that rebuilds the generated config docs


---


[GitHub] flink issue #6302: [FLINK-9061][checkpointing] add entropy to s3 path for be...

2018-07-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6302
  
Thanks for this contribution, that's a valuable fix. I have a few thoughts 
and suggestions on how we might still improve the feature a bit:

  - Can we get rid of the `commons-text` dependency? The fewer dependencies, 
the fewer possible problems for users due to dependency clashes. It seems a bit 
heavy to add a new library just to generate one random string - a few lines of 
plain JDK code would do (see the sketch after this list).

  - The feature is configured through additional constructor parameters. I 
am wondering if we may want to move this to the `Configuration`. That would 
allow the "ops side of things" to configure this for a setup (setting entropy 
key and checkpoints directory) without requiring everyone who writes a Flink 
program to be aware of it.

  - If I read the code correctly, the code logs a warning for every file when 
the feature is not activated. That will probably confuse a lot of users and make 
them dig into whether their setup is wrong, when they simply don't use this new 
feature.
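
A sketch of what such a small utility could look like (purely illustrative, plain JDK only):

```java
// Sketch: generate a random alphanumeric entropy string without commons-text.
import java.util.concurrent.ThreadLocalRandom;

public final class EntropyKeySketch {

    private static final String ALPHABET = "0123456789abcdefghijklmnopqrstuvwxyz";

    public static String randomEntropy(int length) {
        final StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(ALPHABET.charAt(ThreadLocalRandom.current().nextInt(ALPHABET.length())));
        }
        return sb.toString();
    }

    private EntropyKeySketch() {}
}
```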



---


[GitHub] flink issue #6321: [FLINK-9829] fix the wrapper classes be compared by symbo...

2018-07-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6321
  
Is the issue addressed here a bug?
If not, and if it seems that the original authors of the code intended to write 
the code as it is now, I would suggest leaving it as it is.


---


[GitHub] flink issue #6321: [FLINK-9829] fix the wrapper classes be compared by symbo...

2018-07-15 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6321
  
This is clearly not a hotfix. As per the pull request template, 
contributors should use hotfixes mainly for typos and JavaDoc updates.


---


[GitHub] flink issue #6326: Mutual authentication for internal communication

2018-07-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6326
  
@EronWright Given our last discussion, I think this should be interesting 
to you. 


---


[GitHub] flink issue #6050: [FLINK-9404][flink-connector-filesystem] Adapter viewfs i...

2018-07-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6050
  
I think this makes sense, merging...


---


[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

2018-07-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5982
  
@sihuazhou I got caught up in some other tasks - I will try to get back to 
this soon. I would like to have this feature in as a base for "search for 
completed checkpoint".


---


[GitHub] flink issue #6116: [FLINK-9498][build] Disable dependency convergence for fl...

2018-07-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6116
  
Is there a workaround for users to disable dependency convergence?

It is actually a problem that we don't control the convergence of some 
dependency that is used with varying versions (Hadoop), but rely on it for our 
own builds to succeed.

In the long run, we may want to work only "Hadoop-free", referencing our 
own shaded Hadoop for compilation only (maybe even moving it to flink-shaded) and 
letting users export the Hadoop classpath for their own Hadoop (rather than 
changing the dependency when building Flink).


---


[GitHub] flink issue #6281: [FLINK-9750] Add new StreamingFileSink with ResumableWrit...

2018-07-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6281
  
@xndai  The umbrella issue is 
[FLINK-9749](https://issues.apache.org/jira/browse/FLINK-9749) and some parts 
on the specifics of block formats (ORC / Parquet) are in 
[FLINK-9753](https://issues.apache.org/jira/browse/FLINK-9753)


---


[GitHub] flink issue #6300: [FLINK-9692][Kinesis Connector] Adaptive reads from Kines...

2018-07-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6300
  
I like the idea of this - should make it much easier to use.
Have you run this code on some heavier data stream to validate that it 
works well in practice?

 If yes, I would be +1 to this



---


[GitHub] flink issue #6324: [FLINK-9424] [security] Set default cipher suite to a mor...

2018-07-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6324
  
Thanks, merging...


---


[GitHub] flink issue #6328: [FLINK-9816][network] add option to configure SSL engine ...

2018-07-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6328
  
Could you rebase this on top of #6326 ? That PR makes sure SSLEngine 
factories are used everywhere, giving a single point to integrate the provider 
such that it is available for all SSL endpoints.


---


[GitHub] flink pull request #5966: [FLINK-9312] [security] Add mutual authentication ...

2018-07-13 Thread StephanEwen
Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/5966


---


[GitHub] flink issue #5966: [FLINK-9312] [security] Add mutual authentication for RPC...

2018-07-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5966
  
This PR is subsumed by #6326


---


[GitHub] flink issue #5966: [FLINK-9312] [security] Add mutual authentication for RPC...

2018-07-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5966
  
@EronWright Just saw this - I have concurrently reworked this PR into #6326 
 which does things more cleanly. I would like to get that PR in for 1.6 (got 
many users asking for this).

I would be happy if you want to build on top of that for the next steps...


---


[GitHub] flink issue #6327: [FLINK-9839][e2e] add a streaming allround test with SSL ...

2018-07-13 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6327
  
Could we save testing time by just activating SSL for existing test jobs?

Please also check the update of the SSL config keys that may come through 
#6326


---


[GitHub] flink pull request #6326: Mutual authentication for internal communication

2018-07-13 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/6326

Mutual authentication for internal communication

**This is based on #6324 - hence the first commit in this PR should be 
excluded from review.**

## What is the purpose of the change

Splits the SSL configuration into **internal communication** *(RPC, data 
transport, blob server)* and **external/REST** communication. Also activates 
mutual authentication for all internal communication.

This continues the security features of Flink as outlined in

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-FLIP-26-SSL-Mutual-Authentication-td22188.html

Most of these changes are straightforward; the most important thing for 
reviewers, in my opinion, would be to check whether the configuration keys 
make sense:

  - One can configure SSL independently for internal and external/REST 
communication
  - This is due to feedback from users that internal communication needs 
stronger protection and needs it from Flink itself, while external communication is 
frequently protected by REST proxies (often running as sidecar processes next to the 
JobManager / Dispatcher)
  - All keystore and password settings now additionally exist in the 
`security.ssl.internal.*` and `security.ssl.rest.*` key namespaces. The 
`security.ssl.*` config keys still exist and are used if the more specific key is 
not set. This is meant both for backwards compatibility and to make it easy to 
use a uniform config across internal/external communication (see the example below).
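
For example, a setup using the more specific key namespaces could look roughly like this in `flink-conf.yaml` (a sketch - the authoritative key names are in the generated config docs of this PR):

```
# internal communication (RPC, data transport, blob server)
security.ssl.internal.enabled: true
security.ssl.internal.keystore: /path/to/internal.keystore
security.ssl.internal.keystore-password: ****
security.ssl.internal.key-password: ****
security.ssl.internal.truststore: /path/to/internal.truststore
security.ssl.internal.truststore-password: ****

# external / REST communication
security.ssl.rest.enabled: true
security.ssl.rest.keystore: /path/to/rest.keystore
security.ssl.rest.keystore-password: ****
security.ssl.rest.truststore: /path/to/rest.truststore
security.ssl.rest.truststore-password: ****
```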

## Brief change log

  - Introduces new config option families: `security.ssl.internal.*` and 
`security.ssl.rest.*`
  - Adds code to fall back to the `security.ssl.*` keys if no internal or 
rest specific options are set
  - Refactors all instantiation of `SSLEngine` and `SSL(Server)Socket` to 
go through factories. That way, the different endpoint instantiations do not 
need to apply configurations themselves.
  - Activates mutual auth for akka/rpc via akka config, plus adds a test
  - Activates mutual auth in the SSL Socket/Engine factories (netty / blob) 
and adds a test

## Verifying this change

  - There are additional unit tests checking that clients with untrusted 
certificates cannot connect.
  - Verified end-to-end by building the code, enabling internal SSL in 
flink-conf.yaml, starting a standalone cluster, and checking the logs and 
Akka URLs for SSL entries

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **yes**
  - If yes, how is the feature documented? Docs coming in a separate PR 
once we have agreement on the config keys


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink client_auth

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6326.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6326


commit 37abf46f6030b6404707958e5a3a3fae0051dbea
Author: Stephan Ewen 
Date:   2018-07-13T07:31:18Z

[FLINK-9424] [security] Set default cipher suite to a more compatible 
cipher suite.

The upgraded ciphers are not yet supported on all platforms and JDK 
versions, making
the getting-started process rough. Instead, we document our recommendation 
to set these
values in the configuration.

This reverts "[FLINK-9310] [security] Update standard cipher suites for 
secure mode"

commit 0d33c8ab2be6502a56d8ea97a72fda5ec8b865c0
Author: Stephan Ewen 
Date:   2018-07-12T09:28:57Z

[FLINK-9313] [security] (part 1) Instantiate all SSLSocket and 
SSLServerSocket through factories.

This removes hostname verification from SSL client sockets.
With client authentication, this is no longer needed and it is not 
compatible with
various container environments.

commit 80cd8bec111bb91943bd691adf80275c79b57ca0
Author: Stephan Ewen 
Date:   2018-05-07T17:44:33Z

[FLINK-9313] [security] (part 3) Activate mutual authentication for RPC/akka

commit 97425b2962861922ac3d7e64fb57400de787966d
Author: Stephan Ewen 
Date:   2018-07-12T15:20:30Z

[FLINK-9313] [security] (part 2) Split SSL configuration into internal 
(rpc, data transport, b


---

[GitHub] flink pull request #6324: [FLINK-9424] [security] Set default cipher suite t...

2018-07-13 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/6324

[FLINK-9424] [security] Set default cipher suite to a more compatible 
cipher suite

## What is the purpose of the change

This reverts "[FLINK-9310] [security] Update standard cipher suites for 
secure mode"

The upgraded ciphers are not yet supported on all platforms and JDK 
versions, making
the getting-started process rough. Instead, we document our recommendation 
to set these
values in the configuration.

## Brief change log

  - Reverts "[FLINK-9310] [security] Update standard cipher suites for 
secure mode"
  - Add docs to manually configure the stronger cipher suites

## Documentation

Adds a section to the SSL docs.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink downgrade_ciphers

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6324.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6324


commit 37abf46f6030b6404707958e5a3a3fae0051dbea
Author: Stephan Ewen 
Date:   2018-07-13T07:31:18Z

[FLINK-9424] [security] Set default cipher suite to a more compatible 
cipher suite.

The upgraded ciphers are not yet supported on all platforms and JDK 
versions, making
the getting-started process rough. Instead, we document our recommendation 
to set these
values in the configuration.

This reverts "[FLINK-9310] [security] Update standard cipher suites for 
secure mode"




---


[GitHub] flink pull request #6304: [FLINK-9801][build] Add missing example dependenci...

2018-07-12 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6304#discussion_r201937327
  
--- Diff: flink-dist/pom.xml ---
@@ -140,6 +140,22 @@ under the License.


 
+   
--- End diff --

There is a section further down in the POM with the comment "dependencies 
for /examples in scope provided". How about moving these entries there? Then 
we get the comment explaining "why it is like that" for free...


---


[GitHub] flink issue #6309: [FLINK-9809] [DataSteam API] Allow setting co-location co...

2018-07-11 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6309
  
Thanks, merging once CI is green...


---


[GitHub] flink pull request #6309: [FLINK-9809] [DataSteam API] Allow setting co-loca...

2018-07-11 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/6309

[FLINK-9809] [DataSteam API] Allow setting co-location constraints on 
StreamTransformations

## What is the purpose of the change

Flink supports co-location constraints for operator placement during 
scheduling. This is used internally for iterations, for example, but is not 
exposed to users.

This PR adds a way for expert users to set these constraints. As a first 
step, it adds them to the `StreamTransformation`, which is not part of the 
public user-facing classes, but a more internal class in the DataStream API. 
That way we make this initially a hidden feature and can gradually expose it 
more prominently when we agree that this would be a good idea.

You can use them as follows:
```java
DataStream stream = ...
stream.getTransformation().setCoLocationGroupKey("group2");
```

## Verifying this change

  - You can test setting the constraints as in the example above.
  - The unit test `StreamGraphCoLocationConstraintTest` adds further tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? A *hidden* feature.
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink colocation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6309.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6309


commit bd58f39290842e9088dfdb997b0704ada7bd79c8
Author: Stephan Ewen 
Date:   2018-07-11T15:48:10Z

[FLINK-9809] [DataSteam API] Allow setting co-location constraints on 
StreamTransformations.

This feature is currently only exposed on StreamTransformations (internal 
API) rather
than in the public API, because it is a hidden expert feature.




---


[GitHub] flink issue #6305: [FLINK-9807][tests] Optimize EventTimeWindowCheckpointITC...

2018-07-11 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6305
  
Please update the description so that reviewers can take a look.


---


[GitHub] flink issue #6275: [FLINK-9776] [runtime] Stop sending periodic interrupts o...

2018-07-10 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6275
  
All right, thanks, merging!


---


[GitHub] flink issue #6290: [Flink-9691] [Kinesis Connector] Attempt to call getRecor...

2018-07-10 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6290
  
Thanks, merging this...


---


[GitHub] flink issue #6286: [FLINK-9754][release] Remove references to scala profiles

2018-07-10 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6286
  
+1


---


[GitHub] flink issue #6285: [FLINK-9768][release] Speed up binary release

2018-07-10 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6285
  
+1


---


[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r201374783
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemResumableWriterTest.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs.local;
+
+import org.apache.flink.core.fs.AbstractResumableWriterTest;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests for the {@link LocalResumableWriter}.
+ */
+public class LocalFileSystemResumableWriterTest extends AbstractResumableWriterTest {
+
+   @ClassRule
+   public static TemporaryFolder tempFolder = new TemporaryFolder();
--- End diff --

It is good practice to make these final


---


[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r201400623
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java
 ---
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.fail;
+
+public abstract class AbstractResumableWriterTest extends TestLogger {
+
+   private static final Random RND = new Random();
+
+   private static final String testData1 = "THIS IS A TEST 1.";
+   private static final String testData2 = "THIS IS A TEST 2.";
+   private static final String testData3 = "THIS IS A TEST 3.";
+
+   private Path basePathForTest;
+
+   private static FileSystem fileSystem;
+
+   public abstract Path getBasePath() throws Exception;
+
+   public abstract FileSystem initializeFileSystem();
+
+   public Path getBasePathForTest() {
+   return basePathForTest;
+   }
+
+   private FileSystem getFileSystem() {
+   if (fileSystem == null) {
+   fileSystem = initializeFileSystem();
+   }
+   return fileSystem;
+   }
+
+   private ResumableWriter getNewFileSystemWriter() throws IOException {
+   return getFileSystem().createRecoverableWriter();
+   }
+
+   @Before
+   public void prepare() throws Exception {
+   basePathForTest = new Path(getBasePath(), randomName());
+   getFileSystem().mkdirs(basePathForTest);
+   }
+
+   @After
+   public void cleanup() throws Exception {
+   getFileSystem().delete(basePathForTest, true);
+   }
+
+   @Test
+   public void testCloseWithNoData() throws Exception {
+   final ResumableWriter writer = getNewFileSystemWriter();
+
+   final Path testDir = getBasePathForTest();
+
+   final Path path = new Path(testDir + File.separator + "part-0");
--- End diff --

Avoid `File.separator` for cross-platform paths; use `new Path(testDir, 
"part-0")` instead.


---


[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r201369555
  
--- Diff: 
flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
 ---
@@ -253,4 +265,39 @@ public CommitRecoverable getRecoverable() {
return recoverable;
}
}
+
+   /**
+* Called when resuming execution after a failure and waits until the 
lease
+* of the file we are resuming is free.
+*
+* The lease of the file we are resuming writing/committing to may 
still
+* belong to the process that failed previously and whose state we are
+* recovering.
+*
+* @param path The path to the file we want to resume writing to.
+*/
+   private boolean waitUntilLeaseIsRevoked(final Path path) throws 
IOException {
+   Preconditions.checkState(fs instanceof DistributedFileSystem);
+
+   final DistributedFileSystem dfs = (DistributedFileSystem) fs;
+   dfs.recoverLease(path);
+   boolean isclosed = dfs.isFileClosed(path);
+
+   final StopWatch sw = new StopWatch();
--- End diff --

Let's use `Deadline` from the Flink utils instead to reduce external 
dependencies.
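
A minimal sketch of what that could look like (assuming Flink's `org.apache.flink.api.common.time.Deadline`; `dfs`, `path`, and the timeout come from the class under review):

```java
// Sketch: replace the commons-lang StopWatch with Flink's Deadline utility.
import java.io.IOException;
import java.time.Duration;

import org.apache.flink.api.common.time.Deadline;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

class LeaseWaitSketch {

    static boolean waitUntilLeaseIsRevoked(DistributedFileSystem dfs, Path path, long timeoutMillis)
            throws IOException, InterruptedException {

        dfs.recoverLease(path);
        final Deadline deadline = Deadline.fromNow(Duration.ofMillis(timeoutMillis));

        boolean isClosed = dfs.isFileClosed(path);
        while (!isClosed && deadline.hasTimeLeft()) {
            Thread.sleep(500L);
            isClosed = dfs.isFileClosed(path);
        }
        return isClosed;
    }
}
```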


---


[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r201374633
  
--- Diff: 
flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
 ---
@@ -253,4 +265,39 @@ public CommitRecoverable getRecoverable() {
return recoverable;
}
}
+
+   /**
+* Called when resuming execution after a failure and waits until the 
lease
+* of the file we are resuming is free.
+*
+* The lease of the file we are resuming writing/committing to may 
still
+* belong to the process that failed previously and whose state we are
+* recovering.
+*
+* @param path The path to the file we want to resume writing to.
+*/
+   private boolean waitUntilLeaseIsRevoked(final Path path) throws 
IOException {
+   Preconditions.checkState(fs instanceof DistributedFileSystem);
+
+   final DistributedFileSystem dfs = (DistributedFileSystem) fs;
+   dfs.recoverLease(path);
+   boolean isclosed = dfs.isFileClosed(path);
+
+   final StopWatch sw = new StopWatch();
+   sw.start();
+
+   while (!isclosed) {
+   if (sw.getTime() > LEASE_TIMEOUT) {
+   break;
+   }
+
+   try {
--- End diff --

This basically locks the thread in for up to LEASE_TIMEOUT, making it 
impossible to cancel. I would either propagate the InterruptedException, or 
rethrow it as an IOException indicating that recovering the lease failed 
(which works here because this is a single-purpose util function).
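
For illustration, rethrowing the interrupt as an `IOException` could look roughly like this (a sketch; `path` refers to the file being recovered in the reviewed code):

```java
// Sketch: fail the lease recovery instead of swallowing the interrupt, so that
// cancellation does not have to wait for the full lease timeout.
private static void sleepOrFail(org.apache.hadoop.fs.Path path) throws java.io.IOException {
    try {
        Thread.sleep(500L);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new java.io.IOException(
                "Interrupted while waiting for the lease of " + path + " to be revoked.", e);
    }
}
```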


---


[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r201375290
  
--- Diff: 
flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
 ---
@@ -130,7 +130,7 @@ public static boolean hasHDFSDelegationToken() throws 
Exception {
 */
public static boolean isMinHadoopVersion(int major, int minor) throws 
FlinkRuntimeException {
String versionString = VersionInfo.getVersion();
-   String[] versionParts = versionString.split(".");
+   String[] versionParts = versionString.split("\\.");
--- End diff --

Good catch!


---


[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r201401033
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/core/fs/AbstractResumableWriterTest.java
 ---
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.fail;
+
+public abstract class AbstractResumableWriterTest extends TestLogger {
+
+   private static final Random RND = new Random();
+
+   private static final String testData1 = "THIS IS A TEST 1.";
+   private static final String testData2 = "THIS IS A TEST 2.";
+   private static final String testData3 = "THIS IS A TEST 3.";
+
+   private Path basePathForTest;
+
+   private static FileSystem fileSystem;
+
+   public abstract Path getBasePath() throws Exception;
+
+   public abstract FileSystem initializeFileSystem();
+
+   public Path getBasePathForTest() {
+   return basePathForTest;
+   }
+
+   private FileSystem getFileSystem() {
+   if (fileSystem == null) {
+   fileSystem = initializeFileSystem();
+   }
+   return fileSystem;
+   }
+
+   private ResumableWriter getNewFileSystemWriter() throws IOException {
+   return getFileSystem().createRecoverableWriter();
+   }
+
+   @Before
+   public void prepare() throws Exception {
+   basePathForTest = new Path(getBasePath(), randomName());
+   getFileSystem().mkdirs(basePathForTest);
+   }
+
+   @After
+   public void cleanup() throws Exception {
+   getFileSystem().delete(basePathForTest, true);
+   }
+
+   @Test
+   public void testCloseWithNoData() throws Exception {
+   final ResumableWriter writer = getNewFileSystemWriter();
+
+   final Path testDir = getBasePathForTest();
+
+   final Path path = new Path(testDir + File.separator + "part-0");
+
+   final RecoverableFsDataOutputStream stream = writer.open(path);
+   for (Map.Entry fileContents : 
getFileContentByPath(testDir).entrySet()) {
+   
Assert.assertTrue(fileContents.getKey().getName().startsWith(".part-0.inprogress."));
+   Assert.assertTrue(fileContents.getValue().isEmpty());
+   }
+
+   stream.closeForCommit().commit();
+
+   for (Map.Entry fileContents : 
getFileContentByPath(testDir).entrySet()) {
+   Assert.assertEquals("part-0", 
fileContents.getKey().getName());
+   Assert.assertTrue(fileContents.getValue().isEmpty());
+   }
+   }
+
+   @Test
+   public void testCommitAfterNormalClose() throws Exception {
+   final ResumableWriter writer = getNewFileSystemWriter();
+
+   final Path testDir = getBasePathForTest();
+
+   final Path path = new Path(testDir.getPath() + File.separator + 
"part-0");
+
+   try (final RecoverableFsDataOutputStream stream = 
writer.open(path)) {
+   
stream.write(testData1.getBytes(Charset.forName("UTF-8")));
--- End diff --

Use `StandardCharsets.UTF_8` instead of "UTF-8".


---


[GitHub] flink pull request #6281: [FLINK-9750] Add new StreamingFileSink with Resuma...

2018-07-10 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6281#discussion_r201375170
  
--- Diff: 
flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
 ---
@@ -41,6 +44,8 @@
 @Internal
 class HadoopRecoverableFsDataOutputStream extends 
RecoverableFsDataOutputStream {
 
+   private static final long LEASE_TIMEOUT = 10L;
--- End diff --

Can we add digit grouping chars here? Makes it easier to read...
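
For example (value purely illustrative): `private static final long LEASE_TIMEOUT = 100_000L;`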


---


[GitHub] flink pull request #6275: [FLINK-9776] [runtime] Stop sending periodic inter...

2018-07-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6275#discussion_r200814568
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---
@@ -1563,7 +1573,7 @@ public void run() {
 
// log stack trace where the executing thread 
is stuck and
// interrupt the running thread periodically 
while it is still alive
-   while (executerThread.isAlive()) {
+   while (task.shouldInterruptOnCancel() && 
executerThread.isAlive()) {
--- End diff --

True, this is no 100% guarantee that interrupts do not come. That would 
need an atomic "interrupt if flag is set" call, but I don't know if that is 
possible in Java without introducing a locked code block, which I wanted to 
avoid.

It may also not be necessary. I think the variant here is already strictly 
better than the current state, which is correct already. The current state 
mainly suffers from shutdowns "looking rough" due to interruptions.

This change should fix the majority of that, because in the vast majority of 
shutdowns, the thread exits before the first of the "repeated interrupts". The 
thread only experiences the initial interrupt.

In some sense, only clearing the initial interrupt flag would probably help 
> 90% of the cases already. This solves a few more % of the cases by guarding 
the repeated interrupts.


---


[GitHub] flink issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink wr...

2018-07-07 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6075
  
The dependencies of the `flink-connector-filesystem` are not well set up 
already, having an Avro dependency and a Hadoop dependency. I agree that it 
would be good to not introduce yet more dependencies, or at the very least, 
make them optional dependencies.

FYI: In the re-work of the BucketingSink under 
[FLINK-9749](https://issues.apache.org/jira/browse/FLINK-9749), we want to 
introduce a `BulkEncoder` interface in `flink-core` that is used by the 
BucketingSink, and can be implemented by classes in `flink-orc` (and later a 
new `flink-parquet` project). That way we cleanly separate dependencies of the 
projects.


---


[GitHub] flink issue #6235: [FLINK-9377] [core] Remove serializers from checkpointed ...

2018-07-06 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6235
  
Took a look at this WIP and I think it goes in a good direction.

My most important comment is that I think it would help to move the 
"ensureCompatibility" into the config snapshot, for the following reasons:
  - Clearer separation of concerns: the serializer has only the 
serialization logic and the creation of the snapshot. Compatibility is not the 
serializer's immediate concern.
  - The current design means that the serializer mutates internal fields on 
reconfiguration. That is error prone. Consider a serializer like the 
KryoSerializer, where the configuration is not fully deep copied on duplication 
(makes sense, it is read only during serialization). Mutating that 
configuration would change the behavior of other previously duplicated 
serializers as well, which is unexpected.

Thoughts for improvements with lower priority:

  - Can we avoid setting the ClassLoader into a field in the config 
snapshot, and then deserializing? I think such solutions are fragile and should 
be avoided if possible. The ClassLoader is not really part of the snapshots 
state, it is an auxiliary to the deserialization and should, as such, be passed 
as an argument to the read method: read(in, classloader). This means that the 
TypeSerializerConfigSnapshot would not implement `IOReadableWritable`, but that 
might not be a problem (see the sketch below).

  - Is the TypeSerializerConfigSnapshotSerializationProxy needed? It seems 
like an unnecessary indirection given that it is used exclusively in the 
TypeSerializerSerializationUtil and could be a static util method instead.
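
A hypothetical sketch of the ClassLoader-as-argument variant (illustrative only, not the actual Flink API):

```java
// Sketch: the ClassLoader is passed to read(...) instead of being stored in a
// field of the config snapshot.
import java.io.IOException;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

public abstract class ConfigSnapshotSketch {

    public abstract void write(DataOutputView out) throws IOException;

    public abstract void read(DataInputView in, ClassLoader userCodeClassLoader) throws IOException;
}
```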



---


[GitHub] flink pull request #6275: [FLINK-9776] [runtime] Stop sending periodic inter...

2018-07-06 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/6275

[FLINK-9776] [runtime] Stop sending periodic interrupts once executing 
thread leaves user function / operator code.

## What is the purpose of the change

Upon cancellation, the task thread is periodically interrupted. This helps 
to pull the thread out of blocking operations in the user code.

However, once the thread leaves the user code, the repeated interrupts may 
interfere with the shutdown cleanup logic, causing confusing exceptions.

This PR changes the behavior to stop sending the periodic interrupts once 
the thread leaves the user code.

## Brief change log

  - `AbstractInvokable` maintains a flag whether interrupts should be sent.
  - `StreamTask` sets to not receive interrupts after coming out of the 
user code

## Verifying this change

This change is a trivial rework that currently only avoids the throwing and 
catching of InterruptedExceptions that may cause noise in the logs.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **not applicable**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink stop_interrupts

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6275.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6275


commit 73d31551574f3c18e4cbc079681ed93f9ec2ef34
Author: Stephan Ewen 
Date:   2018-07-06T11:34:27Z

[FLINK-9776] [runtime] Stop sending periodic interrupts once executing 
thread leaves user function / operator code.




---


[GitHub] flink issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink wr...

2018-07-04 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6075
  
If it is not a problem that this can lead to poor compression when 
checkpoint intervals are short, we could think about merging this as a 
temporary solution until 
[FLINK-9749](https://issues.apache.org/jira/browse/FLINK-9749) is fully 
developed.

I would suggest making the ORC and table dependencies optional, though, 
so that not every user of the BucketingSink needs to have these dependencies.


---


[GitHub] flink issue #6075: [FLINK-9407] [hdfs connector] Support orc rolling sink wr...

2018-07-04 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6075
  
Hi @zhangminglei 
Sorry for the late response - I thought about this solution quite a bit and 
came to the conclusion that we may need to do a bit more for efficient results:

Please take a look at 
[FLINK-9749](https://issues.apache.org/jira/browse/FLINK-9749) and the subtask 
[FLINK-9753](https://issues.apache.org/jira/browse/FLINK-9753).
The description outlines why I believe the simple approach suggested here 
may not be enough (it will frequently result in badly compressed ORC/Parquet files).

We have already started this effort to completely redesign the 
BucketingSink. The initial work-in-progress looks quite promising.


---


[GitHub] flink issue #6118: [FLINK-9525][filesystem] Add missing `META-INF/services/*...

2018-06-06 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6118
  
Where in the loading of the factories do you see the error?
My suspicion is still an issue with inverted class loading.

To confirm, can we check the following?
  - Are you running this on Flink 1.4.0 or 1.4.1?
  - Do you have `hadoop-common` in the job's jar, or in the `flink/lib` 
folder?
  - Does the error go away if you set "classloader.resolve-order: 
parent-first" in the config?


---


[GitHub] flink issue #6118: [FLINK-9525][filesystem] Add missing `META-INF/services/*...

2018-06-05 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6118
  
I think this is a misdiagnosis; this should not be merged.

Flink does not need a file system factory for Hadoop; it uses Hadoop's FS 
as the general fallback for all schemes that it does not have a factory for.

The exception in the linked JIRA comes from Hadoop's own File System 
discovery. There is probably some casting error or so (may be due to inverted 
classloading).


---


[GitHub] flink issue #6103: [FLINK-9413] [distributed coordination] Tasks can fail wi...

2018-06-05 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6103
  
That all depends on why the failure happens in the first place. It seems to 
happen if the receiver of a channel starts much faster than the sender. The 
longest part of the deployment is library distribution, which happens only 
once. After one failure / recovery, the library should be cached and the next 
attempt to start the task should be very fast.


---


[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units

2018-06-05 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5448
  
There is no problem reusing the old keys if their default unit was bytes, 
because `MemorySize.parse(...)` interprets a plain number as bytes if there is 
no unit attached to it.

I did not realize that you had already switched the config keys - in that case 
we need to support the old keys for backwards compatibility as well. Also, we need 
to update all the shell scripts (`config.sh`, `jobmanager.sh`, `taskmanager.sh` 
and so on) to be consistent with the new config keys.




---


[GitHub] flink issue #6108: [FLINK-9367] [Streaming Connectors] Allow to do truncate(...

2018-06-04 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6108
  
@kl0u please link the issue once you created it.

This is currently very early, in design discussions between @kl0u, me, and 
@aljoscha.
The main points about the rewrite are
  - Use Flink's FileSystem abstraction, to make it work with shaded S3, 
swift, etc and give an easier interface
  - Add a proper "ChunkedWriter" abstraction to the FileSystems, which 
handles write, persist-on-checkpoint, and rollback-to-checkpoint in a 
FileSystem-specific way - for example using truncate()/append() on POSIX and 
HDFS, and MultiPartUploads on S3 (see the sketch after this list)
  - Add support for gathering large chunks across checkpoints, to make 
Parquet and ORC compression more effective.
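
A hypothetical sketch of such a "ChunkedWriter" abstraction (all names are illustrative; this is not the actual Flink API):

```java
// Sketch: a writer that can persist its position on checkpoint and roll back
// to it on recovery, implemented per file system (truncate/append on POSIX and
// HDFS, multi-part uploads on S3, ...).
import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;

public interface ChunkedWriter extends Closeable {

    /** Appends bytes to the current chunk. */
    void write(byte[] bytes, int off, int len) throws IOException;

    /** Persists everything written so far; called on checkpoint. */
    ResumeHandle persist() throws IOException;

    /** Resumes writing from a previously persisted point, discarding anything written after it. */
    void resumeFrom(ResumeHandle handle) throws IOException;

    /** Opaque, serializable handle that goes into checkpoint state. */
    interface ResumeHandle extends Serializable {}
}
```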


---


[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units

2018-06-04 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5448
  
Okay, after taking a look, I think we need to add a few changes:

  - We need to add an additional `MemoryUnit.parse()` method that takes a 
"default" unit, so that the old heap sizes are parsed as MB if nothing else is 
specified (see the sketch below).

  - We should either change the return value of `getMebiBytes()` to `int` 
or have a `getMebiBytesAsInt()` method that uses a 
`MathUtils.checkedDownCast()` to avoid unnoticed overflow errors.

Open question: As we are changing the value type of the heap size config 
options, should we deprecate the current config keys and introduce new ones 
(like `jobmanager.heap-size`)?
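
A minimal sketch of the first point above (illustrative only; the final method name and signature are up to the PR, and `MemorySize.parse(String)` is assumed as in the earlier discussion):

```java
// Sketch: parse a memory size with a default unit, so that a legacy value like
// "1024" keeps meaning "1024 MB" for the old heap-size keys.
import org.apache.flink.configuration.MemorySize;

public final class MemorySizeParseSketch {

    public static MemorySize parseWithDefaultUnit(String text, String defaultUnitSuffix) {
        final String trimmed = text.trim();
        // if the value is a bare number, append the legacy default unit before parsing
        return trimmed.chars().allMatch(Character::isDigit)
                ? MemorySize.parse(trimmed + defaultUnitSuffix)
                : MemorySize.parse(trimmed);
    }

    private MemorySizeParseSketch() {}
}
```

For the legacy heap-size keys the default unit would then be megabytes, e.g. `parseWithDefaultUnit("1024", "m")`.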



---


[GitHub] flink issue #6094: [FLINK-9468][filesystem] fix calculate outputLimit incorr...

2018-06-04 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6094
  
Good catch. Apparently the tests don't check all combinations of settings, 
otherwise this would have shown up.

Merging this...


---


[GitHub] flink issue #6103: [FLINK-9413] [distributed coordination] Tasks can fail wi...

2018-06-04 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6103
  
How critical is it to change this setting? 
I would assume this should be caught by the regular recovery, so unless 
this occurs very often and thus leads to confusing exceptions in the log, 
should we maybe leave it as it is?


---


[GitHub] flink issue #6108: [FLINK-9367] [Streaming Connectors] Allow to do truncate(...

2018-06-04 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6108
  
Do you have a Hadoop version older than  2.7?

We are currently attempting to rewrite the Bucketing Sink completely, for 
better compatibility with S3 and better support for Parquet / ORC. We were 
actually thinking of dropping support for file systems that do not support 
`truncate()` - so getting this feedback is good.


---


[GitHub] flink issue #6111: [FLINK-9504]Change the log level of checkpoint duration t...

2018-06-04 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6111
  
I have often wondered about this as well. There sure are a lot of log lines.

  - The state backends themselves should probably log on `debug` level; 
otherwise there is a line per operator in the task
  - The task itself should still log on `info` level, so that one has 
information to analyze checkpoints from the logs


---


[GitHub] flink issue #5448: [FLINK-6469] Configure Memory Sizes with units

2018-05-30 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5448
  
Will try and take a look at this soon... Sorry for the delay.

What I would consider very important is that users who don't change their 
configuration do not suddenly get different behavior. Meaning that in the 
absence of a unit, we do not always interpret the value as bytes, but as 
whatever the config value was measured in before (such as MB, ...).


---


[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

2018-05-28 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5982
  
I think we need to have a special output stream type 
(`AtomicCreatingFsDataOutputStream` or similar) as the return type of 
`FileSystem.createAtomic()`. Otherwise, how can a user actually create a file? 
The `closeAndPublish()` method is not part of any API class.
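
For illustration, such a stream type could look roughly like this (the class and method names are taken from the comment above; the rest is a sketch, not actual API):

```java
// Sketch: proposed return type for FileSystem.createAtomic() - the file only
// becomes visible under its final path once closeAndPublish() is called.
import java.io.IOException;

import org.apache.flink.core.fs.FSDataOutputStream;

public abstract class AtomicCreatingFsDataOutputStream extends FSDataOutputStream {

    /** Atomically publishes the file under its target path and closes the stream. */
    public abstract void closeAndPublish() throws IOException;
}
```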


---


[GitHub] flink issue #6073: [FLINK-9091] [table] Fix dependency convergence for flink...

2018-05-25 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6073
  
DependencyManagement in the root pom should cover dependencies that we 
share and expose across modules.

Enforcing convergence with one module (`flink-table`) for a dependency that 
is hidden (shaded) should be handled in that module and not clutter the 
project-global configuration.


---


[GitHub] flink pull request #6073: [FLINK-9091] [table] Fix dependency convergence fo...

2018-05-25 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6073#discussion_r190855869
  
--- Diff: flink-libraries/flink-table/pom.xml ---
@@ -146,6 +147,12 @@ under the License.

flink-test-utils_${scala.binary.version}
${project.version}
test
+   
--- End diff --

Why does `flink-test-utils` even have a Guava dependency? Can we fix that?


---


[GitHub] flink issue #5843: [FLINK-8744][docs] Generate "Common Option" section

2018-05-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5843
  
One could add an int to the annotation as a "priority / position" and sort 
by that.
Not sure how nice that is, but it could be okay.

I think it was nice for users that the most common options (the ones you 
need first) were at the top of the list.

Out of curiosity, what happens to options like `env.java.opts`, which are 
shell-script-only options but very common?




---


[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...

2018-05-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6043
  
@cjolif I agree, let's do something here.

@tzulitai what do you think about using the switch to REST to make 
a clean cut and start a new connector project (without a dependency on 
`flink-connector-elasticsearch-base`)? As an experiment, we could try how much 
code we would actually need to copy into the new project.

@aljoscha and @patricklucas I remember you also had some thoughts on the 
elasticsearch connectors.

I am +1 for seeing if we can drop ElasticSearch 1.x and 2.x support, but 
that should be a separate thread.


---


[GitHub] flink issue #5963: [FLINK-9305][s3] also register flink-s3-fs-hadoop's facto...

2018-05-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5963
  
Merging this...


---


[GitHub] flink issue #5954: [FLINK-9276] Improve error message when TaskManager fails

2018-05-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5954
  
This looks fine from my side.

Would like to get a +1 from @tillrohrmann before merging this...


---


[GitHub] flink issue #5891: [FLINK-9088][nifi-connector][build] Bump nifi-site-to-sit...

2018-05-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5891
  
All new dependencies are okay with the Apache License (not for shading, 
though!)

Because nothing gets shaded here and we left dependency management to the 
user, this upgrade is okay.

Merging this...


---


[GitHub] flink issue #5891: [FLINK-9088][nifi-connector][build] Bump nifi-site-to-sit...

2018-05-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5891
  
To get such dependency upgrade PRs merged, it is useful to always add the 
relevant parts of the dependency tree after the update:

Old:
```
[INFO] org.apache.flink:flink-connector-nifi_2.11:jar:1.6-SNAPSHOT
[INFO] +- org.apache.nifi:nifi-site-to-site-client:jar:0.6.1:compile
[INFO] |  +- org.apache.nifi:nifi-api:jar:0.6.1:compile
[INFO] |  +- org.apache.nifi:nifi-utils:jar:0.6.1:compile
[INFO] |  +- org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile
[INFO] |  |  \- org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile
[INFO] |  \- org.apache.nifi:nifi-client-dto:jar:0.6.1:compile
[INFO] | \- com.wordnik:swagger-annotations:jar:1.5.3-M1:compile
```

New:
```
[INFO] +- org.apache.nifi:nifi-site-to-site-client:jar:1.6.0:compile
[INFO] |  +- org.apache.nifi:nifi-api:jar:1.6.0:compile
[INFO] |  +- org.apache.nifi:nifi-framework-api:jar:1.6.0:compile
[INFO] |  +- org.apache.nifi:nifi-utils:jar:1.6.0:compile
[INFO] |  +- org.apache.nifi:nifi-security-utils:jar:1.6.0:compile
[INFO] |  |  +- commons-codec:commons-codec:jar:1.10:compile
[INFO] |  |  +- org.bouncycastle:bcprov-jdk15on:jar:1.59:compile
[INFO] |  |  +- org.bouncycastle:bcpkix-jdk15on:jar:1.59:compile
[INFO] |  |  \- org.apache.nifi:nifi-properties:jar:1.6.0:compile
[INFO] |  +- org.apache.commons:commons-lang3:jar:3.3.2:compile
[INFO] |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.9.4:compile
[INFO] |  |  +- 
com.fasterxml.jackson.core:jackson-annotations:jar:2.9.0:compile
[INFO] |  |  \- com.fasterxml.jackson.core:jackson-core:jar:2.9.4:compile
[INFO] |  +- org.apache.nifi:nifi-client-dto:jar:1.6.0:compile
[INFO] |  |  +- io.swagger:swagger-annotations:jar:1.5.16:compile
[INFO] |  |  \- 
org.apache.nifi.registry:nifi-registry-data-model:jar:0.1.0:compile
[INFO] |  | +- javax.validation:validation-api:jar:2.0.0.Final:compile
[INFO] |  | \- javax.ws.rs:javax.ws.rs-api:jar:2.1:compile
[INFO] |  +- org.apache.httpcomponents:httpclient:jar:4.5.3:compile
[INFO] |  |  +- org.apache.httpcomponents:httpcore:jar:4.4.6:compile
[INFO] |  |  \- commons-logging:commons-logging:jar:1.1.3:compile
[INFO] |  \- org.apache.httpcomponents:httpasyncclient:jar:4.1.3:compile
[INFO] | \- org.apache.httpcomponents:httpcore-nio:jar:4.4.6:compile
```


---


[GitHub] flink issue #5948: [FLINK-9286][docs] Update classloading docs

2018-05-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5948
  
Thanks. Will merge this, possibly add one more sentence in the process...


---


[GitHub] flink issue #5843: [FLINK-8744][docs] Generate "Common Option" section

2018-05-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5843
  
Looks pretty good.

Is there a way we can "sort" the common options? Something like
  - host:port (for standalone setups)
  - java memory
  - default parallelism / slots
  - fault tolerance
  - HA
  - security


---


[GitHub] flink issue #5857: [FLINK-9187][METRICS] add prometheus pushgateway reporter

2018-05-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5857
  
I think this is a nice addition. Basically turns the prometheus "pull 
model" into a "push model".

@lamber-ken Can you check that the new dependency is correctly shaded?

@zentol Do you think this is good in the same project as the prometheus 
reporter, or should this be in a separate project?


---


[GitHub] flink issue #6015: [FLINK-8933] Avoid calling Class#newInstance(part 1)

2018-05-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6015
  
Similar as to #6016: Should we close this PR until we have consensus 
whether we want to change this?

Especially the performance implications in methods/classes on the "hot code 
paths" makes this a tricky change...


---


[GitHub] flink issue #6016: [FLINK-8933] Avoid calling Class#newInstance(part 2)

2018-05-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6016
  
Should we close this PR until we have consensus whether we want to change 
this?

Especially the performance implications in methods/classes on the "hot code 
paths" makes this a tricky change...


---


[GitHub] flink issue #6043: [FLINK-7386] evolve RequestIndexer API to make it working...

2018-05-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6043
  
As a high-level comment, I think we may want to start making the ElasticSearch 
connector projects independent of each other.

We previously tried to share code between versions, which has made things 
clumsy both from the dependency management and the implementation (api bridges, 
etc.). It also couples different versions, such that a bug fix in one connector 
version often affects other connectors as well.

The REST-based client may be a good time to start clean, create a new 
project with no dependencies on the base connector project, and copy the 
necessary code over.

What do you think?


---


[GitHub] flink pull request #6066: [FLINK-9428] [checkpointing] Allow operators to fl...

2018-05-24 Thread StephanEwen
Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/6066


---


[GitHub] flink issue #6066: [FLINK-9428] [checkpointing] Allow operators to flush dat...

2018-05-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6066
  
Closed in e1d1234477c731fe3f398c7f3f12123f73764242


---


[GitHub] flink issue #6066: [FLINK-9428] [checkpointing] Allow operators to flush dat...

2018-05-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6066
  
Thanks for the review.

Addressing the comments and merging this...


---


[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1

2018-05-24 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6071#discussion_r190554149
  
--- Diff: pom.xml ---
@@ -308,7 +308,7 @@ under the License.
errors.
 
[1] https://github.com/netty/netty/issues/3704 
-->
--- End diff --

Looks like you can remove this comment now.


---


[GitHub] flink pull request #6066: [FLINK-9428] [checkpointing] Allow operators to fl...

2018-05-24 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6066#discussion_r190543815
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
+import 
org.apache.flink.streaming.runtime.operators.StreamOperatorChainingTest;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
+import 
org.apache.flink.streaming.runtime.tasks.OperatorChain.BroadcastingOutputCollector;
+import 
org.apache.flink.streaming.runtime.tasks.OperatorChain.ChainingOutput;
+import 
org.apache.flink.streaming.runtime.tasks.OperatorChain.WatermarkGaugeExposingOutput;
+
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+/**
+ * This class test the {@link OperatorChain}.
+ *
+ * It takes a different (simpler) approach at testing the operator 
chain than
+ * {@link StreamOperatorChainingTest}.
+ */
+public class OperatorChainTest {
+
+   @Test
+   public void testPrepareCheckpointPreBarrier() throws Exception {
+   final AtomicInteger intRef = new AtomicInteger();
+
+   final OneInputStreamOperator<String, String> one = new 
ValidatingOperator(intRef, 0);
+   final OneInputStreamOperator<String, String> two = new 
ValidatingOperator(intRef, 1);
+   final OneInputStreamOperator<String, String> three = new 
ValidatingOperator(intRef, 2);
+
+   final OperatorChain chain = setupOperatorChain(one, two, 
three);
+   
chain.prepareSnapshotPreBarrier(ValidatingOperator.CHECKPOINT_ID);
+
+   assertEquals(3, intRef.get());
+   }
+
+   // 

+   //  Operator Chain Setup Utils
+   // 

+
+   @SafeVarargs
+   private static <T, OP extends StreamOperator> OperatorChain<T, OP> 
setupOperatorChain(
--- End diff --

This is a lot of mocking, but the alternative approach ties itself not only 
to the internals of the `OperatorChain`, but also to the stream config 
specifics. In that sense, I would like to keep this, because it at least ties 
itself to the details of one component, rather than two components.

This hints that OperatorChain could really use some refactoring.


---


[GitHub] flink pull request #6066: [FLINK-9428] [checkpointing] Allow operators to fl...

2018-05-23 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/6066

[FLINK-9428] [checkpointing] Allow operators to flush data on checkpoint 
pre-barrier

## What is the purpose of the change

Some operators maintain some small transient state that may be inefficient 
to checkpoint, especially when it would need to be checkpointed also in a 
re-scalable way.
An example is opportunistic pre-aggregation operators, which have small 
pre-aggregation state that is frequently flushed downstream.

Rather than persisting that state in a checkpoint, it can make sense to 
flush the data downstream upon a checkpoint, to let it be part of the 
downstream operator's state.

This feature is sensitive, because flushing state has a clear implication 
on the downstream operator's checkpoint alignment. However, used with care, and 
with the new back-pressure-based checkpoint alignment, this feature can be very 
useful.

Because it is sensitive, this PR makes this an internal feature (accessible 
to operators) and does NOT expose it in the public API.
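
For illustration only, a rough sketch of how an operator might use this hook, assuming a simple opportunistic per-key sum (not code from this PR):

```
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Sketch only: an opportunistic pre-aggregation operator that keeps a small
 * in-memory sum per key and flushes it downstream instead of checkpointing it.
 */
class PreAggregatingSumOperator extends AbstractStreamOperator<Tuple2<String, Long>>
		implements OneInputStreamOperator<Tuple2<String, Long>, Tuple2<String, Long>> {

	private final Map<String, Long> buffer = new HashMap<>();

	@Override
	public void processElement(StreamRecord<Tuple2<String, Long>> element) {
		Tuple2<String, Long> value = element.getValue();
		buffer.merge(value.f0, value.f1, Long::sum);
	}

	@Override
	public void prepareSnapshotPreBarrier(long checkpointId) {
		// flush the transient pre-aggregation state downstream before the barrier,
		// so it becomes part of the downstream operator's state instead of this one's
		for (Map.Entry<String, Long> entry : buffer.entrySet()) {
			output.collect(new StreamRecord<>(Tuple2.of(entry.getKey(), entry.getValue())));
		}
		buffer.clear();
	}
}
```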

## Brief change log

  - Adds the `prepareSnapshotPreBarrier(long checkpointId)` call to 
`(Abstract)StreamOperator`, with an empty default implementation.
  - Adds a call on `OperatorChain` to call this in front-to-back order on 
the operators.

## Verifying this change

  - This change does not yet alter any behavior, it adds only a plug point 
for future stream operators.
  - The `OperatorChainTest` Unit Test validates that the call happens, and 
that operators are called in the right order.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **yes**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **not applicable**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink pre_barrier

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6066.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6066






---


[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-23 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5995
  
Looks good, thanks!

+1 to merge this


---


[GitHub] flink issue #5996: [FLINK-9343] [Example] Add Async Example with External Re...

2018-05-23 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5996
  
Async I/O works best with asynchronous clients. For a synchronous client, you 
need a thread pool or something else to concurrently fire off requests.
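
For illustration, a minimal sketch of that pattern, assuming a placeholder blocking client; the size of the thread pool bounds how many requests are in flight per subtask:

```
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

/**
 * Sketch only: wrapping a blocking client in an AsyncFunction by handing the
 * blocking call to a thread pool, so several requests run concurrently.
 * BlockingHttpClient is a placeholder for whatever synchronous client is used.
 */
class PoolBackedAsyncLookup extends RichAsyncFunction<String, String> {

	private transient ExecutorService pool;
	private transient BlockingHttpClient client;

	@Override
	public void open(Configuration parameters) {
		pool = Executors.newFixedThreadPool(20); // bounds the number of in-flight requests
		client = new BlockingHttpClient();
	}

	@Override
	public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
		CompletableFuture
			.supplyAsync(() -> client.get(key), pool) // blocking call runs off the task thread
			.whenComplete((response, error) -> {
				if (error != null) {
					resultFuture.completeExceptionally(error);
				} else {
					resultFuture.complete(Collections.singleton(response));
				}
			});
	}

	@Override
	public void close() {
		if (pool != null) {
			pool.shutdown();
		}
	}

	/** Placeholder for a synchronous client. */
	static class BlockingHttpClient {
		String get(String key) {
			return "response-for-" + key;
		}
	}
}
```

Such a function would then be hooked in via `AsyncDataStream.unorderedWait(...)`, which is where the concurrency beyond the operator parallelism comes from.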


---


[GitHub] flink issue #5996: [FLINK-9343] [Example] Add Async Example with External Re...

2018-05-23 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5996
  
I had a quick look at the code example, and it looks like it might not 
actually do asynchronous I/O.
It dispatches a synchronous HTTP request on a direct executor 
(`onComplete`s in a direct executor as well), which should result in purely 
synchronous operations.

Have you verified that this does in fact send off multiple requests 
concurrently (beyond the parallelism)? 


---


[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-18 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5995
  
Added a few more comment, most importantly around exception wrapping.
Otherwise, looking good...


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-18 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r189197766
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   private static final long serialVersionUID = -6766681879020862312L;
+
+   /** Class to deserialize to. */
+   private final Class<T> recordClazz;
+
+   /** Schema in case of GenericRecord for serialization purpose. */
+   private final String schemaString;
+
+   /** Reader that deserializes byte array into a record. */
+   private transient GenericDatumReader<T> datumReader;
+
+   /** Input stream to read message from. */
+   private transient MutableByteArrayInputStream inputStream;
+
+   /** Avro decoder that decodes binary data. */
+   private transient Decoder decoder;
+
+   /** Avro schema for the reader. */
+   private transient Schema reader;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be one of:
+*{@link org.apache.avro.specific.SpecificRecord},
+*{@link org.apache.avro.generic.GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if 
recordClazz is
+*{@link GenericRecord}
+*/
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.recordClazz = recordClazz;
+   this.inputStream = new MutableByteArrayInputStream();
+   this.decoder = DecoderFactory.get().binaryDecoder(inputStream, 
null);
+   this.reader = reader;
+   if (reader != null) {
+   this.schemaString = reader.toString();
+   } else {
+   this.schemaString = null;
+   }
+   }
+
+   /**
+* Creates {@link AvroDeserializationSchema} that produces {@link 
GenericRecord} using provided schema.
+*
+* @param schema schema of produced records
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema) {
+   return new AvroDeserializationSchema<>(GenericRecord.class, 
schema);
+   }
+
+   /**
+* Creates {@link AvroDeserializationSchema} that produces classes that 
were generated from avro schema.
+*
+* @param tClass class of record to be produced
+* @return deserialized record
+*/
+   public static  AvroDeserializationSchema 
forSpecific(Class 

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-18 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r189197633
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   private static final long serialVersionUID = -6766681879020862312L;
+
+   /** Class to deserialize to. */
+   private final Class<T> recordClazz;
+
+   /** Schema in case of GenericRecord for serialization purpose. */
+   private final String schemaString;
+
+   /** Reader that deserializes byte array into a record. */
+   private transient GenericDatumReader<T> datumReader;
+
+   /** Input stream to read message from. */
+   private transient MutableByteArrayInputStream inputStream;
+
+   /** Avro decoder that decodes binary data. */
+   private transient Decoder decoder;
+
+   /** Avro schema for the reader. */
+   private transient Schema reader;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be one of:
+*{@link org.apache.avro.specific.SpecificRecord},
+*{@link org.apache.avro.generic.GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if 
recordClazz is
+*{@link GenericRecord}
+*/
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.recordClazz = recordClazz;
+   this.inputStream = new MutableByteArrayInputStream();
+   this.decoder = DecoderFactory.get().binaryDecoder(inputStream, 
null);
+   this.reader = reader;
+   if (reader != null) {
+   this.schemaString = reader.toString();
+   } else {
+   this.schemaString = null;
+   }
+   }
+
+   /**
+* Creates {@link AvroDeserializationSchema} that produces {@link 
GenericRecord} using provided schema.
+*
+* @param schema schema of produced records
+* @return deserialized record in form of {@link GenericRecord}
+*/
+   public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema) {
--- End diff --

Minor comment: I found it helps code structure/readability to move 
static/factory methods either to the top or the bottom of the class.


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-18 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r189195014
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using 
{@link SchemaCoder}.
+ *
+ * @param <T> type of record it produces
+ */
+public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
+
+   private static final long serialVersionUID = -884738268437806062L;
+
+   /** Provider for schema coder. Used for initializing in each task. */
+   private final SchemaCoder.SchemaCoderProvider schemaCoderProvider;
+
+   /** Coder used for reading schema from incoming stream. */
+   private transient SchemaCoder schemaCoder;
+
+   /**
+* Creates Avro deserialization schema that reads schema from input 
stream using provided {@link SchemaCoder}.
+*
+* @param recordClazz class to which deserialize. Should be 
either
+*{@link SpecificRecord} or {@link 
GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided 
if recordClazz is
+*{@link GenericRecord}
+* @param schemaCoderProvider schema provider that allows instantiation 
of {@link SchemaCoder} that will be used for
+*schema reading
+*/
+   protected RegistryAvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader,
+   SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+   super(recordClazz, reader);
+   this.schemaCoderProvider = schemaCoderProvider;
+   this.schemaCoder = schemaCoderProvider.get();
+   }
+
+   @Override
+   public T deserialize(byte[] message) {
+   // read record
+   try {
+   checkAvroInitialized();
+   getInputStream().setBuffer(message);
+   Schema writerSchema = 
schemaCoder.readSchema(getInputStream());
+   Schema readerSchema = getReaderSchema();
+
+   GenericDatumReader<T> datumReader = getDatumReader();
+
+   datumReader.setSchema(writerSchema);
+   datumReader.setExpected(readerSchema);
+
+   return datumReader.read(null, getDecoder());
+   } catch (Exception e) {
+   throw new RuntimeException("Failed to deserialize 
Row.", e);
--- End diff --

The method `deserialize()` can throw an IOException. That got dropped from 
the signature, and exceptions are now wrapped into a RuntimeException. That 
makes exception stack traces more complicated, and hides the fact that "there 
is a possible exceptional case to handle" from the consumers of that code.

I think this makes a general rule: whenever using `RuntimeException`, 
take a step back and look at the exception structure and signatures, and see if 
something is not declared well.


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-18 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r189185420
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   private static final long serialVersionUID = -6766681879020862312L;
+
+   /** Class to deserialize to. */
+   private final Class<T> recordClazz;
+
+   /** Schema in case of GenericRecord for serialization purpose. */
+   private final String schemaString;
+
+   /** Reader that deserializes byte array into a record. */
+   private transient GenericDatumReader<T> datumReader;
+
+   /** Input stream to read message from. */
+   private transient MutableByteArrayInputStream inputStream;
+
+   /** Avro decoder that decodes binary data. */
+   private transient Decoder decoder;
+
+   /** Avro schema for the reader. */
+   private transient Schema reader;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be one of:
+*{@link org.apache.avro.specific.SpecificRecord},
+*{@link org.apache.avro.generic.GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if 
recordClazz is
+*{@link GenericRecord}
+*/
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.recordClazz = recordClazz;
+   this.inputStream = new MutableByteArrayInputStream();
--- End diff --

I would skip the initialization in the constructor, if you have the 
initialization in `checkAvroInitialized()`. Simpler, and avoids having two 
places that do the initialization which have to be kept in sync.


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-18 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r189185186
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   /**
+* Class to deserialize to.
+*/
+   private Class recordClazz;
+
+   private String schemaString = null;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private transient GenericDatumReader datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private transient MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data.
+*/
+   private transient Decoder decoder;
+
+   /**
+* Avro schema for the reader.
+*/
+   private transient Schema reader;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be one of:
+*{@link org.apache.avro.specific.SpecificRecord},
+*{@link org.apache.avro.generic.GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if 
recordClazz is
+*{@link GenericRecord}
+*/
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.recordClazz = recordClazz;
+   this.inputStream = new MutableByteArrayInputStream();
+   this.decoder = DecoderFactory.get().binaryDecoder(inputStream, 
null);
+   this.r

[GitHub] flink pull request #:

2018-05-17 Thread StephanEwen
Github user StephanEwen commented on the pull request:


https://github.com/apache/flink/commit/c8fa8d025684c2225824c54a7285bbfdec7cfddc#commitcomment-29021995
  
In 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java:
In 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
 on line 25:
true, did not see that, and checkstyle is off for that class.
The class is removed now in a follow-up commit anyways, so the problem is 
fixed now.


---


[GitHub] flink issue #6039: [hotfix] [docs] Add Release Notes for Flink 1.5.

2018-05-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6039
  
Very nice, very helpful for users.

+1 to merge this


---


[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...

2018-05-17 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5979
  
Okay, looks really good from my side.

Would be good if @StefanRRichter or @azagrebin to double check the change, 
otherwise good to go.


---


[GitHub] flink issue #5966: [FLINK-9312] [security] Add mutual authentication for RPC...

2018-05-16 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5966
  
I would move ahead with this PR as follows:

  - Separate internal and external SSL config
  - Activate SSL client auth for akka, netty, and blob server (pure 
internal communication)

Let's discuss external connectivity on FLIP-26


---


[GitHub] flink issue #6001: [FLINK-9299] ProcessWindowFunction documentation Java exa...

2018-05-16 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/6001
  
Looks good, thanks, merging this...


---


[GitHub] flink issue #5979: [FLINK-9070][state]improve the performance of RocksDBMapS...

2018-05-16 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5979
  
Could you share some micro-benchmark numbers?
When we change something that we know works well to something new, would be 
good to understand what benefits we are talking about.


---


[GitHub] flink issue #5970: [FLINK-9292] [core] Remove TypeInfoParser (part 2)

2018-05-16 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5970
  
Merging this...


---


[GitHub] flink pull request #5977: [FLINK-9295][kafka] Fix transactional.id collision...

2018-05-16 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5977#discussion_r188725700
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
 ---
@@ -71,6 +71,17 @@
@PublicEvolving
MetricGroup getMetricGroup();
 
+   /**
+* Returned value is guaranteed to be unique between operators within 
the same job and to be
+* stable and the same across job submissions.
+*
+* This operation is currently only supported in Streaming 
(DataStream) contexts.
+*
+* @return String representation of the operator's unique id.
+*/
+   @PublicEvolving
+   String getOperatorUniqueID();
--- End diff --

I am still much in favor of not exposing this in the RuntimeContext:

  - Having the state accesses in the RuntimeContext was a necessity of that 
moment, because there was no `initializeState()` and it is crucial to be 
exposed to users.
  - This operatorID is not crucial to be exposed to users, hence a very 
different case to me.

  - It is super easy to expose it later, it is much harder (even if marked 
as PublicEvolving) to hide it later. For a quick move, not exposing an addition 
publicly should always be the default choice, also beyond this specific case 
here.



---


[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-16 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5995
  
I would actually keep the package name for now. It makes sense, because the 
connection to the registry is avro-specific at the moment...


---


[GitHub] flink pull request #6001: [FLINK-9299] ProcessWindowFunction documentation J...

2018-05-16 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6001#discussion_r188529928
  
--- Diff: docs/dev/stream/operators/windows.md ---
@@ -797,7 +797,7 @@ DataStream input = ...;
 
 input
   .keyBy()
-  .timeWindow()
+  .timeWindow()
--- End diff --

How about using "duration" instead of "time size". I think "time size" is 
not a commonly used term...


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188340240
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+   /**
+* Class to deserialize to.
+*/
+   private Class recordClazz;
+
+   private String schemaString = null;
+
+   /**
+* Reader that deserializes byte array into a record.
+*/
+   private transient GenericDatumReader datumReader;
+
+   /**
+* Input stream to read message from.
+*/
+   private transient MutableByteArrayInputStream inputStream;
+
+   /**
+* Avro decoder that decodes binary data.
+*/
+   private transient Decoder decoder;
+
+   /**
+* Avro schema for the reader.
+*/
+   private transient Schema reader;
+
+   /**
+* Creates a Avro deserialization schema.
+*
+* @param recordClazz class to which deserialize. Should be one of:
+*{@link org.apache.avro.specific.SpecificRecord},
+*{@link org.apache.avro.generic.GenericRecord}.
+* @param reader  reader's Avro schema. Should be provided if 
recordClazz is
+*{@link GenericRecord}
+*/
+   AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+   Preconditions.checkNotNull(recordClazz, "Avro record class must 
not be null.");
+   this.recordClazz = recordClazz;
+   this.inputStream = new MutableByteArrayInputStream();
+   this.decoder = DecoderFactory.get().binaryDecoder(inputStream, 
null);
+   this.r

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188316643
  
--- Diff: flink-formats/flink-avro-confluent-registry/pom.xml ---
@@ -0,0 +1,94 @@
+
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+   
+   flink-formats
+   org.apache.flink
+   1.6-SNAPSHOT
+   
+   4.0.0
+
+   flink-avro-confluent-registry
+
+   
+   
+   confluent
+   http://packages.confluent.io/maven/
+   
+   
+
+   
+   
+   io.confluent
+   kafka-schema-registry-client
+   3.3.1
+   
+   
+   org.apache.avro
+   avro
+   
+   
+   org.slf4j
+   slf4j-log4j12
+   
+   
+   
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   provided
+   
+   
+   org.apache.flink
+   flink-avro
+   ${project.version}
+   
+   
+
+   
+   
+   
+   org.apache.maven.plugins
+   maven-shade-plugin
+   
+   
+   shade-flink
+   package
+   
+   shade
+   
+   
+   
+   
+   
com.fasterxml.jackson.core
+   
org.apache.flink.shaded.com.fasterxml.jackson.core
--- End diff --

We may need to qualify this further by this project, because we have that 
relocation pattern already in other places, for potentially different jackson 
versions.


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188325819
  
--- Diff: 
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import org.apache.avro.Schema;
+
+import java.io.DataInputStream;
+import java.io.InputStream;
+
+/**
+ * Reads schema using Confluent Schema Registry protocol.
+ */
+public class ConfluentSchemaRegistryCoder implements SchemaCoder {
+
+   private final SchemaRegistryClient schemaRegistryClient;
+
+   /**
+* Creates {@link SchemaCoder} that uses provided {@link 
SchemaRegistryClient} to connect to
+* schema registry.
+*
+* @param schemaRegistryClient client to connect schema registry
+*/
+   public ConfluentSchemaRegistryCoder(SchemaRegistryClient 
schemaRegistryClient) {
+   this.schemaRegistryClient = schemaRegistryClient;
+   }
+
+   @Override
+   public Schema readSchema(InputStream in) throws Exception {
+   DataInputStream dataInputStream = new DataInputStream(in);
+
+   if (dataInputStream.readByte() != 0) {
+   throw new RuntimeException("Unknown data format. Magic 
number does not match");
--- End diff --

RuntimeExceptions (unchecked exceptions) are usually used to indicate 
programming errors, or (as a workaround) if the scope does not allow throwing 
any exception.

This here is a case for a checked exception, in my opinion, like an 
`IOException`, `FlinkException`, etc.
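
For illustration, the same check rewritten with a checked exception (sketch only):

```
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

/**
 * Sketch only: the magic-byte check rewritten to throw a checked IOException,
 * so callers see that reading the schema can fail on malformed input.
 */
final class MagicByteCheck {

	static void verifyMagicByte(InputStream in) throws IOException {
		DataInputStream dataInputStream = new DataInputStream(in);
		if (dataInputStream.readByte() != 0) {
			throw new IOException("Unknown data format. Magic number does not match");
		}
	}
}
```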


---


[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-15 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5995#discussion_r188328236
  
--- Diff: 
flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoderTest.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ConfluentSchemaRegistryCoder}.
+ */
+public class ConfluentSchemaRegistryCoderTest {
--- End diff --

Do we want to test the magic byte verification?
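
A possible test could look roughly like this (sketch only, assuming the coder signals the mismatch with an exception):

```
import java.io.ByteArrayInputStream;

import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import org.junit.Test;

/**
 * Sketch only: a test for the magic-byte verification, assuming the coder
 * reports the mismatch with an exception.
 */
public class MagicByteVerificationTest {

	@Test(expected = Exception.class)
	public void readSchemaFailsOnWrongMagicByte() throws Exception {
		ConfluentSchemaRegistryCoder coder =
				new ConfluentSchemaRegistryCoder(new MockSchemaRegistryClient());

		// first byte is not the expected 0 magic byte
		byte[] wrongMagic = new byte[] {1, 0, 0, 0, 0};
		coder.readSchema(new ByteArrayInputStream(wrongMagic));
	}
}
```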


---

