[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6083


---


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-25 Thread medcv
Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r197994751
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,228 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-confluent-schema-registry</artifactId>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<scala.binary.version>2.11</scala.binary.version>
+		<confluent.version>4.1.0</confluent.version>
+		<json2avro.version>0.2.6</json2avro.version>
+		<avro.version>1.8</avro.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.8_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>io.confluent</groupId>
+			<artifactId>kafka-avro-serializer</artifactId>
+			<version>${confluent.version}</version>
+		</dependency>
--- End diff --

We need this so that `AvroSerializationConfluentSchema` can check events 
against the registered schema before sending data to Kafka.
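As a simplified, stdlib-only sketch of that idea: before producing to Kafka, an event can be rejected when its fields do not match the fields of the registered schema. The `SchemaFieldGuard` class below is a hypothetical illustration (the field names come from this test's `User` schema); the PR itself delegates the actual check to the Confluent `KafkaAvroSerializer`, which fetches the schema from the Schema Registry.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: reject an event up front when its fields do not
// match the fields of the schema registered for the topic.
class SchemaFieldGuard {

    // Field names of this test's example "User" schema.
    static final List<String> USER_SCHEMA_FIELDS =
        List.of("name", "favoriteNumber", "favoriteColor", "eventType");

    /** Returns true when the event carries exactly the schema's fields. */
    static boolean matchesSchema(Map<String, String> event) {
        return event.keySet().equals(Set.copyOf(USER_SCHEMA_FIELDS));
    }
}
```

A guard like this fails fast on the producer side instead of writing records that downstream Avro consumers cannot decode.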


---


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-25 Thread medcv
Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r197994770
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,155 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-confluent-schema-registry</artifactId>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<confluent.version>3.2.0</confluent.version>
+	</properties>
+
+	<repositories>
+		<repository>
+			<id>confluent</id>
+			<url>http://packages.confluent.io/maven/</url>
+		</repository>
+	</repositories>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>io.confluent</groupId>
+			<artifactId>kafka-avro-serializer</artifactId>
+			<version>${confluent.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.0.0</version>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>TestAvroConsumerConfluent</finalName>
+							<artifactSet>
+								<excludes>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+								</excludes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.schema.registry.test.TestAvroConsumerConfluent</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.avro</groupId>
+				<artifactId>avro-maven-plugin</artifactId>
+				<version>${avro.version}</version>
+				<executions>
+					<execution>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>schema</goal>
+						</goals>
+						<configuration>
+							<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
+							<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>

[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-22 Thread medcv
Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r197493383
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,155 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-confluent-schema-registry</artifactId>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<confluent.version>3.2.0</confluent.version>
+	</properties>
+
+	<repositories>
+		<repository>
+			<id>confluent</id>
+			<url>http://packages.confluent.io/maven/</url>
+		</repository>
+	</repositories>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>io.confluent</groupId>
+			<artifactId>kafka-avro-serializer</artifactId>
+			<version>${confluent.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.0.0</version>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>TestAvroConsumerConfluent</finalName>
+							<artifactSet>
+								<excludes>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+								</excludes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.schema.registry.test.TestAvroConsumerConfluent</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.avro</groupId>
+				<artifactId>avro-maven-plugin</artifactId>
+				<version>${avro.version}</version>
+				<executions>
+					<execution>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>schema</goal>
+						</goals>
+						<configuration>
+							<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
+							<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>

[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r197490677
  
--- Diff: 
flink-end-to-end-tests/test-scripts/test-confluent-schema-registry.sh ---
@@ -0,0 +1,106 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/kafka-common.sh
+
+function verify_output {
+  local expected=$(printf $1)
+
+  if [[ "$2" != "$expected" ]]; then
+    echo "Output from Flink program does not match expected output."
+    echo -e "EXPECTED FOR KEY: --$expected--"
+    echo -e "ACTUAL: --$2--"
+    PASS=""
+    exit 1
+  fi
+}
+
+function test_cleanup {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+
+  stop_kafka_cluster
+  stop_confluent_schema_registry
+
+  # revert our modifications to the Flink distribution
+  mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+
+trap test_cleanup INT
+trap test_cleanup EXIT
+
+setup_kafka_dist
+setup_confluent_dist
+
+cd flink-end-to-end-tests/flink-confluent-schema-registry
+mvn clean package -Pbuild-jar -nsu
+
+start_kafka_cluster
+start_confluent_schema_registry
+sleep 5
+
+# modify configuration to use port 8082 for Flink
+cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
+sed -i -e "s/web.port: 8081/web.port: 8082/" 
$FLINK_DIR/conf/flink-conf.yaml
+
+TEST_PROGRAM_JAR=target/flink-confluent-schema-registry-1.6-SNAPSHOT.jar
+

+INPUT_MESSAGE_1='{"name":"Alyssa","favoriteNumber":"250","favoriteColor":"green","eventType":"meeting"}'

+INPUT_MESSAGE_2='{"name":"Charlie","favoriteNumber":"10","favoriteColor":"blue","eventType":"meeting"}'

+INPUT_MESSAGE_3='{"name":"Ben","favoriteNumber":"7","favoriteColor":"red","eventType":"meeting"}'

+USER_SCHEMA='{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string","default":""},{"name":"favoriteNumber","type":"string","default":""},{"name":"favoriteColor","type":"string","default":""},{"name":"eventType","type":{"name":"EventType","type":"enum","symbols":["meeting"]}}]}'
+
+curl -X POST \
+  http://localhost:8081/subjects/users-value/versions \
+  -H 'cache-control: no-cache' \
+  -H 'content-type: application/vnd.schemaregistry.v1+json' \
+  -d '{"schema": "{\"namespace\": \"example.avro\",\"type\": 
\"record\",\"name\": \"User\",\"fields\": [{\"name\": \"name\", \"type\": 
\"string\", \"default\": \"\"},{\"name\": \"favoriteNumber\",  \"type\": 
\"string\", \"default\": \"\"},{\"name\": \"favoriteColor\", \"type\": 
\"string\", \"default\": \"\"},{\"name\": \"eventType\",\"type\": {\"name\": 
\"EventType\",\"type\": \"enum\", \"symbols\": [\"meeting\"] }}]}"}'
--- End diff --

Alright. Then let's keep it as it is.


---


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r197490378
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,155 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-confluent-schema-registry</artifactId>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<confluent.version>3.2.0</confluent.version>
+	</properties>
+
+	<repositories>
+		<repository>
+			<id>confluent</id>
+			<url>http://packages.confluent.io/maven/</url>
+		</repository>
+	</repositories>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>io.confluent</groupId>
+			<artifactId>kafka-avro-serializer</artifactId>
+			<version>${confluent.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.0.0</version>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>TestAvroConsumerConfluent</finalName>
+							<artifactSet>
+								<excludes>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+								</excludes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.schema.registry.test.TestAvroConsumerConfluent</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.avro</groupId>
+				<artifactId>avro-maven-plugin</artifactId>
+				<version>${avro.version}</version>
+				<executions>
+					<execution>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>schema</goal>
+						</goals>
+						<configuration>
+							<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
+							<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>

[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r197490130
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,228 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-confluent-schema-registry</artifactId>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<scala.binary.version>2.11</scala.binary.version>
+		<confluent.version>4.1.0</confluent.version>
+		<json2avro.version>0.2.6</json2avro.version>
+		<avro.version>1.8</avro.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.8_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>io.confluent</groupId>
+			<artifactId>kafka-avro-serializer</artifactId>
+			<version>${confluent.version}</version>
+		</dependency>
--- End diff --

For what do we need this dependency?


---


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r197490461
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,155 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-confluent-schema-registry</artifactId>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<confluent.version>3.2.0</confluent.version>
+	</properties>
+
+	<repositories>
+		<repository>
+			<id>confluent</id>
+			<url>http://packages.confluent.io/maven/</url>
+		</repository>
+	</repositories>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>io.confluent</groupId>
+			<artifactId>kafka-avro-serializer</artifactId>
+			<version>${confluent.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<version>3.0.0</version>
+				<executions>
+					<execution>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>TestAvroConsumerConfluent</finalName>
+							<artifactSet>
+								<excludes>
+									<exclude>com.google.code.findbugs:jsr305</exclude>
+								</excludes>
+							</artifactSet>
+							<filters>
+								<filter>
+									<artifact>*:*</artifact>
+									<excludes>
+										<exclude>META-INF/*.SF</exclude>
+										<exclude>META-INF/*.DSA</exclude>
+										<exclude>META-INF/*.RSA</exclude>
+									</excludes>
+								</filter>
+							</filters>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.schema.registry.test.TestAvroConsumerConfluent</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.avro</groupId>
+				<artifactId>avro-maven-plugin</artifactId>
+				<version>${avro.version}</version>
+				<executions>
+					<execution>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>schema</goal>
+						</goals>
+						<configuration>
+							<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
+							<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>

[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-21 Thread medcv
Github user medcv commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r197128954
  
--- Diff: 
flink-end-to-end-tests/test-scripts/test-confluent-schema-registry.sh ---
@@ -0,0 +1,106 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/kafka-common.sh
+
+function verify_output {
+  local expected=$(printf $1)
+
+  if [[ "$2" != "$expected" ]]; then
+    echo "Output from Flink program does not match expected output."
+    echo -e "EXPECTED FOR KEY: --$expected--"
+    echo -e "ACTUAL: --$2--"
+    PASS=""
+    exit 1
+  fi
+}
+
+function test_cleanup {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+
+  stop_kafka_cluster
+  stop_confluent_schema_registry
+
+  # revert our modifications to the Flink distribution
+  mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+
+trap test_cleanup INT
+trap test_cleanup EXIT
+
+setup_kafka_dist
+setup_confluent_dist
+
+cd flink-end-to-end-tests/flink-confluent-schema-registry
+mvn clean package -Pbuild-jar -nsu
+
+start_kafka_cluster
+start_confluent_schema_registry
+sleep 5
+
+# modify configuration to use port 8082 for Flink
+cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
+sed -i -e "s/web.port: 8081/web.port: 8082/" 
$FLINK_DIR/conf/flink-conf.yaml
+
+TEST_PROGRAM_JAR=target/flink-confluent-schema-registry-1.6-SNAPSHOT.jar
+

+INPUT_MESSAGE_1='{"name":"Alyssa","favoriteNumber":"250","favoriteColor":"green","eventType":"meeting"}'

+INPUT_MESSAGE_2='{"name":"Charlie","favoriteNumber":"10","favoriteColor":"blue","eventType":"meeting"}'

+INPUT_MESSAGE_3='{"name":"Ben","favoriteNumber":"7","favoriteColor":"red","eventType":"meeting"}'

+USER_SCHEMA='{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string","default":""},{"name":"favoriteNumber","type":"string","default":""},{"name":"favoriteColor","type":"string","default":""},{"name":"eventType","type":{"name":"EventType","type":"enum","symbols":["meeting"]}}]}'
+
+curl -X POST \
+  http://localhost:8081/subjects/users-value/versions \
+  -H 'cache-control: no-cache' \
+  -H 'content-type: application/vnd.schemaregistry.v1+json' \
+  -d '{"schema": "{\"namespace\": \"example.avro\",\"type\": 
\"record\",\"name\": \"User\",\"fields\": [{\"name\": \"name\", \"type\": 
\"string\", \"default\": \"\"},{\"name\": \"favoriteNumber\",  \"type\": 
\"string\", \"default\": \"\"},{\"name\": \"favoriteColor\", \"type\": 
\"string\", \"default\": \"\"},{\"name\": \"eventType\",\"type\": {\"name\": 
\"EventType\",\"type\": \"enum\", \"symbols\": [\"meeting\"] }}]}"}'
--- End diff --

I tried to remove it, but it seems the Schema Registry needs it; without it, 
the first record does not work.


---


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195464034
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,228 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-confluent-schema-registry</artifactId>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<scala.binary.version>2.11</scala.binary.version>
+		<confluent.version>4.1.0</confluent.version>
+		<json2avro.version>0.2.6</json2avro.version>
+		<avro.version>1.8</avro.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.8_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+		</dependency>
--- End diff --

For what do we need this dependency?


---


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195464899
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,228 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-confluent-schema-registry</artifactId>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<scala.binary.version>2.11</scala.binary.version>
+		<confluent.version>4.1.0</confluent.version>
+		<json2avro.version>0.2.6</json2avro.version>
+		<avro.version>1.8</avro.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka-0.8_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>io.confluent</groupId>
+			<artifactId>kafka-avro-serializer</artifactId>
+			<version>${confluent.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>tech.allegro.schema.json2avro</groupId>
+			<artifactId>converter</artifactId>
+			<version>${json2avro.version}</version>
+		</dependency>
+	</dependencies>
+
+	<profiles>
+		<profile>
+			<id>build-jar</id>
+			<activation>
+				<activeByDefault>false</activeByDefault>
+			</activation>
+			<build>
+				<resources>
+					<resource>
+						<directory>src/main/resources/${resource.dir}</directory>
+						<filtering>true</filtering>
+					</resource>
+				</resources>
+				<plugins>
+					<plugin>
+						<groupId>org.apache.maven.plugins</groupId>
+						<artifactId>maven-shade-plugin</artifactId>
+						<version>2.4.1</version>
+						<executions>
+							<execution>
+								<phase>package</phase>
+								<goals>
+									<goal>shade</goal>
+								</goals>
+								<configuration>
+									<artifactSet>
+										<excludes>
+											<exclude>org.apache.flink:force-shading</exclude>
+											<exclude>com.google.code.findbugs:jsr305</exclude>
+											<exclude>org.slf4j:*</exclude>
+										</excludes>
+									</artifactSet>
+									<filters>
+										<filter>
+											<artifact>*:*</artifact>

[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195465238
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroDeserializationConfluentSchema.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroDecoder;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+import java.io.IOException;
+
+/**
+ * The deserialization schema for the Avro type.
+ */
+public class AvroDeserializationConfluentSchema<T> implements DeserializationSchema<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private Class<T> avroType;
+	private final String schemaRegistryUrl;
+	private final int identityMapCapacity;
+	private KafkaAvroDecoder kafkaAvroDecoder;
--- End diff --

Not serializable. Please mark as `transient`.
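For context on why `transient` matters here: Flink ships `DeserializationSchema` instances to the workers via Java serialization, so a non-serializable field such as `KafkaAvroDecoder` must be skipped during shipping and re-created lazily on first use (the PR does the re-creation with a null check in `deserialize`). Below is a minimal, stdlib-only sketch of that pattern; the `Decoder` class is a hypothetical stand-in for `KafkaAvroDecoder`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stand-in for a non-serializable client such as KafkaAvroDecoder.
class Decoder {
    String decode(byte[] bytes) { return new String(bytes); }
}

class LazySchema implements Serializable {
    private static final long serialVersionUID = 1L;

    // transient: skipped by Java serialization, so the field is null
    // after the schema instance arrives on a worker.
    private transient Decoder decoder;

    String deserialize(byte[] message) {
        if (decoder == null) {          // re-create lazily per task
            decoder = new Decoder();
        }
        return decoder.decode(message);
    }

    // Round-trips an instance through Java serialization, the way Flink
    // ships user functions to the cluster.
    static LazySchema shipCopy(LazySchema schema) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(schema);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (LazySchema) in.readObject();
        }
    }
}
```

Without the `transient` modifier, serializing the schema would throw a `NotSerializableException` as soon as the decoder field had been populated.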


---


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195473522
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+/**
+ * The serialization schema for the Avro type.
+ */
+public class AvroSerializationConfluentSchema<T> implements SerializationSchema<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private final String schemaRegistryUrl;
+
+	private final int identityMapCapacity;
+
+	private KafkaAvroSerializer kafkaAvroSerializer;
+
+	private String topicName;
--- End diff --

Could be final, I guess.


---


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195469740
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroDeserializationConfluentSchema.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroDecoder;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+import java.io.IOException;
+
+/**
+ * The deserialization schema for the Avro type.
+ */
+public class AvroDeserializationConfluentSchema<T> implements DeserializationSchema<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private Class<T> avroType;
+	private final String schemaRegistryUrl;
+	private final int identityMapCapacity;
+	private KafkaAvroDecoder kafkaAvroDecoder;
+
+   private ObjectMapper mapper;
+
+   private JsonAvroConverter jsonAvroConverter;
+
+	public AvroDeserializationConfluentSchema(Class<T> avroType, String schemaRegistyUrl) {
+		this(avroType, schemaRegistyUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
+	}
+
+	public AvroDeserializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, int identityMapCapacity) {
+		this.avroType = avroType;
+		this.schemaRegistryUrl = schemaRegistryUrl;
+		this.identityMapCapacity = identityMapCapacity;
+	}
+
+   @Override
+   public T deserialize(byte[] message) throws IOException {
+   if (kafkaAvroDecoder == null) {
+   SchemaRegistryClient schemaRegistryClient = new 
CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
+   this.kafkaAvroDecoder = new 
KafkaAvroDecoder(schemaRegistryClient);
--- End diff --

Why do we use the `KafkaAvroDecoder` instead of the `KafkaAvroDeserializer`?


---


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195472317
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+/**
+ * The serialization schema for the Avro type.
+ */
+public class AvroSerializationConfluentSchema implements 
SerializationSchema {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String schemaRegistryUrl;
+
+   private final int identityMapCapacity;
+
+   private KafkaAvroSerializer kafkaAvroSerializer;
+
+   private String topicName;
+
+   private SchemaRegistryClient schemaRegistryClient;
+
+   private JsonAvroConverter jsonAvroConverter;
+
+   private final Class<T> avroType;
+
+   public AvroSerializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, String topicName) {
+   this(avroType, schemaRegistryUrl, 
AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT, topicName);
+   }
+
+   public AvroSerializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, int identityMapCapacity, String topicName) {
+   this.schemaRegistryUrl = schemaRegistryUrl;
+   this.identityMapCapacity = identityMapCapacity;
+   this.topicName = topicName;
+   this.avroType = avroType;
+   }
+
+   @Override
+   public byte[] serialize(T obj) {
+   byte[] serializedBytes = null;
+
+   try {
+   if (kafkaAvroSerializer == null) {
+   this.schemaRegistryClient = new 
CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
+   this.kafkaAvroSerializer = new 
KafkaAvroSerializer(schemaRegistryClient);
+   }
+
+   String schema = 
schemaRegistryClient.getLatestSchemaMetadata(topicName + "-value").getSchema();
+
+   if (jsonAvroConverter == null) {
+   jsonAvroConverter = new JsonAvroConverter();
+   }
+
+   //System.out.println("Schema fetched from Schema 
Registry for topic :" + topicName + " = " + schema);
--- End diff --

Remove unused code


---


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195472489
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+/**
+ * The serialization schema for the Avro type.
+ */
+public class AvroSerializationConfluentSchema<T> implements SerializationSchema<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String schemaRegistryUrl;
+
+   private final int identityMapCapacity;
+
+   private KafkaAvroSerializer kafkaAvroSerializer;
+
+   private String topicName;
+
+   private SchemaRegistryClient schemaRegistryClient;
+
+   private JsonAvroConverter jsonAvroConverter;
+
+   private final Class<T> avroType;
+
+   public AvroSerializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, String topicName) {
+   this(avroType, schemaRegistryUrl, 
AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT, topicName);
+   }
+
+   public AvroSerializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, int identityMapCapacity, String topicName) {
+   this.schemaRegistryUrl = schemaRegistryUrl;
+   this.identityMapCapacity = identityMapCapacity;
+   this.topicName = topicName;
+   this.avroType = avroType;
+   }
+
+   @Override
+   public byte[] serialize(T obj) {
+   byte[] serializedBytes = null;
+
+   try {
+   if (kafkaAvroSerializer == null) {
+   this.schemaRegistryClient = new 
CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
+   this.kafkaAvroSerializer = new 
KafkaAvroSerializer(schemaRegistryClient);
+   }
+
+   String schema = 
schemaRegistryClient.getLatestSchemaMetadata(topicName + "-value").getSchema();
+
+   if (jsonAvroConverter == null) {
+   jsonAvroConverter = new JsonAvroConverter();
+   }
+
+   //System.out.println("Schema fetched from Schema 
Registry for topic :" + topicName + " = " + schema);
+   GenericData.Record record = 
jsonAvroConverter.convertToGenericDataRecord(obj.toString().getBytes(), new 
Schema.Parser().parse(schema));
+
+   if (GenericData.get().validate(new 
Schema.Parser().parse(schema), record)) {
+   serializedBytes = 
kafkaAvroSerializer.serialize(topicName, record);
+
+   } else {
+   System.out.println("Error :Invalid message : 
Doesn't follow the avro schema : Message not published to the topic, message = 
" + record.toString());
+
+   }
+
+   } catch (Exception ex) {
+   ex.printStackTrace();
--- End diff --

No proper exception handling.


---
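A minimal stdlib-only sketch of the exception-handling pattern the reviewer is asking for: wrap and rethrow instead of `printStackTrace()` plus returning `null`. The class and message names below are illustrative, not taken from the PR.

```java
import java.nio.charset.StandardCharsets;

/** Illustrative serializer: failures surface as exceptions, never as a silent null. */
class SafeSerializer {
    byte[] serialize(Object record) {
        try {
            if (record == null) {
                throw new IllegalArgumentException("record must not be null");
            }
            return record.toString().getBytes(StandardCharsets.UTF_8);
        } catch (Exception e) {
            // Rethrow so the surrounding framework (e.g. a Flink task) fails fast,
            // instead of printStackTrace() followed by returning null.
            throw new RuntimeException("Failed to serialize record", e);
        }
    }
}

public class SafeSerializerDemo {
    public static void main(String[] args) {
        byte[] bytes = new SafeSerializer().serialize("hello");
        if (bytes.length != 5) {
            throw new AssertionError("unexpected byte count");
        }
        System.out.println("serialized " + bytes.length + " bytes");
    }
}
```

With this shape, a bad record fails the Kafka sink visibly rather than silently emitting `null` bytes.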


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195473423
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+/**
+ * The serialization schema for the Avro type.
+ */
+public class AvroSerializationConfluentSchema<T> implements SerializationSchema<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String schemaRegistryUrl;
+
+   private final int identityMapCapacity;
+
+   private KafkaAvroSerializer kafkaAvroSerializer;
+
+   private String topicName;
+
+   private SchemaRegistryClient schemaRegistryClient;
+
+   private JsonAvroConverter jsonAvroConverter;
+
+   private final Class<T> avroType;
--- End diff --

This field is not used.


---


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195476574
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/example/avro/User.java
 ---
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package example.avro;
+
+/**
+ * Autogenerated by Avro
+ *  DO NOT EDIT DIRECTLY 
+ */
+
+import org.apache.avro.message.BinaryMessageDecoder;
+import org.apache.avro.message.BinaryMessageEncoder;
+import org.apache.avro.message.SchemaStore;
+import org.apache.avro.specific.SpecificData;
+/**
+**/
+
+@SuppressWarnings("all")
+@org.apache.avro.specific.AvroGenerated
+public class User extends org.apache.avro.specific.SpecificRecordBase 
implements org.apache.avro.specific.SpecificRecord {
--- End diff --

Can we automatically generate these classes by using Maven's 
`generate-sources` phase?


---
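For reference, hooking Avro code generation into Maven's `generate-sources` phase would look roughly like the `avro-maven-plugin` configuration below; the plugin version and directory paths are assumptions, not taken from the PR:

```xml
<plugin>
	<groupId>org.apache.avro</groupId>
	<artifactId>avro-maven-plugin</artifactId>
	<version>1.8.2</version>
	<executions>
		<execution>
			<phase>generate-sources</phase>
			<goals>
				<goal>schema</goal>
			</goals>
			<configuration>
				<!-- a user.avsc schema would live here instead of the checked-in User.java -->
				<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
				<outputDirectory>${project.build.directory}/generated-sources/avro/</outputDirectory>
			</configuration>
		</execution>
	</executions>
</plugin>
```

The generated classes would then land under `target/generated-sources`, and the hand-maintained `User.java` could be deleted.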


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195464456
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,228 @@
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+   
+   flink-end-to-end-tests
+   org.apache.flink
+   1.6-SNAPSHOT
+   
+   4.0.0
+
+   flink-confluent-schema-registry
+   
+   
UTF-8
+   2.11
+   4.1.0
+   0.2.6
+   1.8
+   
+
+   
+   
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+   
+   
+   org.apache.flink
+   
flink-clients_${scala.binary.version}
+   ${project.version}
+   
+   
+   org.apache.flink
+   
flink-scala_${scala.binary.version}
+   ${project.version}
+   
+   
+   org.apache.flink
+   
flink-streaming-scala_${scala.binary.version}
+   ${project.version}
+   
+   
+   
+   org.apache.flink
+   
flink-connector-kafka-0.10_${scala.binary.version}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   
flink-connector-kafka-0.8_${scala.binary.version}
+   ${project.version}
+   
+   
+   
+   org.apache.flink
+   flink-avro
+   ${project.version}
+   
+   
+   
+   org.apache.avro
+   avro
+   
+   
+   io.confluent
+   kafka-avro-serializer
+   ${confluent.version}
+   
+   
+   
+   tech.allegro.schema.json2avro
+   converter
+   ${json2avro.version}
+   
--- End diff --

Why do we need this dependency?


---


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195465088
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,228 @@
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+   
+   flink-end-to-end-tests
+   org.apache.flink
+   1.6-SNAPSHOT
+   
+   4.0.0
+
+   flink-confluent-schema-registry
+   
+   
UTF-8
+   2.11
+   4.1.0
+   0.2.6
+   1.8
+   
+
+   
+   
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+   
+   
+   org.apache.flink
+   
flink-clients_${scala.binary.version}
+   ${project.version}
+   
+   
+   org.apache.flink
+   
flink-scala_${scala.binary.version}
+   ${project.version}
+   
+   
+   org.apache.flink
+   
flink-streaming-scala_${scala.binary.version}
+   ${project.version}
+   
+   
+   
+   org.apache.flink
+   
flink-connector-kafka-0.10_${scala.binary.version}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   
flink-connector-kafka-0.8_${scala.binary.version}
+   ${project.version}
+   
+   
+   
+   org.apache.flink
+   flink-avro
+   ${project.version}
+   
+   
+   
+   org.apache.avro
+   avro
+   
+   
+   io.confluent
+   kafka-avro-serializer
+   ${confluent.version}
+   
+   
+   
+   tech.allegro.schema.json2avro
+   converter
+   ${json2avro.version}
+   
+   
+
+   
+   
+   
+   build-jar
+   
+   false
+   
+   
+   
+   
+   
src/main/resources/${resource.dir}
+   true
+   
+   
+   
+   
+   
+   
org.apache.maven.plugins
+   
maven-shade-plugin
+   2.4.1
+   
+   
+   
+   
package
+   
+   
shade
+   
+   
+   

+   

+   
org.apache.flink:force-shading
+   
com.google.code.findbugs:jsr305
+   
org.slf4j:*
+   

+   

+   

+   

+   

+   
*:*
+   

+  

[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195447395
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/TestAvroConsumerConfluent.java
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
+
+import example.avro.User;
+
+import java.util.Properties;
+
+/**
+ * A simple example that shows how to read from and write to Kafka with 
Confluent Schema Registry.
+ * This will read AVRO messages from the input topic and parse them into a POJO type after validating the schema against the Schema Registry.
+ * The example then publishes the POJO back to Kafka, converting it to AVRO and verifying the schema again.
+ * --input-topic test-input --output-topic test-output --bootstrap.servers 
localhost:9092 --zookeeper.connect localhost:2181 --schema-registry-url 
http://localhost:8081 --group.id myconsumer
+ */
+public class TestAvroConsumerConfluent {
+
+   public static void main(String[] args) throws Exception {
+   Properties config = new Properties();
+   // parse input arguments
+   final ParameterTool parameterTool = 
ParameterTool.fromArgs(args);
+
+   if (parameterTool.getNumberOfParameters() < 6) {
+   System.out.println("Missing parameters!\n" +
+   "Usage: Kafka --input-topic  
--output-topic  " +
+   "--bootstrap.servers  " +
+   "--zookeeper.connect  " +
+   "--schema-registry-url  --group.id ");
+   return;
+   }
+   config.setProperty("bootstrap.servers", 
parameterTool.getRequired("bootstrap.servers"));
+   config.setProperty("group.id", 
parameterTool.getRequired("group.id"));
+   config.setProperty("zookeeper.connect", 
parameterTool.getRequired("zookeeper.connect"));
+   String schemaRegistryUrl = 
parameterTool.getRequired("schema-registry-url");
+
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.getConfig().disableSysoutLogging();
+   DataStreamSource<User> input = env
+   .addSource(
+   new FlinkKafkaConsumer010<User>(
+   
parameterTool.getRequired("input-topic"),
+   new 
AvroDeserializationConfluentSchema(User.class, schemaRegistryUrl),
--- End diff --

Raw usage of `AvroDeserializationConfluentSchema`; better to use `AvroDeserializationConfluentSchema<>`. Then we can also remove `User` in `FlinkKafkaConsumer010`.


---
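The raw-type point, illustrated with a stdlib stand-in (the `List` below plays the role of the generic deserialization schema): with the diamond operator the element type is stated once on the declaration and inferred on the right-hand side, while raw usage silently disables the compiler's type checks.

```java
import java.util.ArrayList;
import java.util.List;

public class RawVsGenericDemo {
    public static void main(String[] args) {
        // Raw usage: compiles, but the compiler can no longer check element types.
        List raw = new ArrayList();
        raw.add("a string");
        raw.add(42); // silently allowed

        // Diamond usage: the type argument is inferred from the declaration,
        // so it does not have to be repeated on the right-hand side.
        List<String> typed = new ArrayList<>();
        typed.add("a string");
        // typed.add(42);  // would not compile

        System.out.println(raw.size() + " " + typed.size()); // prints "2 1"
    }
}
```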


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195452831
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,228 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-confluent-schema-registry</artifactId>
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<scala.binary.version>2.11</scala.binary.version>
--- End diff --

Could be inherited from the parent.


---


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195464823
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,228 @@
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+   
+   flink-end-to-end-tests
+   org.apache.flink
+   1.6-SNAPSHOT
+   
+   4.0.0
+
+   flink-confluent-schema-registry
+   
+   
UTF-8
+   2.11
+   4.1.0
+   0.2.6
+   1.8
+   
+
+   
+   
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+   
+   
+   org.apache.flink
+   
flink-clients_${scala.binary.version}
+   ${project.version}
+   
+   
+   org.apache.flink
+   
flink-scala_${scala.binary.version}
+   ${project.version}
+   
+   
+   org.apache.flink
+   
flink-streaming-scala_${scala.binary.version}
+   ${project.version}
+   
+   
+   
+   org.apache.flink
+   
flink-connector-kafka-0.10_${scala.binary.version}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   
flink-connector-kafka-0.8_${scala.binary.version}
+   ${project.version}
+   
+   
+   
+   org.apache.flink
+   flink-avro
+   ${project.version}
+   
+   
+   
+   org.apache.avro
+   avro
+   
+   
+   io.confluent
+   kafka-avro-serializer
+   ${confluent.version}
+   
+   
+   
+   tech.allegro.schema.json2avro
+   converter
+   ${json2avro.version}
+   
+   
+
+   
+   
+   
+   build-jar
+   
+   false
+   
+   
+   
+   
+   
src/main/resources/${resource.dir}
+   true
+   
+   
+   
+   
+   
+   
org.apache.maven.plugins
+   
maven-shade-plugin
+   2.4.1
+   
+   
+   
+   
package
+   
+   
shade
+   
+   
+   

+   

+   
org.apache.flink:force-shading
+   
com.google.code.findbugs:jsr305
+   
org.slf4j:*
+   

+   

+   

+   

+   

+   
*:*
+   

+  

[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195452794
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,228 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-confluent-schema-registry</artifactId>
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+		<scala.binary.version>2.11</scala.binary.version>
+		<confluent.version>4.1.0</confluent.version>
+		<json2avro.version>0.2.6</json2avro.version>
+		<java.version>1.8</java.version>
--- End diff --

Nowhere used


---


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195471771
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+/**
+ * The serialization schema for the Avro type.
+ */
+public class AvroSerializationConfluentSchema<T> implements SerializationSchema<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String schemaRegistryUrl;
+
+   private final int identityMapCapacity;
+
+   private KafkaAvroSerializer kafkaAvroSerializer;
--- End diff --

Same here with non-serializable fields which should be marked as 
`transient`.


---
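A stdlib-only sketch of what the reviewer suggests: mark the non-serializable helper `transient` and re-create it lazily after deserialization. `LazySchema` and `HeavyClient` are illustrative stand-ins for the schema class and `KafkaAvroSerializer`, not the PR's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Stand-in for KafkaAvroSerializer: deliberately NOT Serializable. */
class HeavyClient {
    final String target;
    HeavyClient(String target) { this.target = target; }
}

/** Stand-in for the schema: only the config travels, the client is rebuilt lazily. */
class LazySchema implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String url;
    private transient HeavyClient client; // skipped by Java serialization

    LazySchema(String url) { this.url = url; }

    String describe() {
        if (client == null) { // null after construction AND after deserialization
            client = new HeavyClient(url);
        }
        return "client for " + client.target;
    }
}

public class TransientDemo {
    public static void main(String[] args) throws Exception {
        LazySchema schema = new LazySchema("http://registry:8081");
        schema.describe(); // initialize the transient field on the "client side"

        // Round-trip through Java serialization, as Flink does when shipping the schema.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(schema);
        LazySchema copy = (LazySchema) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();

        System.out.println(copy.describe()); // works: the client is re-created lazily
    }
}
```

Without `transient`, shipping an initialized instance would fail with `NotSerializableException`; with it, the helper is simply rebuilt on first use after deserialization.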


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195465492
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroDeserializationConfluentSchema.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroDecoder;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+import java.io.IOException;
+
+/**
+ * The deserialization schema for the Avro type.
+ */
+public class AvroDeserializationConfluentSchema<T> implements DeserializationSchema<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private Class<T> avroType;
--- End diff --

`final` would be better


---


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195466309
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroDeserializationConfluentSchema.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroDecoder;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+import java.io.IOException;
+
+/**
+ * The deserialization schema for the Avro type.
+ */
+public class AvroDeserializationConfluentSchema<T> implements DeserializationSchema<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private Class<T> avroType;
+   private final String schemaRegistryUrl;
+   private final int identityMapCapacity;
+   private KafkaAvroDecoder kafkaAvroDecoder;
+
+   private ObjectMapper mapper;
+
+   private JsonAvroConverter jsonAvroConverter;
+
+   public AvroDeserializationConfluentSchema(Class<T> avroType, String schemaRegistyUrl) {
+   this(avroType, schemaRegistyUrl, 
AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
+   }
+
+   public AvroDeserializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, int identityMapCapacity) {
+   this.avroType = avroType;
+   this.schemaRegistryUrl = schemaRegistryUrl;
+   this.identityMapCapacity = identityMapCapacity;
+   }
+
+   @Override
+   public T deserialize(byte[] message) throws IOException {
+   if (kafkaAvroDecoder == null) {
+   SchemaRegistryClient schemaRegistryClient = new 
CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
+   this.kafkaAvroDecoder = new 
KafkaAvroDecoder(schemaRegistryClient);
+   }
+   if (mapper == null) {
+   this.mapper = new ObjectMapper();
+   }
+
+   if (jsonAvroConverter == null) {
+   jsonAvroConverter = new JsonAvroConverter();
+   }
+   GenericData.Record record = (GenericData.Record) 
this.kafkaAvroDecoder.fromBytes(message);
+   byte[] messageBytes = jsonAvroConverter.convertToJson(record);
+   return (T) this.mapper.readValue(messageBytes, avroType);
--- End diff --

Why do we have to introduce this indirection with Json? Converting an Avro 
record into Json and then into the specific types seems quite cumbersome. 
Better to directly read the record from the `message`.


---
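The reviewer's suggestion — decode the wire bytes straight into the target type, skipping the bytes → JSON → POJO detour — sketched with stdlib streams only. In the actual PR an Avro `SpecificDatumReader<User>` would play the role of `decode`; the `SimpleUser` class and its field layout are assumptions for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Illustrative POJO standing in for the Avro-generated User class. */
class SimpleUser {
    final String name;
    final int favoriteNumber;
    SimpleUser(String name, int favoriteNumber) {
        this.name = name;
        this.favoriteNumber = favoriteNumber;
    }
}

public class DirectDecodeDemo {
    // Stand-in for the binary encoding of a record.
    static byte[] encode(SimpleUser u) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeUTF(u.name);
            out.writeInt(u.favoriteNumber);
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the typed object directly from the message bytes -- no JSON string
    // is materialized in between, which is the reviewer's point.
    static SimpleUser decode(byte[] message) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(message));
            return new SimpleUser(in.readUTF(), in.readInt());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SimpleUser u = decode(encode(new SimpleUser("Alyssa", 250)));
        System.out.println(u.name + " " + u.favoriteNumber); // Alyssa 250
    }
}
```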


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195464245
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,228 @@
+
+http://maven.apache.org/POM/4.0.0;
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+   
+   flink-end-to-end-tests
+   org.apache.flink
+   1.6-SNAPSHOT
+   
+   4.0.0
+
+   flink-confluent-schema-registry
+   
+   
UTF-8
+   2.11
+   4.1.0
+   0.2.6
+   1.8
+   
+
+   
+   
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+   
+   
+   org.apache.flink
+   
flink-clients_${scala.binary.version}
+   ${project.version}
+   
+   
+   org.apache.flink
+   
flink-scala_${scala.binary.version}
+   ${project.version}
+   
+   
+   org.apache.flink
+   
flink-streaming-scala_${scala.binary.version}
+   ${project.version}
+   
+   
+   
+   org.apache.flink
+   
flink-connector-kafka-0.10_${scala.binary.version}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   
flink-connector-kafka-0.8_${scala.binary.version}
+   ${project.version}
+   
+   
+   
+   org.apache.flink
+   flink-avro
+   ${project.version}
+   
+   
+   
+   org.apache.avro
+   avro
+   
+   
+   io.confluent
+   kafka-avro-serializer
+   ${confluent.version}
+   
--- End diff --

Why do we need this dependency? Can't we use Flink's Avro serializer?


---


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195477424
  
--- Diff: 
flink-end-to-end-tests/test-scripts/test-confluent-schema-registry.sh ---
@@ -0,0 +1,106 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/kafka-common.sh
+
+function verify_output {
+  local expected=$(printf $1)
+
+  if [[ "$2" != "$expected" ]]; then
+echo "Output from Flink program does not match expected output."
+echo -e "EXPECTED FOR KEY: --$expected--"
+echo -e "ACTUAL: --$2--"
+PASS=""
+exit 1
+  fi
+}
+
+function test_cleanup {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+
+  stop_kafka_cluster
+  stop_confluent_schema_registry
+
+  # revert our modifications to the Flink distribution
+  mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+
+trap test_cleanup INT
+trap test_cleanup EXIT
+
+setup_kafka_dist
+setup_confluent_dist
+
+cd flink-end-to-end-tests/flink-confluent-schema-registry
+mvn clean package -Pbuild-jar -nsu
+
+start_kafka_cluster
+start_confluent_schema_registry
+sleep 5
+
+# modify configuration to use port 8082 for Flink
+cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
+sed -i -e "s/web.port: 8081/web.port: 8082/" $FLINK_DIR/conf/flink-conf.yaml
+
+TEST_PROGRAM_JAR=target/flink-confluent-schema-registry-1.6-SNAPSHOT.jar
+

+INPUT_MESSAGE_1='{"name":"Alyssa","favoriteNumber":"250","favoriteColor":"green","eventType":"meeting"}'

+INPUT_MESSAGE_2='{"name":"Charlie","favoriteNumber":"10","favoriteColor":"blue","eventType":"meeting"}'

+INPUT_MESSAGE_3='{"name":"Ben","favoriteNumber":"7","favoriteColor":"red","eventType":"meeting"}'

+USER_SCHEMA='{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string","default":""},{"name":"favoriteNumber","type":"string","default":""},{"name":"favoriteColor","type":"string","default":""},{"name":"eventType","type":{"name":"EventType","type":"enum","symbols":["meeting"]}}]}'
+
+curl -X POST \
+  http://localhost:8081/subjects/users-value/versions \
+  -H 'cache-control: no-cache' \
+  -H 'content-type: application/vnd.schemaregistry.v1+json' \
+  -d '{"schema": "{\"namespace\": \"example.avro\",\"type\": \"record\",\"name\": \"User\",\"fields\": [{\"name\": \"name\", \"type\": \"string\", \"default\": \"\"},{\"name\": \"favoriteNumber\",  \"type\": \"string\", \"default\": \"\"},{\"name\": \"favoriteColor\", \"type\": \"string\", \"default\": \"\"},{\"name\": \"eventType\",\"type\": {\"name\": \"EventType\",\"type\": \"enum\", \"symbols\": [\"meeting\"] }}]}"}'
--- End diff --

Do we have to manually register the schema or does it also work with 
sending the first record to the input topic?
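
For reference, manual registration should not be strictly necessary: Confluent's `KafkaAvroSerializer` registers the record's schema under the subject `<topic>-value` the first time it serializes for that topic, as long as `auto.register.schemas` is left at its default of `true`. A minimal producer configuration sketch (host/port values are illustrative and chosen to match this test script's registry):

```properties
# illustrative producer settings; localhost:8081 matches the registry
# started by this end-to-end test
bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url=http://localhost:8081
# default is true: the first serialized record registers its schema
# under the subject "<topic>-value"
auto.register.schemas=true
```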


---


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195472438
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+/**
+ * The serialization schema for the Avro type.
+ */
+public class AvroSerializationConfluentSchema<T> implements SerializationSchema<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String schemaRegistryUrl;
+
+   private final int identityMapCapacity;
+
+   private KafkaAvroSerializer kafkaAvroSerializer;
+
+   private String topicName;
+
+   private SchemaRegistryClient schemaRegistryClient;
+
+   private JsonAvroConverter jsonAvroConverter;
+
+   private final Class<T> avroType;
+
+   public AvroSerializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, String topicName) {
+   this(avroType, schemaRegistryUrl, 
AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT, topicName);
+   }
+
+   public AvroSerializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, int identityMapCapacity, String topicName) {
+   this.schemaRegistryUrl = schemaRegistryUrl;
+   this.identityMapCapacity = identityMapCapacity;
+   this.topicName = topicName;
+   this.avroType = avroType;
+   }
+
+   @Override
+   public byte[] serialize(T obj) {
+   byte[] serializedBytes = null;
+
+   try {
+   if (kafkaAvroSerializer == null) {
+   this.schemaRegistryClient = new 
CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
+   this.kafkaAvroSerializer = new 
KafkaAvroSerializer(schemaRegistryClient);
+   }
+
+   String schema = 
schemaRegistryClient.getLatestSchemaMetadata(topicName + "-value").getSchema();
+
+   if (jsonAvroConverter == null) {
+   jsonAvroConverter = new JsonAvroConverter();
+   }
+
+   //System.out.println("Schema fetched from Schema 
Registry for topic :" + topicName + " = " + schema);
+   GenericData.Record record = 
jsonAvroConverter.convertToGenericDataRecord(obj.toString().getBytes(), new 
Schema.Parser().parse(schema));
+
+   if (GenericData.get().validate(new 
Schema.Parser().parse(schema), record)) {
+   serializedBytes = 
kafkaAvroSerializer.serialize(topicName, record);
+
+   } else {
+   System.out.println("Error :Invalid message : 
Doesn't follow the avro schema : Message not published to the topic, message = 
" + record.toString());
--- End diff --

No println logging. Better to use proper loggers.
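
A minimal sketch of what this suggestion could look like. Flink code would use `org.slf4j.Logger`/`LoggerFactory`; `java.util.logging` is used here only so the sketch runs without extra dependencies, and `checkRecord` is an illustrative helper, not part of the PR:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class LoggingDemo {

    // one static logger per class, instead of ad-hoc System.out.println calls
    private static final Logger LOG = Logger.getLogger(LoggingDemo.class.getName());

    // returns true when the record passed schema validation; logs (rather
    // than prints) the failure otherwise
    static boolean checkRecord(boolean validAgainstSchema, String record) {
        if (!validAgainstSchema) {
            LOG.log(Level.WARNING,
                "Invalid message, does not follow the Avro schema; not published: {0}",
                record);
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(checkRecord(true, "{\"name\":\"Alyssa\"}"));
    }
}
```

Unlike `println`, the log level and output destination can then be controlled by the cluster's logging configuration.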


---


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195465338
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroDeserializationConfluentSchema.java
 ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroDecoder;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+import java.io.IOException;
+
+/**
+ * The deserialization schema for the Avro type.
+ */
+public class AvroDeserializationConfluentSchema<T> implements DeserializationSchema<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private Class<T> avroType;
+   private final String schemaRegistryUrl;
+   private final int identityMapCapacity;
+   private KafkaAvroDecoder kafkaAvroDecoder;
+
+   private ObjectMapper mapper;
+
+   private JsonAvroConverter jsonAvroConverter;
--- End diff --

`transient`
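
The point of this suggestion: Flink serializes user functions (including their `DeserializationSchema`) with Java serialization when shipping them to task managers, so non-serializable helpers like the registry client must be `transient` and re-created lazily after deserialization. A self-contained sketch of that pattern, where `DemoDeserializationSchema` and `FakeClient` are illustrative stand-ins, not Flink or Confluent API:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDemo {

    // stands in for a non-serializable CachedSchemaRegistryClient
    static class FakeClient {
        String fetchSchema() {
            return "schema";
        }
    }

    static class DemoDeserializationSchema implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: dropped during Java serialization, so it must be
        // re-initialized lazily on the task manager side
        private transient FakeClient client;

        String deserialize() {
            if (client == null) {
                client = new FakeClient(); // lazy init after deserialization
            }
            return client.fetchSchema();
        }
    }

    // serialize and deserialize the schema object, as happens when Flink
    // ships the user function to a task manager
    static String roundTrip() {
        try {
            DemoDeserializationSchema schema = new DemoDeserializationSchema();
            schema.deserialize(); // client is now non-null on the "client side"

            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(schema);
            }
            try (ObjectInputStream ois =
                     new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                DemoDeserializationSchema copy = (DemoDeserializationSchema) ois.readObject();
                // the transient field arrived as null and is re-created lazily
                return copy.deserialize();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip()); // prints "schema"
    }
}
```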


---


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195473241
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+/**
+ * The serialization schema for the Avro type.
+ */
+public class AvroSerializationConfluentSchema<T> implements SerializationSchema<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String schemaRegistryUrl;
+
+   private final int identityMapCapacity;
+
+   private KafkaAvroSerializer kafkaAvroSerializer;
+
+   private String topicName;
+
+   private SchemaRegistryClient schemaRegistryClient;
+
+   private JsonAvroConverter jsonAvroConverter;
+
+   private final Class<T> avroType;
+
+   public AvroSerializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, String topicName) {
+   this(avroType, schemaRegistryUrl, 
AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT, topicName);
+   }
+
+   public AvroSerializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, int identityMapCapacity, String topicName) {
+   this.schemaRegistryUrl = schemaRegistryUrl;
+   this.identityMapCapacity = identityMapCapacity;
+   this.topicName = topicName;
+   this.avroType = avroType;
+   }
+
+   @Override
+   public byte[] serialize(T obj) {
+   byte[] serializedBytes = null;
+
+   try {
+   if (kafkaAvroSerializer == null) {
+   this.schemaRegistryClient = new 
CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
+   this.kafkaAvroSerializer = new 
KafkaAvroSerializer(schemaRegistryClient);
+   }
+
+   String schema = 
schemaRegistryClient.getLatestSchemaMetadata(topicName + "-value").getSchema();
--- End diff --

Why do we have to make this schema lookup here? Shouldn't the 
`kafkaAvroSerializer` do the job for us?


---


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195472677
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+/**
+ * The serialization schema for the Avro type.
+ */
+public class AvroSerializationConfluentSchema<T> implements SerializationSchema<T> {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String schemaRegistryUrl;
+
+   private final int identityMapCapacity;
+
+   private KafkaAvroSerializer kafkaAvroSerializer;
+
+   private String topicName;
+
+   private SchemaRegistryClient schemaRegistryClient;
+
+   private JsonAvroConverter jsonAvroConverter;
+
+   private final Class<T> avroType;
+
+   public AvroSerializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, String topicName) {
+   this(avroType, schemaRegistryUrl, 
AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT, topicName);
+   }
+
+   public AvroSerializationConfluentSchema(Class<T> avroType, String schemaRegistryUrl, int identityMapCapacity, String topicName) {
+   this.schemaRegistryUrl = schemaRegistryUrl;
+   this.identityMapCapacity = identityMapCapacity;
+   this.topicName = topicName;
+   this.avroType = avroType;
+   }
+
+   @Override
+   public byte[] serialize(T obj) {
+   byte[] serializedBytes = null;
+
+   try {
+   if (kafkaAvroSerializer == null) {
+   this.schemaRegistryClient = new 
CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
+   this.kafkaAvroSerializer = new 
KafkaAvroSerializer(schemaRegistryClient);
+   }
+
+   String schema = 
schemaRegistryClient.getLatestSchemaMetadata(topicName + "-value").getSchema();
+
+   if (jsonAvroConverter == null) {
+   jsonAvroConverter = new JsonAvroConverter();
+   }
+
+   //System.out.println("Schema fetched from Schema 
Registry for topic :" + topicName + " = " + schema);
+   GenericData.Record record = 
jsonAvroConverter.convertToGenericDataRecord(obj.toString().getBytes(), new 
Schema.Parser().parse(schema));
--- End diff --

Why can't we simply pass `obj` to the `kafkaAvroSerializer`?


---


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195474165
  
--- Diff: 
flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/AvroSerializationConfluentSchema.java
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
+
+/**
+ * The serialization schema for the Avro type.
+ */
+public class AvroSerializationConfluentSchema implements 
SerializationSchema {
+
+   private static final long serialVersionUID = 1L;
+
+   private final String schemaRegistryUrl;
+
+   private final int identityMapCapacity;
+
+   private KafkaAvroSerializer kafkaAvroSerializer;
+
+   private String topicName;
+
+   private SchemaRegistryClient schemaRegistryClient;
+
+   private JsonAvroConverter jsonAvroConverter;
+
+   private final Class avroType;
+
+   public AvroSerializationConfluentSchema(Class avroType, String 
schemaRegistryUrl, String topicName) {
--- End diff --

The serialization format should not need to know about the `topicName`, which is used here only for the schema lookup. Better to use `KafkaAvroEncoder`, which retrieves the schema automatically.


---


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195463614
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,228 @@
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+   
+   flink-end-to-end-tests
+   org.apache.flink
+   1.6-SNAPSHOT
+   
+   4.0.0
+
+   flink-confluent-schema-registry
+   
+   
UTF-8
+   2.11
+   4.1.0
+   0.2.6
+   1.8
+   
+
+   
+   
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+   
+   
+   org.apache.flink
+   
flink-clients_${scala.binary.version}
+   ${project.version}
+   
+   
+   org.apache.flink
+   
flink-scala_${scala.binary.version}
+   ${project.version}
+   
+   
+   org.apache.flink
+   
flink-streaming-scala_${scala.binary.version}
+   ${project.version}
+   
+   
+   
+   org.apache.flink
+   
flink-connector-kafka-0.10_${scala.binary.version}
+   ${project.version}
+   
+
+   
+   org.apache.flink
+   
flink-connector-kafka-0.8_${scala.binary.version}
+   ${project.version}
+   
--- End diff --

For what do we need this dependency? Ideally, we should drop it.


---


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-06-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6083#discussion_r195453836
  
--- Diff: flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml ---
@@ -0,0 +1,228 @@
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+   
+   flink-end-to-end-tests
+   org.apache.flink
+   1.6-SNAPSHOT
+   
+   4.0.0
+
+   flink-confluent-schema-registry
+   
+   
UTF-8
+   2.11
+   4.1.0
+   0.2.6
+   1.8
+   
+
+   
+   
+   
+   org.apache.flink
+   flink-core
+   ${project.version}
+   
+   
+   
+   org.apache.flink
+   
flink-clients_${scala.binary.version}
+   ${project.version}
+   
+   
+   org.apache.flink
+   
flink-scala_${scala.binary.version}
+   ${project.version}
+   
+   
+   org.apache.flink
+   
flink-streaming-scala_${scala.binary.version}
+   ${project.version}
+   
--- End diff --

I think we can replace the two Scala dependencies above with
```
<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
	<version>${project.version}</version>
</dependency>
```


---


[GitHub] flink pull request #6083: [FLINK-8983] End-to-end test: Confluent schema reg...

2018-05-26 Thread medcv
GitHub user medcv opened a pull request:

https://github.com/apache/flink/pull/6083

[FLINK-8983] End-to-end test: Confluent schema registry

## Brief change log

Added an end-to-end test which verifies that Flink is able to work together 
with the Confluent schema registry. In order to do that, this test sets up a 
Kafka cluster and a Flink job which writes and reads from the Confluent schema 
registry producing an Avro type.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/medcv/flink FLINK-8983

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6083.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6083


commit 8bcaee1a6d8b32e10888e46e608a1478b4a66e9b
Author: Yadan.JS 
Date:   2018-05-21T02:31:26Z

[FLINK-8983] End-to-end test: Confluent schema registry




---