[GitHub] flink issue #4562: [FLINK-7402] Fix ineffective null check in NettyMessage#w...

2017-08-23 Thread zjureel
Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4562
  
@NicoK Thank you for your review. I have reverted the imports changed by 
the IDE, thanks




[jira] [Commented] (FLINK-7402) Ineffective null check in NettyMessage#write()

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16139508#comment-16139508
 ] 

ASF GitHub Bot commented on FLINK-7402:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4562
  
@NicoK Thank you for your review. I have reverted the imports changed by 
the IDE, thanks


> Ineffective null check in NettyMessage#write()
> --
>
> Key: FLINK-7402
> URL: https://issues.apache.org/jira/browse/FLINK-7402
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Reporter: Ted Yu
>Priority: Minor
>
> Here is the null check in finally block:
> {code}
>   finally {
> if (buffer != null) {
>   buffer.recycle();
> }
> {code}
> But buffer has been dereferenced in the try block without guard.
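
A hedged sketch of the safer shape (helper names such as {{allocateBuffer()}} are hypothetical; the point is that the dereference and the null check must agree):

{code}
Buffer buffer = null;
try {
	buffer = allocateBuffer();     // may throw before buffer is assigned
	buffer.writeTo(target);        // dereference only after assignment succeeds
}
finally {
	if (buffer != null) {          // the guard is now meaningful: buffer can
		buffer.recycle();          // really be null if allocation failed
	}
}
{code}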





[jira] [Commented] (FLINK-7129) Dynamically changing patterns

2017-08-23 Thread Dian Fu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16139494#comment-16139494
 ] 

Dian Fu commented on FLINK-7129:


I think we can first create a keyed stream of events {{ds1}} and a non-keyed 
stream of patterns {{ds2}}, and then connect them with {{ds1.connect(ds2)}} to 
collocate them. Regarding {{TwoInputStreamOperator}}, I didn't find any 
limitation preventing it from supporting one keyed and one non-keyed input.
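
A minimal sketch of that idea ({{Event}}, {{Pattern}} and {{Alert}} are hypothetical types; {{connect}}, {{broadcast}} and {{CoFlatMapFunction}} are regular DataStream API):

{code}
DataStream<Event> events = env.addSource(new EventSource()).keyBy(e -> e.getKey());
DataStream<Pattern> patterns = env.addSource(new PatternSource()).broadcast();

events.connect(patterns).flatMap(new CoFlatMapFunction<Event, Pattern, Alert>() {
	private Pattern currentPattern;   // the most recently injected pattern

	@Override
	public void flatMap1(Event event, Collector<Alert> out) {
		// evaluate the event against currentPattern and emit matches
	}

	@Override
	public void flatMap2(Pattern pattern, Collector<Alert> out) {
		currentPattern = pattern;     // dynamically swap in the new pattern
	}
});
{code}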



> Dynamically changing patterns
> -
>
> Key: FLINK-7129
> URL: https://issues.apache.org/jira/browse/FLINK-7129
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>
> An umbrella task for introducing a mechanism for injecting patterns through 
> a coStream





[jira] [Comment Edited] (FLINK-4500) Cassandra sink can lose messages

2017-08-23 Thread Michael Fong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16139448#comment-16139448
 ] 

Michael Fong edited comment on FLINK-4500 at 8/24/17 1:36 AM:
--

I plan to design the first part of the requirement on top of the existing implementation by:
1. implementing CheckpointedFunction;
2. leveraging an AtomicInteger updatesPending to make sure all in-flight records 
are flushed to the sink when a checkpoint is performed;
3. as with other data connectors, adding a flag to turn honoring this promise 
during checkpointing on or off.

For reference, the second part was addressed by FLINK-5101 in the earlier 
comments. 


was (Author: mcfongtw):
I plan to design the first part of the requirement on top of the existing implementation by:
1. implementing CheckpointedFunction;
2. leveraging an AtomicInteger updatesPending to make sure all in-flight records 
are flushed to the sink when a checkpoint is performed;
3. as with other data connectors, adding a flag to turn honoring this promise 
during checkpointing on or off.

For reference, the second part was already addressed by FLINK-5101. 

> Cassandra sink can lose messages
> 
>
> Key: FLINK-4500
> URL: https://issues.apache.org/jira/browse/FLINK-4500
> Project: Flink
>  Issue Type: Bug
>  Components: Cassandra Connector
>Affects Versions: 1.1.0
>Reporter: Elias Levy
>Assignee: Michael Fong
>
> The problem is the same as I pointed out with the Kafka producer sink 
> (FLINK-4027).  The CassandraTupleSink's send() and CassandraPojoSink's send() 
> both send data asynchronously to Cassandra and record whether an error occurs 
> via a future callback.  But CassandraSinkBase does not implement 
> Checkpointed, so it can't stop a checkpoint from happening even though there are 
> Cassandra queries in flight from the checkpoint that may fail.  If they do 
> fail, they would subsequently not be replayed when the job recovered, and 
> would thus be lost.
> In addition, 
> CassandraSinkBase's close should check whether there is a pending exception 
> and throw it, rather than silently close.  It should also wait for any 
> pending async queries to complete and check their status before closing.





[jira] [Commented] (FLINK-4500) Cassandra sink can lose messages

2017-08-23 Thread Michael Fong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16139448#comment-16139448
 ] 

Michael Fong commented on FLINK-4500:
-

I plan to design the first part of the requirement on top of the existing implementation by:
1. implementing CheckpointedFunction;
2. leveraging an AtomicInteger updatesPending to make sure all in-flight records 
are flushed to the sink when a checkpoint is performed;
3. as with other data connectors, adding a flag to turn honoring this promise 
during checkpointing on or off. A rough sketch of steps 1 and 2 follows below.

For reference, the second part was already addressed by FLINK-5101. 
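
A rough sketch of how steps 1 and 2 could fit together (names such as {{executeAsync}} are illustrative, not the final implementation):

{code}
public abstract class CassandraSinkBase<IN> extends RichSinkFunction<IN>
		implements CheckpointedFunction {

	private final AtomicInteger updatesPending = new AtomicInteger(0);
	private final AtomicReference<Throwable> asyncError = new AtomicReference<>();

	protected void send(IN value) {
		updatesPending.incrementAndGet();
		executeAsync(value, error -> {                 // hypothetical async send
			if (error != null) {
				asyncError.compareAndSet(null, error);
			}
			synchronized (updatesPending) {
				if (updatesPending.decrementAndGet() == 0) {
					updatesPending.notifyAll();
				}
			}
		});
	}

	@Override
	public void snapshotState(FunctionSnapshotContext context) throws Exception {
		// block the checkpoint until every in-flight write is acknowledged
		synchronized (updatesPending) {
			while (updatesPending.get() > 0) {
				updatesPending.wait();
			}
		}
		Throwable error = asyncError.get();
		if (error != null) {
			throw new IOException("Error while writing to Cassandra", error);
		}
	}

	@Override
	public void initializeState(FunctionInitializationContext context) {
		// nothing to restore: failed in-flight writes are replayed by the source
	}

	protected abstract void executeAsync(IN value, Consumer<Throwable> callback);
}
{code}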

> Cassandra sink can lose messages
> 
>
> Key: FLINK-4500
> URL: https://issues.apache.org/jira/browse/FLINK-4500
> Project: Flink
>  Issue Type: Bug
>  Components: Cassandra Connector
>Affects Versions: 1.1.0
>Reporter: Elias Levy
>Assignee: Michael Fong
>
> The problem is the same as I pointed out with the Kafka producer sink 
> (FLINK-4027).  The CassandraTupleSink's send() and CassandraPojoSink's send() 
> both send data asynchronously to Cassandra and record whether an error occurs 
> via a future callback.  But CassandraSinkBase does not implement 
> Checkpointed, so it can't stop a checkpoint from happening even though there are 
> Cassandra queries in flight from the checkpoint that may fail.  If they do 
> fail, they would subsequently not be replayed when the job recovered, and 
> would thus be lost.
> In addition, 
> CassandraSinkBase's close should check whether there is a pending exception 
> and throw it, rather than silently close.  It should also wait for any 
> pending async queries to complete and check their status before closing.





[GitHub] flink pull request #4576: [FLINK-7398] Table API operators/UDFs must not sto...

2017-08-23 Thread haohui
GitHub user haohui opened a pull request:

https://github.com/apache/flink/pull/4576

[FLINK-7398] Table API operators/UDFs must not store Logger.

This PR moves all the usages of {{slf4j.Logger}} to a common trait so that 
we can ensure the Logger object is not serialized over the wire.

It turns out that adding a rule in checkstyle is more involved, so it is 
deferred to another PR.
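
The same idea expressed in Java for illustration (the PR itself adds a Scala trait): keep the `Logger` out of the serialized closure by making it transient and re-creating it lazily on each host.

```java
public abstract class LoggingBase implements java.io.Serializable {

	// transient: never shipped with the UDF, so the client's slf4j backend
	// (e.g. Logback) does not need to exist on the cluster
	private transient org.slf4j.Logger log;

	protected org.slf4j.Logger log() {
		if (log == null) {
			log = org.slf4j.LoggerFactory.getLogger(getClass());
		}
		return log;
	}
}
```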

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haohui/flink FLINK-7398

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4576.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4576


commit 4cd80bb2ff167045f150adccc5bf4a04f70be5e9
Author: Haohui Mai 
Date:   2017-08-23T21:13:07Z

[FLINK-7398] Table API operators/UDFs must not store Logger.






[jira] [Commented] (FLINK-7398) Table API operators/UDFs must not store Logger

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16139145#comment-16139145
 ] 

ASF GitHub Bot commented on FLINK-7398:
---

GitHub user haohui opened a pull request:

https://github.com/apache/flink/pull/4576

[FLINK-7398] Table API operators/UDFs must not store Logger.

This PR moves all the usages of {{slf4j.Logger}} to a common trait so that 
we can ensure the Logger object is not serialized over the wire.

It turns out that adding a rule in checkstyle is more involved, so it is 
deferred to another PR.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haohui/flink FLINK-7398

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4576.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4576


commit 4cd80bb2ff167045f150adccc5bf4a04f70be5e9
Author: Haohui Mai 
Date:   2017-08-23T21:13:07Z

[FLINK-7398] Table API operators/UDFs must not store Logger.




> Table API operators/UDFs must not store Logger
> --
>
> Key: FLINK-7398
> URL: https://issues.apache.org/jira/browse/FLINK-7398
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Aljoscha Krettek
>Assignee: Haohui Mai
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
> Attachments: Example.png
>
>
> Table API operators and UDFs store a reference to the (slf4j) {{Logger}} in 
> an instance field (c.f. 
> https://github.com/apache/flink/blob/f37988c19adc30d324cde83c54f2fa5d36efb9e7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala#L39).
>  This means that the {{Logger}} will be serialised with the UDF and sent to 
> the cluster. This in itself does not sound right and leads to problems when 
> the slf4j configuration on the Client is different from the cluster 
> environment.
> This is an example of a user running into that problem: 
> https://lists.apache.org/thread.html/01dd44007c0122d60c3fd2b2bb04fd2d6d2114bcff1e34d1d2079522@%3Cuser.flink.apache.org%3E.
>  Here, they have Logback on the client but the Logback classes are not 
> available on the cluster and so deserialisation of the UDFs fails with a 
> {{ClassNotFoundException}}.
> This is a rough list of the involved classes:
> {code}
> src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala:43: 
>  private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/catalog/ExternalTableSourceUtil.scala:45:
>   private val LOG: Logger = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/codegen/calls/BuiltInMethods.scala:28:  
> val LOG10 = Types.lookupMethod(classOf[Math], "log10", classOf[Double])
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala:62:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala:59:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala:51:
>   private val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/plan/nodes/FlinkConventions.scala:28:  
> val LOGICAL: Convention = new Convention.Impl("LOGICAL", 
> classOf[FlinkLogicalRel])
> src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala:38:  val 
> LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
> src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala:36:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala:43:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetFinalAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala:44:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggregatePreProcessor.scala:55:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala:66:
>   val LOG = LoggerFactory.getLogger(this.getClass)
> src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala:56:
>   val LOG = 

[jira] [Commented] (FLINK-3924) Remove protobuf shading from Kinesis connector

2017-08-23 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16139012#comment-16139012
 ] 

Aljoscha Krettek commented on FLINK-3924:
-

[~rmetzger] Is this still an issue? 

> Remove protobuf shading from Kinesis connector
> --
>
> Key: FLINK-3924
> URL: https://issues.apache.org/jira/browse/FLINK-3924
> Project: Flink
>  Issue Type: Task
>  Components: Kinesis Connector, Streaming Connectors
>Reporter: Robert Metzger
>
> The Kinesis connector is currently creating a fat jar with a custom protobuf 
> version (2.6.1), relocated into a different package.
> We need to build the fat jar to change the protobuf calls from the original 
> protobuf to the relocated one.
> Because Kinesis is licensed under the Amazon Software License (which is not 
> entirely compatible with the ASL 2.0), I don't want to deploy Kinesis connector 
> binaries to Maven Central with the releases. These binaries would contain code 
> from Amazon. It would be more than just linking to an (optional) dependency.





[jira] [Updated] (FLINK-5883) Re-adding the Exception-thrown code for ListKeyGroupedIterator when the iterator is requested the second time

2017-08-23 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-5883:

Fix Version/s: 1.3.0

> Re-adding the Exception-thrown code for ListKeyGroupedIterator when the 
> iterator is requested the second time
> -
>
> Key: FLINK-5883
> URL: https://issues.apache.org/jira/browse/FLINK-5883
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API
>Reporter: lincoln.lee
>Assignee: lincoln.lee
> Fix For: 1.3.0
>
>
> Originally (within FLINK-1023), ListKeyGroupedIterator ensured that a 
> TraversableOnceException was thrown when the iterator was requested a second 
> time. This behavior was unexpectedly lost in FLINK-1110, so it should be added back. 
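
The guard in question is roughly of this shape (a sketch based on the issue description, not the exact Flink source):

{code}
public Iterator<V> iterator() {
	if (iteratorAvailable) {
		iteratorAvailable = false;        // hand the values out only once
		return valuesIterator;
	} else {
		// second request: fail loudly instead of silently returning nothing
		throw new TraversableOnceException();
	}
}
{code}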





[jira] [Closed] (FLINK-5883) Re-adding the Exception-thrown code for ListKeyGroupedIterator when the iterator is requested the second time

2017-08-23 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-5883.
---
Resolution: Fixed

This was actually fixed a while back in d5f2647.

> Re-adding the Exception-thrown code for ListKeyGroupedIterator when the 
> iterator is requested the second time
> -
>
> Key: FLINK-5883
> URL: https://issues.apache.org/jira/browse/FLINK-5883
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API
>Reporter: lincoln.lee
>Assignee: lincoln.lee
> Fix For: 1.3.0
>
>
> Originally (within FLINK-1023), ListKeyGroupedIterator ensured that a 
> TraversableOnceException was thrown when the iterator was requested a second 
> time. This behavior was unexpectedly lost in FLINK-1110, so it should be added back. 





[jira] [Updated] (FLINK-7432) Unclosed HighAvailabilityServices instance in QueryableStateClient

2017-08-23 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-7432:
--
Description: 
{code}
  public QueryableStateClient(Configuration config) throws Exception {
this(config, HighAvailabilityServicesUtils.createHighAvailabilityServices(
config, Executors.directExecutor(), 
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION));
{code}

The HighAvailabilityServices instance is only used for calling 
getJobManagerLeaderRetriever().

The instance should be closed upon leaving the QueryableStateClient constructor.

  was:
{code}
  public QueryableStateClient(Configuration config) throws Exception {
this(config, HighAvailabilityServicesUtils.createHighAvailabilityServices(
config, Executors.directExecutor(), 
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION));
{code}
The HighAvailabilityServices instance is only used for calling 
getJobManagerLeaderRetriever().

The instance should be closed upon leaving the QueryableStateClient constructor.


> Unclosed HighAvailabilityServices instance in QueryableStateClient
> --
>
> Key: FLINK-7432
> URL: https://issues.apache.org/jira/browse/FLINK-7432
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   public QueryableStateClient(Configuration config) throws Exception {
> this(config, HighAvailabilityServicesUtils.createHighAvailabilityServices(
> config, Executors.directExecutor(), 
> HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION));
> {code}
> The HighAvailabilityServices instance is only used for calling 
> getJobManagerLeaderRetriever().
> The instance should be closed upon leaving the QueryableStateClient constructor.
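
For illustration, one hedged way the constructor could avoid leaking the instance it creates (assuming {{HighAvailabilityServices}} exposes a {{close()}} teardown method; error handling elided):

{code}
HighAvailabilityServices haServices =
	HighAvailabilityServicesUtils.createHighAvailabilityServices(
		config,
		Executors.directExecutor(),
		HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
try {
	// ... haServices.getJobManagerLeaderRetriever(...) is the only use ...
} finally {
	haServices.close();   // do not leak the internally created instance
}
{code}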





[jira] [Comment Edited] (FLINK-7049) TestingApplicationMaster keeps running after integration tests finish

2017-08-23 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069086#comment-16069086
 ] 

Ted Yu edited comment on FLINK-7049 at 8/23/17 8:02 PM:


Stack trace for TestingApplicationMaster.


was (Author: yuzhih...@gmail.com):
Stack trace for TestingApplicationMaster .

> TestingApplicationMaster keeps running after integration tests finish
> -
>
> Key: FLINK-7049
> URL: https://issues.apache.org/jira/browse/FLINK-7049
> Project: Flink
>  Issue Type: Test
>  Components: Tests, YARN
>Reporter: Ted Yu
>Priority: Minor
> Attachments: testingApplicationMaster.stack
>
>
> After integration tests finish, TestingApplicationMaster is still running.
> Toward the end of 
> flink-yarn-tests/target/flink-yarn-tests-ha/flink-yarn-tests-ha-logDir-nm-1_0/application_1498768839874_0001/container_1498768839874_0001_03_01/jobmanager.log
>  :
> {code}
> 2017-06-29 22:09:49,681 INFO  org.apache.zookeeper.ClientCnxn 
>   - Opening socket connection to server 127.0.0.1/127.0.0.1:46165
> 2017-06-29 22:09:49,681 ERROR 
> org.apache.flink.shaded.org.apache.curator.ConnectionState- 
> Authentication failed
> 2017-06-29 22:09:49,682 WARN  org.apache.zookeeper.ClientCnxn 
>   - Session 0x0 for server null, unexpected error, closing socket 
> connection and attempting reconnect
> java.net.ConnectException: Connection refused
>   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>   at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>   at 
> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
>   at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> 2017-06-29 22:09:50,782 WARN  org.apache.zookeeper.ClientCnxn 
>   - SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/tmp/jaas-3597644653611245612.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it.
> 2017-06-29 22:09:50,782 INFO  org.apache.zookeeper.ClientCnxn 
>   - Opening socket connection to server 127.0.0.1/127.0.0.1:46165
> 2017-06-29 22:09:50,782 ERROR 
> org.apache.flink.shaded.org.apache.curator.ConnectionState- 
> Authentication failed
> 2017-06-29 22:09:50,783 WARN  org.apache.zookeeper.ClientCnxn 
>   - Session 0x0 for server null, unexpected error, closing socket 
> connection and attempting reconnect
> java.net.ConnectException: Connection refused
>   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>   at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>   at 
> org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
>   at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
> {code}





[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138874#comment-16138874
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user rangadi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134838729
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is 

[GitHub] flink pull request #4239: [FLINK-6988] flink-connector-kafka-0.11 with exact...

2017-08-23 Thread rangadi
Github user rangadi commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134838729
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is 
compatible with Kafka 0.11.x. By default producer
+ * will use {@link Semantic#EXACTLY_ONCE} semantic.
+ *
+ * Implementation note: This producer is a hybrid between a regular
+ * {@link 

[jira] [Commented] (FLINK-7495) AbstractUdfStreamOperator#initializeState() should be called in AsyncWaitOperator#initializeState()

2017-08-23 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138719#comment-16138719
 ] 

Ted Yu commented on FLINK-7495:
---

AbstractKeyedCEPPatternOperator#restoreState should call 
AbstractUdfStreamOperator#restoreState()

> AbstractUdfStreamOperator#initializeState() should be called in 
> AsyncWaitOperator#initializeState()
> ---
>
> Key: FLINK-7495
> URL: https://issues.apache.org/jira/browse/FLINK-7495
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> recoveredStreamElements = context
>   .getOperatorStateStore()
>   .getListState(new ListStateDescriptor<>(STATE_NAME, 
> inStreamElementSerializer));
> {code}
> Call to AbstractUdfStreamOperator#initializeState() should be added at the 
> beginning





[jira] [Created] (FLINK-7495) AbstractUdfStreamOperator#initializeState() should be called in AsyncWaitOperator#initializeState()

2017-08-23 Thread Ted Yu (JIRA)
Ted Yu created FLINK-7495:
-

 Summary: AbstractUdfStreamOperator#initializeState() should be 
called in AsyncWaitOperator#initializeState()
 Key: FLINK-7495
 URL: https://issues.apache.org/jira/browse/FLINK-7495
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


{code}
recoveredStreamElements = context
  .getOperatorStateStore()
  .getListState(new ListStateDescriptor<>(STATE_NAME, 
inStreamElementSerializer));
{code}
Call to AbstractUdfStreamOperator#initializeState() should be added at the 
beginning
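
In other words, the proposed shape is roughly (a sketch, not the actual patch):

{code}
@Override
public void initializeState(StateInitializationContext context) throws Exception {
	// proposed fix: let AbstractUdfStreamOperator initialize the user
	// function's state before this operator restores its own state
	super.initializeState(context);

	recoveredStreamElements = context
		.getOperatorStateStore()
		.getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
}
{code}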





[jira] [Commented] (FLINK-7494) No license headers in ".travis.yml" file

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138553#comment-16138553
 ] 

ASF GitHub Bot commented on FLINK-7494:
---

GitHub user yew1eb opened a pull request:

https://github.com/apache/flink/pull/4575

[FLINK-7494][travis] Add license headers to '.travis.yml' file

Just add license headers to the '.travis.yml' file.
:smile:

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yew1eb/flink FLINK-7494

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4575.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4575


commit d4f28f1f593dbf795cc7e14474d5f243251d7c0f
Author: yew1eb 
Date:   2017-08-23T16:09:37Z

Add license headers to '.travis.yml' file




> No license headers in ".travis.yml" file
> 
>
> Key: FLINK-7494
> URL: https://issues.apache.org/jira/browse/FLINK-7494
> Project: Flink
>  Issue Type: Wish
>  Components: Travis
>Reporter: Hai Zhou
>Assignee: Hai Zhou
>
> I will fix the ".travis.yml" file.





[GitHub] flink pull request #4575: [FLINK-7494][travis] Add license headers to '.trav...

2017-08-23 Thread yew1eb
GitHub user yew1eb opened a pull request:

https://github.com/apache/flink/pull/4575

[FLINK-7494][travis] Add license headers to '.travis.yml' file

Just add license headers to the '.travis.yml' file.
:smile:

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yew1eb/flink FLINK-7494

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4575.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4575


commit d4f28f1f593dbf795cc7e14474d5f243251d7c0f
Author: yew1eb 
Date:   2017-08-23T16:09:37Z

Add license headers to '.travis.yml' file






[jira] [Commented] (FLINK-7442) Add option for using a child-first classloader for loading user code

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138549#comment-16138549
 ] 

ASF GitHub Bot commented on FLINK-7442:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4564
  
@StephanEwen I updated the PR. It turns out you also have to override the 
resource-related methods to change the resolution order. I added checks for 
that to the end-to-end test.
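
For readers following along, a condensed sketch of the child-first pattern being described (simplified; the actual PR also covers getResources and more):

{code}
public class ChildFirstClassLoader extends URLClassLoader {

	public ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
		super(urls, parent);
	}

	@Override
	protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
		synchronized (getClassLoadingLock(name)) {
			Class<?> c = findLoadedClass(name);
			if (c == null) {
				try {
					c = findClass(name);                    // child first
				} catch (ClassNotFoundException e) {
					c = super.loadClass(name, resolve);     // then delegate to parent
				}
			}
			if (resolve) {
				resolveClass(c);
			}
			return c;
		}
	}

	@Override
	public URL getResource(String name) {
		// resource lookups must follow the same child-first order
		URL url = findResource(name);
		return url != null ? url : super.getResource(name);
	}
}
{code}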


> Add option for using a child-first classloader for loading user code
> 
>
> Key: FLINK-7442
> URL: https://issues.apache.org/jira/browse/FLINK-7442
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>






[GitHub] flink issue #4564: [FLINK-7442] Add option for using child-first classloader...

2017-08-23 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4564
  
@StephanEwen I updated the PR. It turns out you also have to override the 
resource-related methods to change the resolution order. I added checks for 
that to the end-to-end test.




[jira] [Created] (FLINK-7494) No license headers in ".travis.yml" file

2017-08-23 Thread Hai Zhou (JIRA)
Hai Zhou created FLINK-7494:
---

 Summary: No license headers in ".travis.yml" file
 Key: FLINK-7494
 URL: https://issues.apache.org/jira/browse/FLINK-7494
 Project: Flink
  Issue Type: Wish
  Components: Travis
Reporter: Hai Zhou
Assignee: Hai Zhou


I will fix the ".travis.yml" file.





[jira] [Commented] (FLINK-7442) Add option for using a child-first classloader for loading user code

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138540#comment-16138540
 ] 

ASF GitHub Bot commented on FLINK-7442:
---

Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/4554


> Add option for using a child-first classloader for loading user code
> 
>
> Key: FLINK-7442
> URL: https://issues.apache.org/jira/browse/FLINK-7442
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>






[GitHub] flink pull request #4554: [FLINK-7442] Add option for using child-first clas...

2017-08-23 Thread aljoscha
Github user aljoscha closed the pull request at:

https://github.com/apache/flink/pull/4554




[jira] [Commented] (FLINK-7449) Improve and enhance documentation for incremental checkpoints

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138522#comment-16138522
 ] 

ASF GitHub Bot commented on FLINK-7449:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/4543
  
CC @alpinegizmo 


> Improve and enhance documentation for incremental checkpoints
> -
>
> Key: FLINK-7449
> URL: https://issues.apache.org/jira/browse/FLINK-7449
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.4.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Minor
>
> We should provide more details about incremental checkpoints in the 
> documentation.





[GitHub] flink issue #4543: [FLINK-7449] [docs] Additional documentation for incremen...

2017-08-23 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/4543
  
CC @alpinegizmo 




[jira] [Assigned] (FLINK-7078) Support fencing tokens to filter out outdated messages

2017-08-23 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-7078:


Assignee: Till Rohrmann

> Support fencing tokens to filter out outdated messages
> --
>
> Key: FLINK-7078
> URL: https://issues.apache.org/jira/browse/FLINK-7078
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> In order to guard against split brain situations, it is important that RPC 
> calls are guarded with a fencing token. The sender attaches its fencing token 
> to an RPC message, which is then used on the receiver side to compare against 
> the expected fencing token. An example is the leader session ID which we 
> attach to all critical RPC messages.
> So far, in the Flip-6 code base we send fencing tokens explicitly. This is 
> not only cumbersome but also error-prone because you have to do it for all 
> RPCs. Therefore, it would be better if we could automatically compare fencing 
> tokens for a given RPC from a given source. This should ideally happen on the 
> level of the RPC server.
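
Conceptually, the desired server-side check is something like the following (pure illustration; all names are hypothetical):

{code}
// compare the attached leader session ID (the fencing token) against the
// expected one before dispatching the RPC
void handleRpcMessage(UUID senderToken, Runnable rpcCall) {
	if (!expectedToken.equals(senderToken)) {
		// stale sender from a previous leader session: drop the message
		return;
	}
	rpcCall.run();
}
{code}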





[jira] [Commented] (FLINK-7476) Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138435#comment-16138435
 ] 

ASF GitHub Bot commented on FLINK-7476:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4561#discussion_r134771937
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
 ---
@@ -185,11 +253,44 @@ protected void recoverAndAbort(FileTransaction 
transaction) {
 
private static class FileTransaction {
private final File tmpFile;
-   private final transient Writer writer;
+   private final transient BufferedWriter writer;
 
public FileTransaction(File tmpFile) throws IOException {
this.tmpFile = tmpFile;
this.writer = new BufferedWriter(new 
FileWriter(tmpFile));
}
+
+   @Override
+   public String toString() {
+   return String.format("FileTransaction[%s]", 
tmpFile.getName());
+   }
+   }
+
+   private static class TestContext implements AutoCloseable {
+   public final File tmpDirectory = 
Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() 
+ "_tmp").toFile();
+   public final File targetDirectory = 
Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() 
+ "_target").toFile();
+
+   public FileBasedSinkFunction sinkFunction;
+   public OneInputStreamOperatorTestHarness 
harness;
+
+   private TestContext() throws Exception {
+   tmpDirectory.deleteOnExit();
--- End diff --

Yes, you couldn't use it with that (without handing in the rule when 
initialising the context). I'm just pointing it out but we'll leave it as is.  


> Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction
> --
>
> Key: FLINK-7476
> URL: https://issues.apache.org/jira/browse/FLINK-7476
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> Currently, when using TwoPhaseCommitSinkFunction, if there is some 
> intermittent failure in "beginTransaction", not only does the snapshot that 
> triggered this call fail, but any subsequent write requests will fail as 
> well, rendering the sink unusable until the application restarts.
> This issue is in code that hasn't been released yet.





[GitHub] flink pull request #4561: [FLINK-7476][streaming] Continue using previous tr...

2017-08-23 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4561#discussion_r134771937
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
 ---
@@ -185,11 +253,44 @@ protected void recoverAndAbort(FileTransaction 
transaction) {
 
private static class FileTransaction {
private final File tmpFile;
-   private final transient Writer writer;
+   private final transient BufferedWriter writer;
 
public FileTransaction(File tmpFile) throws IOException {
this.tmpFile = tmpFile;
this.writer = new BufferedWriter(new 
FileWriter(tmpFile));
}
+
+   @Override
+   public String toString() {
+   return String.format("FileTransaction[%s]", 
tmpFile.getName());
+   }
+   }
+
+   private static class TestContext implements AutoCloseable {
+   public final File tmpDirectory = 
Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() 
+ "_tmp").toFile();
+   public final File targetDirectory = 
Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() 
+ "_target").toFile();
+
+   public FileBasedSinkFunction sinkFunction;
+   public OneInputStreamOperatorTestHarness 
harness;
+
+   private TestContext() throws Exception {
+   tmpDirectory.deleteOnExit();
--- End diff --

Yes, you couldn't use it with that (without handing in the rule when 
initialising the context). I'm just pointing it out but we'll leave it as is. 
😉 




[jira] [Commented] (FLINK-4947) Make all configuration possible via flink-conf.yaml and CLI.

2017-08-23 Thread Jamie Grier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138418#comment-16138418
 ] 

Jamie Grier commented on FLINK-4947:


Sounds good to me :)

> Make all configuration possible via flink-conf.yaml and CLI.
> 
>
> Key: FLINK-4947
> URL: https://issues.apache.org/jira/browse/FLINK-4947
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Jamie Grier
>
> I think it's important to make all configuration possible via the 
> flink-conf.yaml and the command line.
> As an example:  To enable "externalizedCheckpoints" you must actually call 
> the StreamExecutionEnvironment#enableExternalizedCheckpoints() method from 
> your Flink program.
> Another example of this would be configuring the RocksDB state backend.
> I think it is important to make deployment flexible and easy to build tools 
> around.  For example, the infrastructure teams that make these configuration 
> decisions and provide tools for deploying Flink apps will be different from 
> the teams deploying apps.  The teams writing apps should not have to set all 
> of this lower-level configuration up in their programs.
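
For example, externalized checkpoints can currently only be enabled with a code-level call along these lines (API as of Flink 1.2/1.3; a flink-conf.yaml equivalent is what this ticket asks for):

{code}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000);   // checkpoint every 60 seconds
env.getCheckpointConfig().enableExternalizedCheckpoints(
	CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
{code}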





[jira] [Commented] (FLINK-7476) Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138390#comment-16138390
 ] 

ASF GitHub Bot commented on FLINK-7476:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4561#discussion_r134759790
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
 ---
@@ -185,11 +253,44 @@ protected void recoverAndAbort(FileTransaction 
transaction) {
 
private static class FileTransaction {
private final File tmpFile;
-   private final transient Writer writer;
+   private final transient BufferedWriter writer;
 
public FileTransaction(File tmpFile) throws IOException {
this.tmpFile = tmpFile;
this.writer = new BufferedWriter(new 
FileWriter(tmpFile));
}
+
+   @Override
+   public String toString() {
+   return String.format("FileTransaction[%s]", 
tmpFile.getName());
+   }
+   }
+
+   private static class TestContext implements AutoCloseable {
+   public final File tmpDirectory = 
Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() 
+ "_tmp").toFile();
+   public final File targetDirectory = 
Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() 
+ "_target").toFile();
+
+   public FileBasedSinkFunction sinkFunction;
+   public OneInputStreamOperatorTestHarness 
harness;
+
+   private TestContext() throws Exception {
+   tmpDirectory.deleteOnExit();
--- End diff --

I think I wouldn't be able to use the `@Rule` here for those directories. I 
would have to write my own rule that would wrap the whole `TestContext`, right? If 
so, can we leave it as it is for now? Currently I'm getting stack overflow 
exceptions from the number of open issues that I'm working on 
;)


> Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction
> --
>
> Key: FLINK-7476
> URL: https://issues.apache.org/jira/browse/FLINK-7476
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> Currently, when using TwoPhaseCommitSinkFunction, if there is some 
> intermittent failure in "beginTransaction", not only does the snapshot that 
> triggered this call fail, but any subsequent write requests will fail as 
> well, rendering the sink unusable until the application restarts.
> This issue is in code that hasn't been released yet.





[GitHub] flink issue #4561: [FLINK-7476][streaming] Continue using previous transacti...

2017-08-23 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4561
  
I have abandoned the last commit. This PR is now a pure refactor/hotfix.




[jira] [Commented] (FLINK-7476) Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138393#comment-16138393
 ] 

ASF GitHub Bot commented on FLINK-7476:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4561
  
I have abandoned the last commit. This PR is now a pure refactor/hotfix.


> Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction
> --
>
> Key: FLINK-7476
> URL: https://issues.apache.org/jira/browse/FLINK-7476
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> Currently, when using TwoPhaseCommitSinkFunction, if there is some 
> intermittent failure in "beginTransaction", not only does the snapshot that 
> triggered this call fail, but any subsequent write requests will fail as 
> well, rendering the sink unusable until the application restarts.
> This issue is in code that hasn't been released yet.





[GitHub] flink pull request #4561: [FLINK-7476][streaming] Continue using previous tr...

2017-08-23 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4561#discussion_r134759790
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
 ---
@@ -185,11 +253,44 @@ protected void recoverAndAbort(FileTransaction 
transaction) {
 
private static class FileTransaction {
private final File tmpFile;
-   private final transient Writer writer;
+   private final transient BufferedWriter writer;
 
public FileTransaction(File tmpFile) throws IOException {
this.tmpFile = tmpFile;
this.writer = new BufferedWriter(new 
FileWriter(tmpFile));
}
+
+   @Override
+   public String toString() {
+   return String.format("FileTransaction[%s]", 
tmpFile.getName());
+   }
+   }
+
+   private static class TestContext implements AutoCloseable {
+   public final File tmpDirectory = 
Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() 
+ "_tmp").toFile();
+   public final File targetDirectory = 
Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() 
+ "_target").toFile();
+
+   public FileBasedSinkFunction sinkFunction;
+   public OneInputStreamOperatorTestHarness 
harness;
+
+   private TestContext() throws Exception {
+   tmpDirectory.deleteOnExit();
--- End diff --

I think I wouldn't be able to use the `@Rule` here for those directories. I 
would have to write my own rule that would wrap the whole `TestContext`, right? If 
so, can we leave it as it is for now? Currently I'm getting stack overflow 
exceptions from the number of open issues that I'm working on 
;)




[jira] [Commented] (FLINK-7429) Add restore from 1.2 / 1.3 migration tests for FlinkKinesisConsumer

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138304#comment-16138304
 ] 

ASF GitHub Bot commented on FLINK-7429:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4565
  
Checkstyle is complaining about `FlinkKinesisConsumerMigrationTest`. I 
fixed it and pushed; it's on Travis now. 

The changes look very good.  I'll merge when Travis is green.


> Add restore from 1.2 / 1.3 migration tests for FlinkKinesisConsumer
> ---
>
> Key: FLINK-7429
> URL: https://issues.apache.org/jira/browse/FLINK-7429
> Project: Flink
>  Issue Type: Test
>  Components: Kinesis Connector, Tests
>Affects Versions: 1.2.1, 1.4.0, 1.3.2
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> Currently, the `FlinkKinesisConsumerMigrationTest` only tests restore from 
> Flink 1.1.
> We should extend that to also verify restoring from 1.2 and 1.3.





[GitHub] flink issue #4565: [FLINK-7429] [kinesis] Add migration test coverage for Fl...

2017-08-23 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4565
  
Checkstyle is complaining about `FlinkKinesisConsumerMigrationTest`. I 
fixed it and pushed; it's on Travis now. 

The changes look very good. 👍 I'll merge when Travis is green.




[jira] [Commented] (FLINK-7476) Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138289#comment-16138289
 ] 

ASF GitHub Bot commented on FLINK-7476:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4561#discussion_r134737971
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
 ---
@@ -185,11 +253,44 @@ protected void recoverAndAbort(FileTransaction 
transaction) {
 
private static class FileTransaction {
private final File tmpFile;
-   private final transient Writer writer;
+   private final transient BufferedWriter writer;
 
public FileTransaction(File tmpFile) throws IOException {
this.tmpFile = tmpFile;
this.writer = new BufferedWriter(new 
FileWriter(tmpFile));
}
+
+   @Override
+   public String toString() {
+   return String.format("FileTransaction[%s]", 
tmpFile.getName());
+   }
+   }
+
+   private static class TestContext implements AutoCloseable {
+   public final File tmpDirectory = 
Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() 
+ "_tmp").toFile();
+   public final File targetDirectory = 
Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() 
+ "_target").toFile();
+
+   public FileBasedSinkFunction sinkFunction;
+   public OneInputStreamOperatorTestHarness 
harness;
+
+   private TestContext() throws Exception {
+   tmpDirectory.deleteOnExit();
--- End diff --

I think you could also use the `TemporaryFolder` rule for this:
```
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
```

but this also seems fine.



> Try to recover from failure in TwoPhaseCommitSinkFunction.beginTransaction
> --
>
> Key: FLINK-7476
> URL: https://issues.apache.org/jira/browse/FLINK-7476
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.4.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
> Fix For: 1.4.0
>
>
> Currently when using TwoPhaseCommitSinkFunction, if there is some 
> intermittent failure in "beginTransaction", not only does the snapshot that 
> triggered this call fail, but any subsequent write requests will fail as 
> well, rendering such a sink unusable until application restart.
> This issue is in code that hasn't been released yet.
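
A rough sketch of one way to contain such a failure, in line with the PR title 
("continue using previous transaction"); the names below are illustrative 
placeholders, not the actual `TwoPhaseCommitSinkFunction` API:

```
// Hypothetical sketch: if opening the next transaction fails during a
// checkpoint, only that checkpoint fails; the previous transaction is kept
// open and usable, so subsequent writes do not all fail until restart.
class TransactionalSinkSketch {

    static class Transaction { /* buffered, uncommitted writes */ }

    private Transaction currentTransaction = new Transaction();

    void snapshotState() throws Exception {
        // pre-commit of currentTransaction would happen here ...
        try {
            currentTransaction = beginTransaction();   // may fail intermittently
        } catch (Exception e) {
            // currentTransaction is left untouched and stays writable
            throw new Exception("Checkpoint failed; keeping previous transaction open", e);
        }
    }

    private Transaction beginTransaction() throws Exception {
        return new Transaction();   // e.g. open a file / start a Kafka transaction
    }
}
```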





[GitHub] flink pull request #4561: [FLINK-7476][streaming] Continue using previous tr...

2017-08-23 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4561#discussion_r134737971
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunctionTest.java
 ---
@@ -185,11 +253,44 @@ protected void recoverAndAbort(FileTransaction 
transaction) {
 
private static class FileTransaction {
private final File tmpFile;
-   private final transient Writer writer;
+   private final transient BufferedWriter writer;
 
public FileTransaction(File tmpFile) throws IOException {
this.tmpFile = tmpFile;
this.writer = new BufferedWriter(new 
FileWriter(tmpFile));
}
+
+   @Override
+   public String toString() {
+   return String.format("FileTransaction[%s]", 
tmpFile.getName());
+   }
+   }
+
+   private static class TestContext implements AutoCloseable {
+   public final File tmpDirectory = 
Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() 
+ "_tmp").toFile();
+   public final File targetDirectory = 
Files.createTempDirectory(TwoPhaseCommitSinkFunctionTest.class.getSimpleName() 
+ "_target").toFile();
+
+   public FileBasedSinkFunction sinkFunction;
+   public OneInputStreamOperatorTestHarness 
harness;
+
+   private TestContext() throws Exception {
+   tmpDirectory.deleteOnExit();
--- End diff --

I think you could also use the `TemporaryFolder` rule for this:
```
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
```

but this also seems fine.
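
For reference, a self-contained sketch of the suggested rule (standard JUnit 4; 
the test class itself is illustrative):

```
import java.io.File;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TemporaryFolderSketchTest {

    // JUnit creates the folder before each test and deletes it afterwards,
    // so no manual deleteOnExit() bookkeeping is needed.
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    @Test
    public void testWithTempDirs() throws Exception {
        File tmpDirectory = tempFolder.newFolder("tmp");
        File targetDirectory = tempFolder.newFolder("target");
        // ... exercise the sink with these directories ...
    }
}
```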





[GitHub] flink issue #4557: [hotifx][streaming] Simplify state of TwoPhaseCommitSinkF...

2017-08-23 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4557
  
Thanks!




[GitHub] flink pull request #4557: [hotifx][streaming] Simplify state of TwoPhaseComm...

2017-08-23 Thread pnowojski
Github user pnowojski closed the pull request at:

https://github.com/apache/flink/pull/4557




[GitHub] flink issue #4557: [hotifx][streaming] Simplify state of TwoPhaseCommitSinkF...

2017-08-23 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4557
  
Merged 👌 Could you please close this PR?




[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138281#comment-16138281
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4569
  
@tillrohrmann I've addressed the remaining comments.


> Flip-6 client-cluster communication
> ---
>
> Key: FLINK-7040
> URL: https://issues.apache.org/jira/browse/FLINK-7040
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Mesos
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the 
> cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the 
> savepoint under which the savepoint was stored? Maybe always having to 
> specify a path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable 
> state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new 
> notifications from the execution of the given job/Opens WebSocket to receive 
> notifications
> The first four REST calls will be served by the REST endpoint running in the 
> application master/cluster entrypoint. The other calls will be served by a 
> REST endpoint running alongside the JobManager.
> Detailed information about different implementations and their pros and cons 
> can be found in this document:
> https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing
> The implementation will most likely be Netty based.





[GitHub] flink issue #4569: [FLINK-7040] [REST] Add basics for REST communication

2017-08-23 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4569
  
@tillrohrmann I've addressed the remaining comments.




[jira] [Commented] (FLINK-7129) Dynamically changing patterns

2017-08-23 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138276#comment-16138276
 ] 

Dawid Wysakowicz commented on FLINK-7129:
-

Yes, that would work for the state, but the problem is I could not find a way to 
"collocate" a non-keyed stream (stream of patterns) with a keyed stream of 
events. I think `TwoInputStreamOperator` does not support inputs where one is 
keyed and the other is not.

> Dynamically changing patterns
> -
>
> Key: FLINK-7129
> URL: https://issues.apache.org/jira/browse/FLINK-7129
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>
> An umbrella task for introducing mechanism for injecting patterns through 
> coStream





[jira] [Comment Edited] (FLINK-7129) Dynamically changing patterns

2017-08-23 Thread Dawid Wysakowicz (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138276#comment-16138276
 ] 

Dawid Wysakowicz edited comment on FLINK-7129 at 8/23/17 12:20 PM:
---

Yes, that would work for the state, but the problem is I could not find a way to 
"collocate" a non-keyed stream (stream of patterns) with a keyed stream of 
events. I think {{TwoInputStreamOperator}} does not support inputs where one is 
keyed and the other is not.


was (Author: dawidwys):
Yes, that would work for the state, but the problem is I could not find a way to 
"collocate" a non-keyed stream (stream of patterns) with a keyed stream of 
events. I think `TwoInputStreamOperator` does not support inputs where one is 
keyed and the other is not.

> Dynamically changing patterns
> -
>
> Key: FLINK-7129
> URL: https://issues.apache.org/jira/browse/FLINK-7129
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>
> An umbrella task for introducing mechanism for injecting patterns through 
> coStream
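
For illustration, a minimal sketch of the collocation idea under discussion: 
broadcast the non-keyed pattern stream so that every parallel subtask of the 
co-operator sees all patterns, and keep them in operator-local state, which 
sidesteps the keyed/non-keyed mismatch raised above. `Event` and `PatternDef` 
are placeholder types, not classes from flink-cep:

```
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class DynamicPatternsSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Event> events = env.fromElements(new Event("k1"), new Event("k2"));
        DataStream<PatternDef> patterns = env.fromElements(new PatternDef("p1"));

        events
            .keyBy(e -> e.key)
            // broadcast() replicates every pattern to all parallel subtasks,
            // so the keyed event side and the pattern side are "collocated"
            .connect(patterns.broadcast())
            .flatMap(new CoFlatMapFunction<Event, PatternDef, String>() {
                private final List<PatternDef> activePatterns = new ArrayList<>();

                @Override
                public void flatMap1(Event event, Collector<String> out) {
                    for (PatternDef p : activePatterns) {
                        out.collect("evaluate " + p.name + " for key " + event.key);
                    }
                }

                @Override
                public void flatMap2(PatternDef pattern, Collector<String> out) {
                    activePatterns.add(pattern);   // inject/update patterns at runtime
                }
            })
            .print();

        env.execute("dynamic-patterns-sketch");
    }

    public static class Event {
        public String key;
        public Event() {}
        public Event(String key) { this.key = key; }
    }

    public static class PatternDef {
        public String name;
        public PatternDef() {}
        public PatternDef(String name) { this.name = name; }
    }
}
```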





[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138256#comment-16138256
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4569
  
@tillrohrmann I've reworked the client response parsing: the 
`ClientHandler` now only parses the response as JSON, whereas the conversion to 
the respective `ResponseBody` is handled in a future declared in 
`RestClientHandler#submitRequest()`. With this the `ClientHandler` is once 
again immutable and we don't have to do any casts.

I've also reworked how handlers communicate failures: The `HandlerResponse` 
classes have been removed, and handlers may now throw a (checked) 
`RestHandlerException` or complete the future with an exception (with special 
treatment of RestHandlerExceptions). Unchecked or non-RestHandlerExceptions are 
still treated as implementation errors and will return a "500 Internal Server 
Error".


> Flip-6 client-cluster communication
> ---
>
> Key: FLINK-7040
> URL: https://issues.apache.org/jira/browse/FLINK-7040
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Mesos
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the 
> cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the 
> savepoint under which the savepoint was stored? Maybe always having to 
> specify a path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable 
> state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new 
> notifications from the execution of the given job/Opens WebSocket to receive 
> notifications
> The first four REST calls will be served by the REST endpoint running in the 
> application master/cluster entrypoint. The other calls will be served by a 
> REST endpoint running alongside the JobManager.
> Detailed information about different implementations and their pros and cons 
> can be found in this document:
> https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing
> The implementation will most likely be Netty based.





[GitHub] flink issue #4569: [FLINK-7040] [REST] Add basics for REST communication

2017-08-23 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4569
  
@tillrohrmann I've reworked the client response parsing: the 
`ClientHandler` now only parses the response as JSON, whereas the conversion to 
the respective `ResponseBody` is handled in a future declared in 
`RestClientHandler#submitRequest()`. With this the `ClientHandler` is once 
again immutable and we don't have to do any casts.

I've also reworked how handlers communicate failures: The `HandlerResponse` 
classes have been removed, and handlers may now throw a (checked) 
`RestHandlerException` or complete the future with an exception (with special 
treatment of RestHandlerExceptions). Unchecked or non-RestHandlerExceptions are 
still treated as implementation errors and will return a "500 Internal Server 
Error".




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138244#comment-16138244
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134726007
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is 

[GitHub] flink pull request #4239: [FLINK-6988] flink-connector-kafka-0.11 with exact...

2017-08-23 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134726007
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is 
compatible with Kafka 0.11.x. By default producer
+ * will use {@link Semantic#EXACTLY_ONCE} semantic.
+ *
+ * Implementation note: This producer is a hybrid between a regular 
regular
+ * {@link 

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138243#comment-16138243
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134725699
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is 

[GitHub] flink pull request #4239: [FLINK-6988] flink-connector-kafka-0.11 with exact...

2017-08-23 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134725699
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is 
compatible with Kafka 0.11.x. By default producer
+ * will use {@link Semantic#EXACTLY_ONCE} semantic.
+ *
+ * Implementation note: This producer is a hybrid between a regular 
regular
+ * {@link 

[jira] [Commented] (FLINK-6751) Table API / SQL Docs: UDFs Page

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138206#comment-16138206
 ] 

ASF GitHub Bot commented on FLINK-6751:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4546
  
Please fill out the PR template; this will make the review process easier.


> Table API / SQL Docs: UDFs Page
> ---
>
> Key: FLINK-6751
> URL: https://issues.apache.org/jira/browse/FLINK-6751
> Project: Flink
>  Issue Type: Task
>  Components: Documentation, Table API & SQL
>Affects Versions: 1.3.0
>Reporter: Fabian Hueske
>Assignee: Shaoxuan Wang
>
> Update and extend the documentation of UDFs in the Table API / SQL: 
> {{./docs/dev/table/udfs.md}}
> Missing sections:
> - Registration of UDFs
> - UDAGGs





[GitHub] flink issue #4546: [FLINK-6751] [docs] Add missing documentation for User-De...

2017-08-23 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4546
  
Please fill out the PR template; this will make the review process easier.




[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138194#comment-16138194
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134711336
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is 

[GitHub] flink pull request #4239: [FLINK-6988] flink-connector-kafka-0.11 with exact...

2017-08-23 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134711336
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is 
compatible with Kafka 0.11.x. By default producer
+ * will use {@link Semantic#EXACTLY_ONCE} semantic.
+ *
+ * Implementation note: This producer is a hybrid between a regular 
regular
+ * {@link 

[jira] [Commented] (FLINK-7402) Ineffective null check in NettyMessage#write()

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138193#comment-16138193
 ] 

ASF GitHub Bot commented on FLINK-7402:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4562#discussion_r134711249
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 ---
@@ -18,6 +18,12 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+
--- End diff --

sorry for bothering you again, but please revert this change in the import 
order (it's also against the order defined by our checkstyle)


> Ineffective null check in NettyMessage#write()
> --
>
> Key: FLINK-7402
> URL: https://issues.apache.org/jira/browse/FLINK-7402
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Reporter: Ted Yu
>Priority: Minor
>
> Here is the null check in finally block:
> {code}
>   finally {
> if (buffer != null) {
>   buffer.recycle();
> }
> {code}
> But buffer has been dereferenced in the try block without guard.
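
A minimal sketch of why the check is ineffective and the usual fix, using 
placeholder types rather than the actual Netty buffer classes:

```
import java.io.IOException;

class NullCheckSketch {

    interface Buffer {
        void write();
        void recycle();
    }

    interface Allocator {
        Buffer allocate() throws IOException;
    }

    // If allocate() throws, buffer is still null. Releasing it in a catch
    // block behind a guarded check is effective; dereferencing buffer
    // unconditionally inside the try block before any null check is not.
    static Buffer writeMessage(Allocator allocator) throws IOException {
        Buffer buffer = null;
        try {
            buffer = allocator.allocate();
            buffer.write();          // dereference only after assignment
            return buffer;
        } catch (Throwable t) {
            if (buffer != null) {
                buffer.recycle();    // release only what was actually created
            }
            throw new IOException(t);
        }
    }
}
```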





[GitHub] flink pull request #4562: [FLINK-7402] Fix ineffective null check in NettyMe...

2017-08-23 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4562#discussion_r134711249
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 ---
@@ -18,6 +18,12 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+
--- End diff --

sorry for bothering you again, but please revert this change in the import 
order (it's also against the order defined by our checkstyle)




[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138179#comment-16138179
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4569
  
@tillrohrmann I've addressed most of your comments.

I have implemented the `HandlerResponse` as you suggested (the casting option) 
for now, and include all query parameters as a list.

## Unaddressed comments:
* Cleaner shutdown of netty components in server
* Closing of netty connection (_somewhere_)
* RestEndpointITCase: client/server shutdown in finally block


## Parameter Rework
* Remove the `ParameterMapper` and related code
* `MessageHeaders` now has a generic `MessageParameters` argument
* `MessageParameters` is an abstract class that contains a collection of 
`MessageParameter`s that are supported for the given request, and provides a 
utility method for checking whether these parameters were resolved
* `MessageParameter` is also an abstract class that defines the key, 
accepts a value (i.e. resolves the parameter), and defines whether the parameter 
is mandatory/optional and whether it is a query/path parameter

### Usage

Let's say we want to have a request for getting the details of a job, with 
a path parameter for the job id:

We first define a path parameter for the job id (or reuse an existing one!):

```
public final class JobIDPathParameter extends MessagePathParameter {
public JobIDPathParameter() {
super("jobid", MessageParameterRequisiteness.MANDATORY);
}

public void resolve(JobID jobID) {
super.resolve(jobID.toString());
}
}
```

We then define the full set of parameters for the request:

```
public final class JobDetailsParameters extends MessageParameters {
private final JobIDPathParameter jobID = new JobIDPathParameter();

@Override
public Collection<MessageParameter> getParameters() {
return Collections.singleton(jobID);
}
}
```

And finally, we include it in the JobDetailsHeaders:
```
public final class JobDetailsHeaders extends 
MessageHeaders {
...
```

The usage for the client would then look like this:
```
JobID jobID = ...
JobDetailsParameters parameters = headers.getUnresolvedParameters();
parameters.jobID.resolve(jobID);
client.sendRequest(headers, parameters, request);
```



> Flip-6 client-cluster communication
> ---
>
> Key: FLINK-7040
> URL: https://issues.apache.org/jira/browse/FLINK-7040
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Mesos
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the 
> cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the 
> savepoint under which the savepoint was stored? Maybe always having to 
> specify a path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable 
> state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new 
> notifications from the execution of the given job/Opens WebSocket to receive 
> notifications
> The first four REST calls will be served by the REST endpoint running in the 
> application master/cluster entrypoint. The other calls will be served by a 
> REST endpoint running alongside the JobManager.
> Detailed information about different implementations and their pros and cons 
> can be found in this document:
> https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing
> The implementation will most likely be Netty based.





[GitHub] flink issue #4569: [FLINK-7040] [REST] Add basics for REST communication

2017-08-23 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4569
  
@tillrohrmann I've addressed most of your comments.

I have implemented the `HandlerResponse` as you suggested (the casting option) 
for now, and include all query parameters as a list.

## Unaddressed comments:
* Cleaner shutdown of netty components in server
* Closing of netty connection (_somewhere_)
* RestEndpointITCase: client/server shutdown in finally block


## Parameter Rework
* Remove the `ParameterMapper` and related code
* `MessageHeaders` now has a generic `MessageParameters` argument
* `MessageParameters` is an abstract class that contains a collection of 
`MessageParameter`s that are supported for the given request, and provides a 
utility method for checking whether these parameters were resolved
* `MessageParameter` is also an abstract class that defines the key, 
accepts a value (i.e. resolves the parameter), and defines whether the parameter 
is mandatory/optional and whether it is a query/path parameter

### Usage

Let's say we want to have a request for getting the details of a job, with 
a path parameter for the job id:

We first define a path parameter for the job id (or reuse an existing one!):

```
public final class JobIDPathParameter extends MessagePathParameter {
public JobIDPathParameter() {
super("jobid", MessageParameterRequisiteness.MANDATORY);
}

public void resolve(JobID jobID) {
super.resolve(jobID.toString());
}
}
```

We then define the full set of parameters for the request:

```
public final class JobDetailsParameters extends MessageParameters {
private final JobIDPathParameter jobID = new JobIDPathParameter();

@Override
public Collection<MessageParameter> getParameters() {
return Collections.singleton(jobID);
}
}
```

And finally, we include it in the JobDetailsHeaders:
```
public final class JobDetailsHeaders extends 
MessageHeaders {
...
```

The usage for the client would then look like this:
```
JobID jobID = ...
JobDetailsParameters parameters = headers.getUnresolvedParameters();
parameters.jobID.resolve(jobID);
client.sendRequest(headers, parameters, request);
```
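
For reference, a guess at what the resolution check described above could look 
like; these are illustrative stand-ins, not the actual classes:

```
import java.util.Collection;

abstract class MessageParameter {
    private final boolean mandatory;
    private String resolvedValue;

    protected MessageParameter(boolean mandatory) {
        this.mandatory = mandatory;
    }

    protected void resolve(String value) {
        this.resolvedValue = value;
    }

    boolean isMandatory() {
        return mandatory;
    }

    boolean isResolved() {
        return resolvedValue != null;
    }
}

abstract class MessageParameters {

    abstract Collection<MessageParameter> getParameters();

    // a request is ready to be sent once every mandatory parameter has a value
    boolean isResolved() {
        return getParameters().stream()
                .filter(MessageParameter::isMandatory)
                .allMatch(MessageParameter::isResolved);
    }
}
```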





[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138157#comment-16138157
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134705260
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Simple container for the request to a handler, that contains the {@link 
RequestBody} and path/query parameters.
+ */
+public class HandlerRequest<R extends RequestBody> {
+   private final R requestBody;
+   private final Map<String, String> queryParameters;
+   private final Map<String, String> pathParameters;
+
+   public HandlerRequest(R requestBody, Map<String, String> 
pathParameters, Map<String, List<String>> queryParameters) {
+   this.requestBody = requestBody;
+   this.pathParameters = pathParameters;
+   if (!queryParameters.isEmpty()) {
+   this.queryParameters = new HashMap<>();
+   queryParameters.forEach((key, value) -> 
this.queryParameters.put(key, value.get(0)));
--- End diff --

Hmm, but shouldn't this be left to the RestHandler implementation to decide?


> Flip-6 client-cluster communication
> ---
>
> Key: FLINK-7040
> URL: https://issues.apache.org/jira/browse/FLINK-7040
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Mesos
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the 
> cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the 
> savepoint under which the savepoint was stored? Maybe always having to 
> specify a path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable 
> state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new 
> notifications from the execution of the given job/Opens WebSocket to receive 
> notifications
> The first four REST calls will be served by the REST endpoint running in the 
> application master/cluster entrypoint. The other calls will be served by a 
> REST endpoint running alongside the JobManager.
> Detailed information about different implementations and their pros and cons 
> can be found in this document:
> https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing
> The implementation will most likely be Netty based.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...

2017-08-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134705260
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Simple container for the request to a handler, that contains the {@link 
RequestBody} and path/query parameters.
+ */
+public class HandlerRequest<R extends RequestBody> {
+   private final R requestBody;
+   private final Map<String, String> queryParameters;
+   private final Map<String, String> pathParameters;
+
+   public HandlerRequest(R requestBody, Map<String, String> pathParameters, Map<String, List<String>> queryParameters) {
+   this.requestBody = requestBody;
+   this.pathParameters = pathParameters;
+   if (!queryParameters.isEmpty()) {
+   this.queryParameters = new HashMap<>();
+   queryParameters.forEach((key, value) -> this.queryParameters.put(key, value.get(0)));
--- End diff --

Hmm, but shouldn't this be left to the RestHandler implementation to decide?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138156#comment-16138156
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134705172
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerResponse.java
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Simple container for the response of a handler, that contains either a 
response of type {@code P} if the incoming
+ * request was handled successfully, otherwise it contains an error 
message and an associated error code.
+ *
+ * @param <P> type of successful response
+ */
+public final class HandlerResponse<P extends ResponseBody> {
--- End diff --

This would also be a good solution imo.


> Flip-6 client-cluster communication
> ---
>
> Key: FLINK-7040
> URL: https://issues.apache.org/jira/browse/FLINK-7040
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Mesos
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the 
> cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the 
> savepoint under which the savepoint was stored? Maybe always having to 
> specify a path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable 
> state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new 
> notifications from the execution of the given job/Opens WebSocket to receive 
> notifications
> The first four REST calls will be served by the REST endpoint running in the 
> application master/cluster entrypoint. The other calls will be served by a 
> REST endpoint running alongside the JobManager.
> Detailed information about different implementations and their pros and cons 
> can be found in this document:
> https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing
> The implementation will most likely be Netty based.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...

2017-08-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134705172
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerResponse.java
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Simple container for the response of a handler, that contains either a 
response of type {@code P} if the incoming
+ * request was handled successfully, otherwise it contains an error 
message and an associated error code.
+ *
+ * @param <P> type of successful response
+ */
+public final class HandlerResponse<P extends ResponseBody> {
--- End diff --

This would also be a good solution imo.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7402) Ineffective null check in NettyMessage#write()

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138153#comment-16138153
 ] 

ASF GitHub Bot commented on FLINK-7402:
---

Github user zjureel commented on a diff in the pull request:

https://github.com/apache/flink/pull/4562#discussion_r134704354
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 ---
@@ -220,9 +220,7 @@ void releaseBuffer() {
 
@Override
ByteBuf write(ByteBufAllocator allocator) throws IOException {
-   if (null == buffer) {
-   throw new NullPointerException();
-   }
+   Preconditions.checkNotNull(buffer, "When deserializing 
the buffer should have to be requested first.");
--- End diff --

Great, thank you for your suggestion :)


> Ineffective null check in NettyMessage#write()
> --
>
> Key: FLINK-7402
> URL: https://issues.apache.org/jira/browse/FLINK-7402
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Reporter: Ted Yu
>Priority: Minor
>
> Here is the null check in finally block:
> {code}
>   finally {
> if (buffer != null) {
>   buffer.recycle();
> }
> {code}
> But buffer has been dereferenced in the try block without guard.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4562: [FLINK-7402] Fix ineffective null check in NettyMe...

2017-08-23 Thread zjureel
Github user zjureel commented on a diff in the pull request:

https://github.com/apache/flink/pull/4562#discussion_r134704354
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 ---
@@ -220,9 +220,7 @@ void releaseBuffer() {
 
@Override
ByteBuf write(ByteBufAllocator allocator) throws IOException {
-   if (null == buffer) {
-   throw new NullPointerException();
-   }
+   Preconditions.checkNotNull(buffer, "When deserializing 
the buffer should have to be requested first.");
--- End diff --

Great, thank you for your suggestion :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138150#comment-16138150
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134703249
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerResponse.java
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Simple container for the response of a handler, that contains either a 
response of type {@code P} if the incoming
+ * request was handled successfully, otherwise it contains an error 
message and an associated error code.
+ *
+ * @param <P> type of successful response
+ */
+public final class HandlerResponse<P extends ResponseBody> {
--- End diff --

@tillrohrmann I'd rather revisit an earlier suggestion of yours where we 
use a special checked exception (that we either throw or fail the future with).
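
A minimal sketch of what such a dedicated checked exception could look like; the name and shape below are assumptions for illustration, not code from this PR:

```java
// Hypothetical sketch: a checked exception carrying the HTTP status, so a
// handler can either throw it directly or complete its CompletableFuture
// exceptionally with it. Name and fields are assumptions, not Flink API.
public class RestHandlerException extends Exception {
	private final int httpResponseCode;

	public RestHandlerException(String message, int httpResponseCode) {
		super(message);
		this.httpResponseCode = httpResponseCode;
	}

	public int getHttpResponseCode() {
		return httpResponseCode;
	}
}
```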


> Flip-6 client-cluster communication
> ---
>
> Key: FLINK-7040
> URL: https://issues.apache.org/jira/browse/FLINK-7040
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Mesos
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the 
> cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the 
> savepoint under which the savepoint was stored? Maybe always having to 
> specify a path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable 
> state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new 
> notifications from the execution of the given job/Opens WebSocket to receive 
> notifications
> The first four REST calls will be served by the REST endpoint running in the 
> application master/cluster entrypoint. The other calls will be served by a 
> REST endpoint running alongside the JobManager.
> Detailed information about different implementations and their pros and cons 
> can be found in this document:
> https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing
> The implementation will most likely be Netty based.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...

2017-08-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134703249
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerResponse.java
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Simple container for the response of a handler, that contains either a 
response of type {@code P} if the incoming
+ * request was handled successfully, otherwise it contains an error 
message and an associated error code.
+ *
+ * @param <P> type of successful response
+ */
+public final class HandlerResponse<P extends ResponseBody> {
--- End diff --

@tillrohrmann I'd rather revisit an earlier suggestion of yours where we 
use a special checked exception (that we either throw or fail the future with).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7402) Ineffective null check in NettyMessage#write()

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138128#comment-16138128
 ] 

ASF GitHub Bot commented on FLINK-7402:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4562#discussion_r134699645
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 ---
@@ -220,9 +220,7 @@ void releaseBuffer() {
 
@Override
ByteBuf write(ByteBufAllocator allocator) throws IOException {
-   if (null == buffer) {
-   throw new NullPointerException();
-   }
+   Preconditions.checkNotNull(buffer, "When deserializing 
the buffer should have to be requested first.");
--- End diff --

Actually, the message should rather be `No buffer instance to serialize.` 
since the constructor leading to this code path is only used for 
de-serialization.
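
For context, a minimal runnable sketch of the pattern under discussion (stand-in names, not the NettyMessage code; Objects.requireNonNull plays the role of Flink's Preconditions.checkNotNull): the null check must precede the first dereference rather than sit only in the finally block.

```java
import java.util.Objects;

// Stand-in demo: 'buffer' is deliberately left null so the guarded
// dereference fails fast with a descriptive message.
public class NullCheckDemo {
	static String buffer; // remains null to trigger the failure

	static void write() {
		Objects.requireNonNull(buffer, "No buffer instance to serialize.");
		try {
			System.out.println(buffer.length()); // first dereference, now guarded
		} finally {
			System.out.println("recycled"); // cleanup may assume buffer != null
		}
	}

	public static void main(String[] args) {
		write(); // throws NullPointerException with the message above
	}
}
```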


> Ineffective null check in NettyMessage#write()
> --
>
> Key: FLINK-7402
> URL: https://issues.apache.org/jira/browse/FLINK-7402
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Reporter: Ted Yu
>Priority: Minor
>
> Here is the null check in finally block:
> {code}
>   finally {
> if (buffer != null) {
>   buffer.recycle();
> }
> {code}
> But buffer has been dereferenced in the try block without guard.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4562: [FLINK-7402] Fix ineffective null check in NettyMe...

2017-08-23 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4562#discussion_r134699645
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 ---
@@ -220,9 +220,7 @@ void releaseBuffer() {
 
@Override
ByteBuf write(ByteBufAllocator allocator) throws IOException {
-   if (null == buffer) {
-   throw new NullPointerException();
-   }
+   Preconditions.checkNotNull(buffer, "When deserializing 
the buffer should have to be requested first.");
--- End diff --

Actually, the message should rather be `No buffer instance to serialize.` 
since the constructor leading to this code path is only used for 
de-serialization.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138126#comment-16138126
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134699407
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Simple container for the request to a handler, that contains the {@link 
RequestBody} and path/query parameters.
+ */
+public class HandlerRequest<R extends RequestBody> {
+   private final R requestBody;
+   private final Map<String, String> queryParameters;
+   private final Map<String, String> pathParameters;
+
+   public HandlerRequest(R requestBody, Map<String, String> pathParameters, Map<String, List<String>> queryParameters) {
+   this.requestBody = requestBody;
+   this.pathParameters = pathParameters;
+   if (!queryParameters.isEmpty()) {
+   this.queryParameters = new HashMap<>();
+   queryParameters.forEach((key, value) -> this.queryParameters.put(key, value.get(0)));
--- End diff --

While the HTTP specification doesn't forbid duplicate query parameters it 
also doesn't define how they should be handled. It is perfectly valid to handle 
the first/last parameter, or both either as a list or separately, the decision 
of which is entirely up to us.
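
A self-contained sketch of the "first value wins" flattening being discussed, assuming query parameters arrive as a Map<String, List<String>> (illustrative names, not the PR's code):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class QueryParameterFlattening {
	public static void main(String[] args) {
		// e.g. "?state=running&state=finished" parsed into a multimap
		Map<String, List<String>> raw = new HashMap<>();
		raw.put("state", Arrays.asList("running", "finished"));

		// keep only the first value per key, dropping later duplicates
		Map<String, String> flattened = new HashMap<>();
		raw.forEach((key, values) -> flattened.put(key, values.get(0)));

		System.out.println(flattened); // prints {state=running}
	}
}
```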


> Flip-6 client-cluster communication
> ---
>
> Key: FLINK-7040
> URL: https://issues.apache.org/jira/browse/FLINK-7040
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Mesos
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the 
> cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the 
> savepoint under which the savepoint was stored? Maybe always having to 
> specify a path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable 
> state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new 
> notifications from the execution of the given job/Opens WebSocket to receive 
> notifications
> The first four REST calls will be served by the REST endpoint running in the 
> application master/cluster entrypoint. The other calls will be served by a 
> REST endpoint running alongside the JobManager.
> Detailed information about different implementations and their pros and cons 
> can be found in this document:
> https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing
> The implementation will most likely be Netty based.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4947) Make all configuration possible via flink-conf.yaml and CLI.

2017-08-23 Thread JIRA

[ 
https://issues.apache.org/jira/browse/FLINK-4947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138127#comment-16138127
 ] 

Gábor Hermann commented on FLINK-4947:
--

[~jgrier] Thanks for the clarification! I agree with all your points.

Could we split this issue up into
1. all config should be configurable via flink-conf.yaml
2. all flink-conf.yaml config should be available via command line

I would be happy to assess 2.
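
To make the split concrete, a hedged illustration: the keys under point 1 already existed in flink-conf.yaml at the time, while the command-line syntax under point 2 is an assumption, not an existing flag.

```
# 1. configuration via flink-conf.yaml, using existing keys:
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints

# 2. the same setting as a hypothetical command-line override:
# bin/flink run -Dstate.backend=rocksdb myJob.jar
```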

> Make all configuration possible via flink-conf.yaml and CLI.
> 
>
> Key: FLINK-4947
> URL: https://issues.apache.org/jira/browse/FLINK-4947
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Jamie Grier
>
> I think it's important to make all configuration possible via the 
> flink-conf.yaml and the command line.
> As an example:  To enable "externalizedCheckpoints" you must actually call 
> the StreamExecutionEnvironment#enableExternalizedCheckpoints() method from 
> your Flink program.
> Another example of this would be configuring the RocksDB state backend.
> I think it important to make deployment flexible and easy to build tools 
> around.  For example, the infrastructure teams that make these configuration 
> decisions and provide tools for deploying Flink apps, will be different from 
> the teams deploying apps.  The team writing apps should not have to set all 
> of this lower level configuration up in their programs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...

2017-08-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134699407
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Simple container for the request to a handler, that contains the {@link 
RequestBody} and path/query parameters.
+ */
+public class HandlerRequest<R extends RequestBody> {
+   private final R requestBody;
+   private final Map<String, String> queryParameters;
+   private final Map<String, String> pathParameters;
+
+   public HandlerRequest(R requestBody, Map<String, String> pathParameters, Map<String, List<String>> queryParameters) {
+   this.requestBody = requestBody;
+   this.pathParameters = pathParameters;
+   if (!queryParameters.isEmpty()) {
+   this.queryParameters = new HashMap<>();
+   queryParameters.forEach((key, value) -> this.queryParameters.put(key, value.get(0)));
--- End diff --

While the HTTP specification doesn't forbid duplicate query parameters it 
also doesn't define how they should be handled. It is perfectly valid to handle 
the first/last parameter, or both either as a list or separately, the decision 
of which is entirely up to us.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6864) Remove confusing "invalid POJO type" messages from TypeExtractor

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6864?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138029#comment-16138029
 ] 

ASF GitHub Bot commented on FLINK-6864:
---

Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/4574
  
I agree with @zjureel . Even though I know the POJO rules well, it still 
happened to me more than once that I accidentally had a non-POJO, and I only 
noticed this because of these log messages, so I think it would be problematic 
to completely remove them.
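
As a hedged illustration of the kind of accidental non-POJO being described (rules paraphrased from the Flink docs: a public class with a public no-argument constructor and fields that are public or reachable via getters/setters):

```java
// ValidPojo satisfies Flink's POJO rules; NotAPojo silently falls back to
// Kryo because it lacks a no-argument constructor -- exactly the case the
// TypeExtractor log messages help to catch.
public class PojoExamples {
	public static class ValidPojo {
		public int id;
		public String name;
		public ValidPojo() {} // required no-argument constructor
	}

	public static class NotAPojo {
		public final int id;
		public NotAPojo(int id) { this.id = id; } // no default constructor
	}
}
```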


> Remove confusing "invalid POJO type" messages from TypeExtractor
> 
>
> Key: FLINK-6864
> URL: https://issues.apache.org/jira/browse/FLINK-6864
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Fang Yong
>
> When a user's type cannot be treated as a POJO, the {{TypeExtractor}} will 
> log warnings such as ".. must have a default constructor to be used as a 
> POJO.", "  ... is not a valid POJO type because not all fields are valid POJO 
> fields." in the {{analyzePojo}} method.
> These messages are often perceived as misleading, making the user think that 
> the job should have failed, whereas in fact in these cases Flink just 
> falls back to Kryo and treats them as generic types. We should remove these 
> messages, and at the same time improve the type serialization docs at [1] to 
> explicitly inform what it means when Flink does / does not recognizes a user 
> type as a POJO.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html#rules-for-pojo-types



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6244) Emit timeouted Patterns as Side Output

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138090#comment-16138090
 ] 

ASF GitHub Bot commented on FLINK-6244:
---

Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4320#discussion_r134692032
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -348,7 +353,18 @@ private void updateNFA(NFA<IN> nfa) throws IOException {
 * @param event The current event to be processed
 * @param timestamp The timestamp of the event
 */
-   protected abstract void processEvent(NFA<IN> nfa, IN event, long timestamp);
+   private void processEvent(NFA<IN> nfa, IN event, long timestamp)  {
+   Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns =
+   nfa.process(event, timestamp);
+
+   try {
+   processMatchedSequences(patterns.f0, timestamp);
+   processTimeoutedSequence(patterns.f1, timestamp);
--- End diff --

processTimeoutedSequence -> processTimedoutSequence


> Emit timeouted Patterns as Side Output
> --
>
> Key: FLINK-6244
> URL: https://issues.apache.org/jira/browse/FLINK-6244
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
> Fix For: 1.4.0
>
>
> Now that we have SideOutputs I think timeouted patterns should be emitted into 
> them rather than producing a stream of `Either`



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6244) Emit timeouted Patterns as Side Output

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138092#comment-16138092
 ] 

ASF GitHub Bot commented on FLINK-6244:
---

Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4320#discussion_r134692117
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -348,7 +353,18 @@ private void updateNFA(NFA<IN> nfa) throws IOException {
 * @param event The current event to be processed
 * @param timestamp The timestamp of the event
 */
-   protected abstract void processEvent(NFA<IN> nfa, IN event, long timestamp);
+   private void processEvent(NFA<IN> nfa, IN event, long timestamp)  {
+   Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns =
+   nfa.process(event, timestamp);
+
+   try {
+   processMatchedSequences(patterns.f0, timestamp);
--- End diff --

processMatchedSequences -> processMatchingSequences


> Emit timeouted Patterns as Side Output
> --
>
> Key: FLINK-6244
> URL: https://issues.apache.org/jira/browse/FLINK-6244
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
> Fix For: 1.4.0
>
>
> Now that we have SideOutputs I think timeouted patterns should be emitted into 
> them rather than producing a stream of `Either`



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4320: [FLINK-6244] Emit timeouted Patterns as Side Outpu...

2017-08-23 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4320#discussion_r134692032
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -348,7 +353,18 @@ private void updateNFA(NFA<IN> nfa) throws IOException {
 * @param event The current event to be processed
 * @param timestamp The timestamp of the event
 */
-   protected abstract void processEvent(NFA<IN> nfa, IN event, long timestamp);
+   private void processEvent(NFA<IN> nfa, IN event, long timestamp)  {
+   Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns =
+   nfa.process(event, timestamp);
+
+   try {
+   processMatchedSequences(patterns.f0, timestamp);
+   processTimeoutedSequence(patterns.f1, timestamp);
--- End diff --

processTimeoutedSequence -> processTimedoutSequence


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4320: [FLINK-6244] Emit timeouted Patterns as Side Outpu...

2017-08-23 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4320#discussion_r134692117
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 ---
@@ -348,7 +353,18 @@ private void updateNFA(NFA<IN> nfa) throws IOException {
 * @param event The current event to be processed
 * @param timestamp The timestamp of the event
 */
-   protected abstract void processEvent(NFA<IN> nfa, IN event, long timestamp);
+   private void processEvent(NFA<IN> nfa, IN event, long timestamp)  {
+   Tuple2<Collection<Map<String, IN>>, Collection<Tuple2<Map<String, IN>, Long>>> patterns =
+   nfa.process(event, timestamp);
+
+   try {
+   processMatchedSequences(patterns.f0, timestamp);
--- End diff --

processMatchedSequences -> processMatchingSequences


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138070#comment-16138070
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134689284
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerResponse.java
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Simple container for the response of a handler, that contains either a 
response of type {@code P} if the incoming
+ * request was handled successfully, otherwise it contains an error 
message and an associated error code.
+ *
+ * @param <P> type of successful response
+ */
+public final class HandlerResponse<P extends ResponseBody> {
--- End diff --

@kl0u You can already do that by appending operations to the 
`CompletableFuture` that the handler returns, i.e. something like
```
resultFuture = ...
resultFuture.handle(userCallback)
return resultFuture
```
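
Expanded into a self-contained sketch (handleRequest and userCallback are hypothetical stand-ins, not Flink API; whenComplete is used so the callback observes both success and failure without transforming the result):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

public class FutureCallbackDemo {
	// stand-in for a handler producing its response asynchronously
	static CompletableFuture<String> handleRequest() {
		return CompletableFuture.supplyAsync(() -> "response");
	}

	public static void main(String[] args) {
		BiConsumer<String, Throwable> userCallback = (result, error) ->
			System.out.println(error == null ? "sent: " + result : "failed: " + error);

		CompletableFuture<String> resultFuture = handleRequest();
		resultFuture.whenComplete(userCallback).join(); // callback fires either way
	}
}
```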


> Flip-6 client-cluster communication
> ---
>
> Key: FLINK-7040
> URL: https://issues.apache.org/jira/browse/FLINK-7040
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Mesos
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the 
> cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the 
> savepoint under which the savepoint was stored? Maybe always having to 
> specify a path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable 
> state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new 
> notifications from the execution of the given job/Opens WebSocket to receive 
> notifications
> The first four REST calls will be served by the REST endpoint running in the 
> application master/cluster entrypoint. The other calls will be served by a 
> REST endpoint running alongside the JobManager.
> Detailed information about different implementations and their pros and cons 
> can be found in this document:
> https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing
> The implementation will most likely be Netty based.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138084#comment-16138084
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134690587
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ParameterMapper.java
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class is used to map query/path {@link Parameter}s to their actual 
value.
+ */
+public abstract class ParameterMapper {
+
+   /**
+* Maps the given query {@link Parameter}s to their actual value.
+*
+* @param queryParameters parameters to map
+* @return map containing the parameters and their associated value
+*/
+   public Map<Parameter, String> mapQueryParameters(Set<Parameter> queryParameters) {
+   return Collections.emptyMap();
+   }
+
+   /**
+* Maps the given path {@link Parameter}s to their actual value.
+*
+* @param pathParameters parameters to map
+* @return map containing the parameters and their associated value
+*/
+   public Map<Parameter, String> mapPathParameters(Set<Parameter> pathParameters) {
+   return Collections.emptyMap();
+   }
+
+   /**
+* Resolves the given URL (e.g "jobs/:jobid") using the given 
path/query parameters.
+*
+* @param genericUrl  URL to resolve
+* @param pathParameters  path parameters
+* @param queryParameters query parameters
+* @return resolved url, e.g "/jobs/1234?state=running"
+*/
+   public static String resolveUrl(String genericUrl, Map<Parameter, String> pathParameters, Map<Parameter, String> queryParameters) {
+   StringBuilder sb = new StringBuilder(genericUrl);
+
+   pathParameters.forEach((parameter, value) -> {
+   int start = sb.indexOf(":" + parameter.getKey());
+   sb.replace(start, start + parameter.getKey().length() + 
1, value);
+   });
--- End diff --

Technically it could still be a valid URL.

My conclusion is that this implementation for handling parameters is 
inherently flawed.
* The mapper gets a set of parameters to map but can decide to completely ignore it (which is how most implementations will most likely look)
* There is no good means of verifying whether parameters were set or not
* It doesn't handle optional parameters, which is common for query parameters
* It is limited to the client side; on the server side we're still dealing with String:String maps

I think I've come up with something that addresses the above short-comings, 
hang tight.
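
For reference, a self-contained sketch of the substitution resolveUrl performs, using plain String keys in place of the PR's Parameter type (an assumption for brevity); note the missing validation, which is part of the critique above:

```java
import java.util.HashMap;
import java.util.Map;

public class ResolveUrlDemo {
	static String resolveUrl(String genericUrl, Map<String, String> pathParameters, Map<String, String> queryParameters) {
		StringBuilder sb = new StringBuilder(genericUrl);
		pathParameters.forEach((key, value) -> {
			int start = sb.indexOf(":" + key); // -1 if the placeholder is absent...
			sb.replace(start, start + key.length() + 1, value); // ...which throws here
		});
		boolean first = true;
		for (Map.Entry<String, String> e : queryParameters.entrySet()) {
			sb.append(first ? '?' : '&').append(e.getKey()).append('=').append(e.getValue());
			first = false;
		}
		return sb.toString();
	}

	public static void main(String[] args) {
		Map<String, String> path = new HashMap<>();
		path.put("jobid", "1234");
		Map<String, String> query = new HashMap<>();
		query.put("state", "running");
		System.out.println(resolveUrl("/jobs/:jobid", path, query)); // /jobs/1234?state=running
	}
}
```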


> Flip-6 client-cluster communication
> ---
>
> Key: FLINK-7040
> URL: https://issues.apache.org/jira/browse/FLINK-7040
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Mesos
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the 
> cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take 

[GitHub] flink pull request #4320: [FLINK-6244] Emit timeouted Patterns as Side Outpu...

2017-08-23 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4320#discussion_r134687041
  
--- Diff: docs/dev/libs/cep.md ---
@@ -1279,63 +1279,75 @@ and `flatSelect` API calls allow a timeout handler 
to be specified. This timeout
 partial event sequence. The timeout handler receives all the events that 
have been matched so far by the pattern, and
 the timestamp when the timeout was detected.
 
+In order to treat partial patterns, the `select` and `flatSelect` API 
calls offer an overloaded version which takes as
+parameters
+
+ * `PatternTimeoutFunction`/`PatternFlatTimeoutFunction`
+ * [OutputTag]({{ site.baseurl }}/dev/stream/side_output.html) for the 
side output in which the timeouted matches will be returned
--- End diff --

timeouted -> timed out
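
A hedged sketch of the usage the docs diff above describes, written against the select overload this PR proposes (not final API; 'patternStream' over String events is assumed to exist, and imports for OutputTag, SingleOutputStreamOperator, DataStream, PatternTimeoutFunction and PatternSelectFunction come from the Flink streaming/CEP modules):

```java
// Timed-out partial matches go to a side output instead of an Either stream.
OutputTag<String> timedOutTag = new OutputTag<String>("timed-out-matches"){};

SingleOutputStreamOperator<String> fullMatches = patternStream.select(
	timedOutTag,
	(PatternTimeoutFunction<String, String>) (pattern, timeoutTimestamp) ->
		"timed out at " + timeoutTimestamp,
	(PatternSelectFunction<String, String>) pattern -> "matched: " + pattern);

// partial matches that timed out are read from the side output
DataStream<String> timedOut = fullMatches.getSideOutput(timedOutTag);
```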


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...

2017-08-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134690587
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ParameterMapper.java
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class is used to map query/path {@link Parameter}s to their actual 
value.
+ */
+public abstract class ParameterMapper {
+
+   /**
+* Maps the given query {@link Parameter}s to their actual value.
+*
+* @param queryParameters parameters to map
+* @return map containing the parameters and their associated value
+*/
+   public Map<Parameter, String> mapQueryParameters(Set<Parameter> queryParameters) {
+   return Collections.emptyMap();
+   }
+
+   /**
+* Maps the given path {@link Parameter}s to their actual value.
+*
+* @param pathParameters parameters to map
+* @return map containing the parameters and their associated value
+*/
+   public Map<Parameter, String> mapPathParameters(Set<Parameter> pathParameters) {
+   return Collections.emptyMap();
+   }
+
+   /**
+* Resolves the given URL (e.g "jobs/:jobid") using the given 
path/query parameters.
+*
+* @param genericUrl  URL to resolve
+* @param pathParameters  path parameters
+* @param queryParameters query parameters
+* @return resolved url, e.g "/jobs/1234?state=running"
+*/
+   public static String resolveUrl(String genericUrl, Map<Parameter, String> pathParameters, Map<Parameter, String> queryParameters) {
+   StringBuilder sb = new StringBuilder(genericUrl);
+
+   pathParameters.forEach((parameter, value) -> {
+   int start = sb.indexOf(":" + parameter.getKey());
+   sb.replace(start, start + parameter.getKey().length() + 
1, value);
+   });
--- End diff --

Technically it could still be a valid URL.

My conclusion is that this implementation for handling parameters is 
inherently flawed.
* The mapper gets a set of parameters to map but can decide to completely ignore it (which is how most implementations will most likely look)
* There is no good means of verifying whether parameters were set or not
* It doesn't handle optional parameters, which is common for query parameters
* It is limited to the client side; on the server side we're still dealing with String:String maps

I think I've come up with something that addresses the above short-comings, 
hang tight.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-6244) Emit timeouted Patterns as Side Output

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138083#comment-16138083
 ] 

ASF GitHub Bot commented on FLINK-6244:
---

Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4320#discussion_r134687041
  
--- Diff: docs/dev/libs/cep.md ---
@@ -1279,63 +1279,75 @@ and `flatSelect` API calls allow a timeout handler 
to be specified. This timeout
 partial event sequence. The timeout handler receives all the events that 
have been matched so far by the pattern, and
 the timestamp when the timeout was detected.
 
+In order to treat partial patterns, the `select` and `flatSelect` API 
calls offer an overloaded version which takes as
+parameters
+
+ * `PatternTimeoutFunction`/`PatternFlatTimeoutFunction`
+ * [OutputTag]({{ site.baseurl }}/dev/stream/side_output.html) for the 
side output in which the timeouted matches will be returned
--- End diff --

timeouted -> timed out


> Emit timeouted Patterns as Side Output
> --
>
> Key: FLINK-6244
> URL: https://issues.apache.org/jira/browse/FLINK-6244
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
> Fix For: 1.4.0
>
>
> Now that we have SideOutputs I think timeouted patterns should be emitted into 
> them rather than producing a stream of `Either`



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6244) Emit timeouted Patterns as Side Output

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138082#comment-16138082
 ] 

ASF GitHub Bot commented on FLINK-6244:
---

Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4320#discussion_r134689181
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
 ---
@@ -84,37 +79,54 @@ class PatternStream[T](jPatternStream: 
JPatternStream[T]) {
 *   pattern sequence.
 * @tparam L Type of the resulting timeout event
 * @tparam R Type of the resulting event
+* @deprecated Use the version that returns timeouted events as a 
side-output
--- End diff --

timeouted -> timed out


> Emit timeouted Patterns as Side Output
> --
>
> Key: FLINK-6244
> URL: https://issues.apache.org/jira/browse/FLINK-6244
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
> Fix For: 1.4.0
>
>
> Now that we have SideOutputs I think timeouted patterns should be emitted into 
> them rather than producing a stream of `Either`



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4320: [FLINK-6244] Emit timeouted Patterns as Side Outpu...

2017-08-23 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4320#discussion_r134689181
  
--- Diff: 
flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
 ---
@@ -84,37 +79,54 @@ class PatternStream[T](jPatternStream: 
JPatternStream[T]) {
 *   pattern sequence.
 * @tparam L Type of the resulting timeout event
 * @tparam R Type of the resulting event
+* @deprecated Use the version that returns timeouted events as a 
side-output
--- End diff --

timeouted -> timed out


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...

2017-08-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134689284
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerResponse.java
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Simple container for the response of a handler, that contains either a 
response of type {@code P} if the incoming
+ * request was handled successfully, otherwise it contains an error 
message and an associated error code.
+ *
+ * @param <P> type of successful response
+ */
+public final class HandlerResponse<P extends ResponseBody> {
--- End diff --

@kl0u You can already do that by appending operations to the 
`CompletableFuture` that the handler returns, i.e. something like
```
resultFuture = ...
resultFuture.handle(userCallback)
return resultFuture
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7040) Flip-6 client-cluster communication

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138062#comment-16138062
 ] 

ASF GitHub Bot commented on FLINK-7040:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134686921
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerResponse.java
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Simple container for the response of a handler. It contains either a
+ * response of type {@code P}, if the incoming request was handled
+ * successfully, or an error message and an associated error code.
+ *
+ * @param <P> type of successful response
+ */
+public final class HandlerResponse<P> {
--- End diff --

I think it would also be nice to allow the user to specify a callback in the 
response which is called once the response has been fully sent over the 
network. The reason for this is to be able to update metrics like "how long 
did the transfer take?" or "what is the progress so far (for big 
messages)?". This would require adding a callback that implements 
`ChannelFutureListener` to the `HandlerResponse`, and registering that 
listener with the `lastContentFuture` in 
`AbstractRestHandler.sendResponse()`, if I understand correctly.
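
A minimal sketch of what such a listener could look like, assuming Netty's 
`ChannelFutureListener` interface under Flink's shaded package; the class 
name and the printed "metric" are illustrative, not part of the actual 
proposal:

```
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;

/** Illustrative listener that measures how long the transfer took. */
public class TransferTimingListener implements ChannelFutureListener {

    private final long startNanos = System.nanoTime();

    @Override
    public void operationComplete(ChannelFuture future) {
        long elapsedMs = (System.nanoTime() - startNanos) / 1_000_000;
        if (future.isSuccess()) {
            // a metric like "transfer duration" would be updated here
            System.out.println("Response fully sent after " + elapsedMs + " ms");
        }
    }
}

// In AbstractRestHandler.sendResponse(), the listener would be registered
// with the future of the last written content (sketch):
// lastContentFuture.addListener(new TransferTimingListener());
```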



> Flip-6 client-cluster communication
> ---
>
> Key: FLINK-7040
> URL: https://issues.apache.org/jira/browse/FLINK-7040
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management, Mesos
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: flip-6
>
> With the new Flip-6 architecture, the client will communicate with the 
> cluster in a RESTful manner.
> The cluster shall support the following REST calls:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Lookup job leader (GET): Gets the JM leader for the given job
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Cancel job (PUT): Cancel the given job
> * Stop job (PUT): Stops the given job
> * Take savepoint (POST): Take savepoint for given job (How to return the 
> path under which the savepoint was stored? Maybe always having to specify a 
> path)
> * Get KV state (GET): Gets the KV state for the given job and key (Queryable 
> state)
> * Poll/subscribe to notifications for job (GET, WebSocket): Polls new 
> notifications from the execution of the given job/Opens WebSocket to receive 
> notifications
> The first four REST calls will be served by the REST endpoint running in the 
> application master/cluster entrypoint. The other calls will be served by a 
> REST endpoint running alongside the JobManager.
> Detailed information about different implementations and their pros and cons 
> can be found in this document:
> https://docs.google.com/document/d/1eIX6FS9stwraRdSUgRSuLXC1sL7NAmxtuqIXe_jSi-k/edit?usp=sharing
> The implementation will most likely be Netty based.
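
For illustration, the "List jobs" call above could be exercised from plain 
Java as in the following sketch; the host, port, and `/jobs` path are 
assumptions for this example, not the final API:

```
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class ListJobsExample {
    public static void main(String[] args) throws Exception {
        // hypothetical address of the REST endpoint in the cluster entrypoint
        URL url = new URL("http://cluster-entrypoint:8081/jobs");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET"); // "List jobs" from the list above
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line); // payload describing running jobs
            }
        }
    }
}
```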



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4569: [FLINK-7040] [REST] Add basics for REST communicat...

2017-08-23 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/4569#discussion_r134686921
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerResponse.java
 ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+/**
+ * Simple container for the response of a handler. It contains either a
+ * response of type {@code P}, if the incoming request was handled
+ * successfully, or an error message and an associated error code.
+ *
+ * @param <P> type of successful response
+ */
+public final class HandlerResponse<P> {
--- End diff --

I think it would also be nice to allow the user to specify a callback in the 
response which is called once the response has been fully sent over the 
network. The reason for this is to be able to update metrics like "how long 
did the transfer take?" or "what is the progress so far (for big 
messages)?". This would require adding a callback that implements 
`ChannelFutureListener` to the `HandlerResponse`, and registering that 
listener with the `lastContentFuture` in 
`AbstractRestHandler.sendResponse()`, if I understand correctly.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-7337) Refactor handling of time indicator attributes

2017-08-23 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-7337.
-
   Resolution: Fixed
Fix Version/s: 1.4.0

Fixed in 1.4: 93d0ae4a9f059da4bd2b720f7503da0f9c0a8c7c & 
47944b1bb23136ae498971b3765a0d3fe6bf2f18

> Refactor handling of time indicator attributes
> --
>
> Key: FLINK-7337
> URL: https://issues.apache.org/jira/browse/FLINK-7337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
> Fix For: 1.4.0
>
>
> After a [discussion on the dev mailing 
> list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E]
>  I propose the following changes to the current handling of time indicator 
> attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as a regular Long field in Row
> ** Represent the processing-time indicator type as a null-valued field in Row 
> (1 bit overhead)
> * Remove materialization of event-time timestamps because the timestamp is 
> already accessible in Row.
> * Add a {{ProcessFunction}} to set the timestamp into the timestamp field of 
> a {{StreamRecord}}.
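
As a rough illustration of the last point, a custom stream operator along 
these lines could copy the Long event-time field of a Row into the 
StreamRecord timestamp (an operator rather than a user-facing 
ProcessFunction, since setting the record timestamp needs operator-level 
access); class and field names here are hypothetical, not the actual 
implementation:

```
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Row;

/** Hypothetical operator: forwards each Row with its event-time field as timestamp. */
public class SetRowtimeOperator extends AbstractStreamOperator<Row>
        implements OneInputStreamOperator<Row, Row> {

    private final int rowtimeFieldIndex; // position of the Long timestamp field in the Row

    public SetRowtimeOperator(int rowtimeFieldIndex) {
        this.rowtimeFieldIndex = rowtimeFieldIndex;
    }

    @Override
    public void processElement(StreamRecord<Row> element) {
        Row row = element.getValue();
        long rowtime = (Long) row.getField(rowtimeFieldIndex);
        // set the timestamp directly on the StreamRecord and forward the row
        output.collect(element.replace(row, rowtime));
    }
}
```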



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138053#comment-16138053
 ] 

ASF GitHub Bot commented on FLINK-7337:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4532


> Refactor handling of time indicator attributes
> --
>
> Key: FLINK-7337
> URL: https://issues.apache.org/jira/browse/FLINK-7337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> After a [discussion on the dev mailing 
> list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E]
>  I propose the following changes to the current handling of time indicator 
> attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as a regular Long field in Row
> ** Represent the processing-time indicator type as a null-valued field in Row 
> (1 bit overhead)
> * Remove materialization of event-time timestamps because the timestamp is 
> already accessible in Row.
> * Add a {{ProcessFunction}} to set the timestamp into the timestamp field of 
> a {{StreamRecord}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138052#comment-16138052
 ] 

ASF GitHub Bot commented on FLINK-7337:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4488


> Refactor handling of time indicator attributes
> --
>
> Key: FLINK-7337
> URL: https://issues.apache.org/jira/browse/FLINK-7337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> After a [discussion on the dev mailing 
> list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E]
>  I propose the following changes to the current handling of time indicator 
> attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as a regular Long field in Row
> ** Represent the processing-time indicator type as a null-valued field in Row 
> (1 bit overhead)
> * Remove materialization of event-time timestamps because the timestamp is 
> already accessible in Row.
> * Add a {{ProcessFunction}} to set the timestamp into the timestamp field of 
> a {{StreamRecord}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4488: [FLINK-7337] [table] Refactor internal handling of...

2017-08-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4488


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4532: [FLINK-7337] [table] Refactor internal handling of...

2017-08-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4532


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-7337) Refactor handling of time indicator attributes

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138039#comment-16138039
 ] 

ASF GitHub Bot commented on FLINK-7337:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/4532
  
Merging...


> Refactor handling of time indicator attributes
> --
>
> Key: FLINK-7337
> URL: https://issues.apache.org/jira/browse/FLINK-7337
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>
> After a [discussion on the dev mailing 
> list|https://lists.apache.org/thread.html/735d55f9022df8ff73566a9f1553e14be94f8443986ad46559b35869@%3Cdev.flink.apache.org%3E]
>  I propose the following changes to the current handling of time indicator 
> attributes:
> * Remove the separation of logical and physical row type.
> ** Hold the event-time timestamp as a regular Long field in Row
> ** Represent the processing-time indicator type as a null-valued field in Row 
> (1 bit overhead)
> * Remove materialization of event-time timestamps because the timestamp is 
> already accessible in Row.
> * Add a {{ProcessFunction}} to set the timestamp into the timestamp field of 
> a {{StreamRecord}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4532: [FLINK-7337] [table] Refactor internal handling of time i...

2017-08-23 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/4532
  
Merging...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #4574: [FLINK-6864] Fix confusing "invalid POJO type" messages f...

2017-08-23 Thread ggevay
Github user ggevay commented on the issue:

https://github.com/apache/flink/pull/4574
  
I agree with @zjureel. Even though I know the POJO rules well, it has still 
happened to me more than once that I accidentally had a non-POJO, and I only 
noticed because of these log messages, so I think it would be problematic to 
remove them completely.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-6244) Emit timeouted Patterns as Side Output

2017-08-23 Thread Dawid Wysakowicz (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz resolved FLINK-6244.
-
Resolution: Fixed

> Emit timeouted Patterns as Side Output
> --
>
> Key: FLINK-6244
> URL: https://issues.apache.org/jira/browse/FLINK-6244
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
> Fix For: 1.4.0
>
>
> Now that we have SideOutputs I think timed-out patterns should be emitted 
> into them rather than producing a stream of `Either`.
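
A minimal usage sketch, under the assumption that `PatternStream#select` 
gained an overload taking an `OutputTag` for timed-out partial matches; 
`patternStream` and the `Event` type are assumed to exist in the 
surrounding program, and the handler bodies are illustrative:

```
import java.util.List;
import java.util.Map;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.OutputTag;

// tag for timed-out partial matches; the anonymous subclass preserves type info
final OutputTag<String> timedOutTag = new OutputTag<String>("timed-out-matches") {};

SingleOutputStreamOperator<String> matches = patternStream.select(
    timedOutTag,
    (Map<String, List<Event>> pattern, long timeoutTimestamp) -> "timed out: " + pattern,
    (Map<String, List<Event>> pattern) -> "matched: " + pattern);

// timed-out matches are read back as a side output instead of an Either stream
DataStream<String> timedOut = matches.getSideOutput(timedOutTag);
```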



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16138014#comment-16138014
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134676224
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is 

[jira] [Commented] (FLINK-6244) Emit timeouted Patterns as Side Output

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16137962#comment-16137962
 ] 

ASF GitHub Bot commented on FLINK-6244:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/4320
  
I will merge it then.


> Emit timeouted Patterns as Side Output
> --
>
> Key: FLINK-6244
> URL: https://issues.apache.org/jira/browse/FLINK-6244
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Affects Versions: 1.3.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
> Fix For: 1.4.0
>
>
> Now that we have SideOutputs I think timed-out patterns should be emitted 
> into them rather than producing a stream of `Either`.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4239: [FLINK-6988] flink-connector-kafka-0.11 with exact...

2017-08-23 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134676224
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is 
compatible with Kafka 0.11.x. By default producer
+ * will use {@link Semantic#EXACTLY_ONCE} semantic.
+ *
+ * Implementation note: This producer is a hybrid between a regular 
+ * {@link 
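
As a quick orientation, a minimal usage sketch of the producer under review; 
the three-argument constructor and `SimpleStringSchema` are assumptions 
based on the sibling Kafka connectors, not confirmed by this diff:

```
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// assumed to exist in the surrounding program: a DataStream<String> named stream
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");

stream.addSink(new FlinkKafkaProducer011<>(
    "my-topic",               // target topic (example name)
    new SimpleStringSchema(), // serializes String records to bytes
    props));                  // EXACTLY_ONCE semantic by default per the Javadoc
```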

[jira] [Commented] (FLINK-6988) Add Apache Kafka 0.11 connector

2017-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16137995#comment-16137995
 ] 

ASF GitHub Bot commented on FLINK-6988:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134673133
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is 

[GitHub] flink pull request #4239: [FLINK-6988] flink-connector-kafka-0.11 with exact...

2017-08-23 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4239#discussion_r134673133
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ---
@@ -0,0 +1,1000 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.SerializableObject;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import 
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import 
org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer;
+import 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMuttableWrapper;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is 
compatible with Kafka 0.11.x. By default producer
+ * will use {@link Semantic#EXACTLY_ONCE} semantic.
+ *
+ * Implementation note: This producer is a hybrid between a regular 
+ * {@link 
