This is an automated email from the ASF dual-hosted git repository. fhueske pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8998b2fe286d4ac788011f03fc6953f8b040b119 Author: Fabian Hueske <fhue...@apache.org> AuthorDate: Tue Aug 27 13:51:44 2019 +0200 [FLINK-12749][docs] Revert commit f695a76b10b0cb5f074bbb874fe374cd11e6eff3 --- docs/fig/click-event-count-example.svg | 21 - docs/fig/flink-docker-playground.svg | 21 - docs/fig/playground-webui-failure.png | Bin 37334 -> 0 bytes docs/fig/playground-webui.png | Bin 18135 -> 0 bytes .../docker-playgrounds/flink_cluster_playground.md | 812 --------------------- .../flink_cluster_playground.zh.md | 774 -------------------- docs/getting-started/docker-playgrounds/index.md | 25 - .../getting-started/docker-playgrounds/index.zh.md | 25 - flink-dist/pom.xml | 7 - flink-dist/src/main/assemblies/bin.xml | 11 - .../pom.xml | 106 --- .../src/main/resources/META-INF/NOTICE | 9 - .../src/main/resources/META-INF/NOTICE | 2 +- flink-examples/flink-examples-build-helper/pom.xml | 1 - flink-examples/flink-examples-streaming/pom.xml | 3 +- .../statemachine/KafkaEventsGeneratorJob.java | 4 +- .../examples/statemachine/StateMachineExample.java | 4 +- .../windowing/clickeventcount/ClickEventCount.java | 117 --- .../clickeventcount/ClickEventGenerator.java | 122 ---- .../functions/ClickEventStatisticsCollector.java | 47 -- .../functions/CountingAggregator.java | 47 -- .../clickeventcount/records/ClickEvent.java | 85 --- .../records/ClickEventDeserializationSchema.java | 51 -- .../records/ClickEventSerializationSchema.java | 55 -- .../records/ClickEventStatistics.java | 116 --- .../ClickEventStatisticsSerializationSchema.java | 55 -- 26 files changed, 7 insertions(+), 2513 deletions(-) diff --git a/docs/fig/click-event-count-example.svg b/docs/fig/click-event-count-example.svg deleted file mode 100644 index 4d9c06f..0000000 --- a/docs/fig/click-event-count-example.svg +++ /dev/null @@ -1,21 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. 
See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd"> -<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" version="1.1" width="713px" height="359px" viewBox="-0.5 -0.5 713 359" content="<mxfile modified="2019-07-30T06:33:46.579Z" host="www.draw.io" agent="Mozilla/5.0 (X11; Linux x86_64; rv:66.0) Gecko/20100101 Firefox/66.0" etag="Gyms1__7o2-6Tou9Fwcv" version="11.0.7" type="device"><diagram id="axHalsAsTUV6G1jOH0Rx" name="Page- [...] \ No newline at end of file diff --git a/docs/fig/flink-docker-playground.svg b/docs/fig/flink-docker-playground.svg deleted file mode 100644 index 24a53e2..0000000 --- a/docs/fig/flink-docker-playground.svg +++ /dev/null @@ -1,21 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd"> -<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" version="1.1" width="681px" height="221px" viewBox="-0.5 -0.5 681 221" content="<mxfile modified="2019-07-30T05:46:19.236Z" host="www.draw.io" agent="Mozilla/5.0 (X11; Linux x86_64; rv:66.0) Gecko/20100101 Firefox/66.0" etag="6b7qPJhosj6WVEuTns2y" version="11.0.7" type="device"><diagram id="zIUxMKcIWk6lTGESeTwo" name="Page- [...] \ No newline at end of file diff --git a/docs/fig/playground-webui-failure.png b/docs/fig/playground-webui-failure.png deleted file mode 100644 index 31968dc..0000000 Binary files a/docs/fig/playground-webui-failure.png and /dev/null differ diff --git a/docs/fig/playground-webui.png b/docs/fig/playground-webui.png deleted file mode 100644 index 3833d6d..0000000 Binary files a/docs/fig/playground-webui.png and /dev/null differ diff --git a/docs/getting-started/docker-playgrounds/flink_cluster_playground.md b/docs/getting-started/docker-playgrounds/flink_cluster_playground.md deleted file mode 100644 index 7f6ef23..0000000 --- a/docs/getting-started/docker-playgrounds/flink_cluster_playground.md +++ /dev/null @@ -1,812 +0,0 @@ ---- -title: "Flink Cluster Playground" -nav-title: 'Flink Cluster Playground' -nav-parent_id: docker-playgrounds -nav-pos: 1 ---- -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. 
See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - -There are many ways to deploy and operate Apache Flink in various environments. Regardless of this -variety, the fundamental building blocks of a Flink Cluster remain the same, and similar -operational principles apply. - -In this playground, you will learn how to manage and run Flink Jobs. You will see how to deploy and -monitor an application, experience how Flink recovers from Job failure, and perform everyday -operational tasks like upgrades and rescaling. - -* This will be replaced by the TOC -{:toc} - -## Anatomy of this Playground - -This playground consists of a long living -[Flink Session Cluster]({{ site.baseurl }}/concepts/glossary.html#flink-session-cluster) and a Kafka -Cluster. - -A Flink Cluster always consists of a -[Flink Master]({{ site.baseurl }}/concepts/glossary.html#flink-master) and one or more -[Flink TaskManagers]({{ site.baseurl }}/concepts/glossary.html#flink-taskmanager). The Flink Master -is responsible for handling [Job]({{ site.baseurl }}/concepts/glossary.html#flink-job) submissions, -the supervision of Jobs as well as resource management. The Flink TaskManagers are the worker -processes and are responsible for the execution of the actual -[Tasks]({{ site.baseurl }}/concepts/glossary.html#task) which make up a Flink Job. 
In this -playground you will start with a single TaskManager, but scale out to more TaskManagers later. -Additionally, this playground comes with a dedicated *client* container, which we use to submit the -Flink Job initially and to perform various operational tasks later on. The *client* container is not -needed by the Flink Cluster itself but only included for ease of use. - -The Kafka Cluster consists of a Zookeeper server and a Kafka Broker. - -<img src="{{ site.baseurl }}/fig/flink-docker-playground.svg" alt="Flink Docker Playground" -class="offset" width="80%" /> - -When the playground is started, a Flink Job called *Click Event Count* will be submitted to the -Flink Master. Additionally, two Kafka Topics *input* and *output* are created. - -<img src="{{ site.baseurl }}/fig/click-event-count-example.svg" alt="Click Event Count Example" -class="offset" width="80%" /> - -The Job consumes `ClickEvent`s from the *input* topic, each with a `timestamp` and a `page`. The -events are then keyed by `page` and counted in 15-second -[windows]({{ site.baseurl }}/dev/stream/operators/windows.html). The results are written to the -*output* topic. - -There are six different pages and we generate 1000 click events per page every 15 seconds. Hence, the -output of the Flink job should show 1000 views per page and window. - -{% top %} - -## Starting the Playground - -{% if site.version contains "SNAPSHOT" %} -<p style="border-radius: 5px; padding: 5px" class="bg-danger"> - <b>Note</b>: The Apache Flink Docker images used for this playground are only available for - released versions of Apache Flink. Since you are currently looking at the latest SNAPSHOT - version of the documentation, the branch referenced below will not exist. You can either change it - manually or switch to the released version of the documentation via the release picker. -</p> -{% endif %} - -The playground environment is set up in just a few steps.
We will walk you through the necessary -commands and show how to validate that everything is running correctly. - -We assume that you have [docker](https://docs.docker.com/) (1.12+) and -[docker-compose](https://docs.docker.com/compose/) (2.1+) installed on your machine. - -The required configuration files are available in the -[flink-playgrounds](https://github.com/apache/flink-playgrounds) repository. Check it out and spin -up the environment: - -{% highlight bash %} -git clone --branch release-{{ site.version }} git@github.com:apache/flink-playgrounds.git -cd flink-playgrounds/flink-cluster-playground -docker-compose up -d -{% endhighlight %} - -Afterwards, you can inspect the running Docker containers with the following command: - -{% highlight bash %} -docker-compose ps - - Name Command State Ports --------------------------------------------------------------------------------------------------------------------------------- -flink-cluster-playground_clickevent-generator_1 /docker-entrypoint.sh java ... Up 6123/tcp, 8081/tcp -flink-cluster-playground_client_1 /docker-entrypoint.sh flin ... Exit 0 -flink-cluster-playground_jobmanager_1 /docker-entrypoint.sh jobm ... Up 6123/tcp, 0.0.0.0:8081->8081/tcp -flink-cluster-playground_kafka_1 start-kafka.sh Up 0.0.0.0:9094->9094/tcp -flink-cluster-playground_taskmanager_1 /docker-entrypoint.sh task ... Up 6123/tcp, 8081/tcp -flink-cluster-playground_zookeeper_1 /bin/sh -c /usr/sbin/sshd ... Up 2181/tcp, 22/tcp, 2888/tcp, 3888/tcp -{% endhighlight %} - -This indicates that the client container has successfully submitted the Flink Job (`Exit 0`) and all -cluster components as well as the data generator are running (`Up`). - -You can stop the playground environment by calling: - -{% highlight bash %} -docker-compose down -v -{% endhighlight %} - -## Entering the Playground - -There are many things you can try and check out in this playground.
In the following two sections we -will show you how to interact with the Flink Cluster and demonstrate some of Flink's key features. - -### Flink WebUI - -The most natural starting point to observe your Flink Cluster is the Web UI exposed under -[http://localhost:8081](http://localhost:8081). If everything went well, you'll see that the cluster initially consists of -one TaskManager and executes a Job called *Click Event Count*. - -<img src="{{ site.baseurl }}/fig/playground-webui.png" alt="Playground Flink WebUI" -class="offset" width="100%" /> - -The Flink WebUI contains a lot of useful and interesting information about your Flink Cluster and -its Jobs (JobGraph, Metrics, Checkpointing Statistics, TaskManager Status,...). - -### Logs - -**JobManager** - -The JobManager logs can be tailed via `docker-compose`. - -{% highlight bash %} -docker-compose logs -f jobmanager -{% endhighlight %} - -After the initial startup you should mainly see log messages for every checkpoint completion. - -**TaskManager** - -The TaskManager log can be tailed in the same way. -{% highlight bash %} -docker-compose logs -f taskmanager -{% endhighlight %} - -After the initial startup you should mainly see log messages for every checkpoint completion. - -### Flink CLI - -The [Flink CLI]({{ site.baseurl }}/ops/cli.html) can be used from within the client container. For -example, to print the `help` message of the Flink CLI you can run -{% highlight bash%} -docker-compose run --no-deps client flink --help -{% endhighlight %} - -### Flink REST API - -The [Flink REST API]({{ site.baseurl }}/monitoring/rest_api.html#api) is exposed via -`localhost:8081` on the host or via `jobmanager:8081` from the client container, e.g. 
to list all -currently running jobs, you can run: -{% highlight bash%} -curl localhost:8081/jobs -{% endhighlight %} - -{% if site.version contains "SNAPSHOT" %} -<p style="border-radius: 5px; padding: 5px" class="bg-info"> - <b>Note</b>: If the <i>curl</i> command is not available on your machine, you can run it from the client - container (similar to the Flink CLI): -{% highlight bash%} -docker-compose run --no-deps client curl jobmanager:8081/jobs -{% endhighlight %} -</p> -{% endif %} - -### Kafka Topics - -You can look at the records that are written to the Kafka Topics by running -{% highlight bash%} -# input topic (1000 records/s) -docker-compose exec kafka kafka-console-consumer.sh \ - --bootstrap-server localhost:9092 --topic input - -# output topic (24 records/min) -docker-compose exec kafka kafka-console-consumer.sh \ - --bootstrap-server localhost:9092 --topic output -{% endhighlight %} - -{% top %} - -## Time to Play! - -Now that you have learned how to interact with Flink and the Docker containers, let's have a look at -some common operational tasks that you can try out on our playground. -All of these tasks are independent of each other, i.e., you can perform them in any order. -Most tasks can be executed via the [CLI](#flink-cli) and the [REST API](#flink-rest-api). - -### Listing Running Jobs - -<div class="codetabs" markdown="1"> -<div data-lang="CLI" markdown="1"> -**Command** -{% highlight bash %} -docker-compose run --no-deps client flink list -{% endhighlight %} -**Expected Output** -{% highlight plain %} -Waiting for response... ------------------- Running/Restarting Jobs ------------------- -16.07.2019 16:37:55 : <job-id> : Click Event Count (RUNNING) --------------------------------------------------------------- -No scheduled jobs.
-{% endhighlight %} -</div> -<div data-lang="REST API" markdown="1"> -**Request** -{% highlight bash %} -curl localhost:8081/jobs -{% endhighlight %} -**Expected Response (pretty-printed)** -{% highlight json %} -{ - "jobs": [ - { - "id": "<job-id>", - "status": "RUNNING" - } - ] -} -{% endhighlight %} -</div> -</div> - -The JobID is assigned to a Job upon submission and is needed to perform actions on the Job via the -CLI or REST API. - -### Observing Failure & Recovery - -Flink provides exactly-once processing guarantees under (partial) failure. In this playground you -can observe and - to some extent - verify this behavior. - -#### Step 1: Observing the Output - -As described [above](#anatomy-of-this-playground), the events in this playground are generated such -that each window contains exactly one thousand records. So, in order to verify that Flink -successfully recovers from a TaskManager failure without data loss or duplication, you can tail the -output topic and check that - after recovery - all windows are present and the count is correct. - -For this, start reading from the *output* topic and leave this command running until after -recovery (Step 3). - -{% highlight bash%} -docker-compose exec kafka kafka-console-consumer.sh \ - --bootstrap-server localhost:9092 --topic output -{% endhighlight %} - -#### Step 2: Introducing a Fault - -In order to simulate a partial failure you can kill a TaskManager. In a production setup, this -could correspond to a loss of the TaskManager process, the TaskManager machine or simply a transient -exception being thrown from the framework or user code (e.g. due to the temporary unavailability of -an external resource). - -{% highlight bash%} -docker-compose kill taskmanager -{% endhighlight %} - -After a few seconds, Flink will notice the loss of the TaskManager, cancel the affected Job, and -immediately resubmit it for recovery.
-When the Job gets restarted, its tasks remain in the `SCHEDULED` state, which is indicated by the -purple colored squares (see screenshot below). - -<img src="{{ site.baseurl }}/fig/playground-webui-failure.png" alt="Playground Flink WebUI" -class="offset" width="100%" /> - -<p style="border-radius: 5px; padding: 5px" class="bg-info"> - <b>Note</b>: Even though the tasks of the job are in SCHEDULED state and not RUNNING yet, the overall - status of a Job is shown as RUNNING. -</p> - -At this point, the tasks of the Job cannot move from the `SCHEDULED` state to `RUNNING` because there -are no resources (TaskSlots provided by TaskManagers) to run the tasks. -Until a new TaskManager becomes available, the Job will go through a cycle of cancellations and resubmissions. - -In the meantime, the data generator keeps pushing `ClickEvent`s into the *input* topic. This is -similar to a real production setup where data is produced while the Job to process it is down. - -#### Step 3: Recovery - -Once you restart the TaskManager, it reconnects to the Master. - -{% highlight bash%} -docker-compose up -d taskmanager -{% endhighlight %} - -When the Master is notified about the new TaskManager, it schedules the tasks of the -recovering Job to the newly available TaskSlots. Upon restart, the tasks recover their state from -the last successful [checkpoint]({{ site.baseurl }}/internals/stream_checkpointing.html) that was taken -before the failure and switch to the `RUNNING` state. - -The Job will quickly process the full backlog of input events (accumulated during the outage) -from Kafka and produce output at a much higher rate (> 24 records/minute) until it reaches -the head of the stream. In the *output* you will see that all keys (`page`s) are present for all time -windows and that every count is exactly one thousand.
Since we are using the -[FlinkKafkaProducer]({{ site.baseurl }}/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance) -in its "at-least-once" mode, there is a chance that you will see some duplicate output records. - -<p style="border-radius: 5px; padding: 5px" class="bg-info"> - <b>Note</b>: Most production setups rely on a resource manager (Kubernetes, Yarn, Mesos) to - automatically restart failed processes. -</p> - -### Upgrading & Rescaling a Job - -Upgrading a Flink Job always involves two steps: First, the Flink Job is gracefully stopped with a -[Savepoint]({{ site.baseurl }}/ops/state/savepoints.html). A Savepoint is a consistent snapshot of -the complete application state at a well-defined, globally consistent point in time (similar to a -checkpoint). Second, the upgraded Flink Job is started from the Savepoint. In this context "upgrade" -can mean different things including the following: - -* An upgrade to the configuration (incl. the parallelism of the Job) -* An upgrade to the topology of the Job (added/removed Operators) -* An upgrade to the user-defined functions of the Job - -Before starting with the upgrade you might want to start tailing the *output* topic, in order to -observe that no data is lost or corrupted in the course of the upgrade. - -{% highlight bash%} -docker-compose exec kafka kafka-console-consumer.sh \ - --bootstrap-server localhost:9092 --topic output -{% endhighlight %} - -#### Step 1: Stopping the Job - -To gracefully stop the Job, you need to use the "stop" command of either the CLI or the REST API. -For this you will need the JobID of the Job, which you can obtain by -[listing all running Jobs](#listing-running-jobs) or from the WebUI.
With the JobID you can proceed -to stopping the Job: - -<div class="codetabs" markdown="1"> -<div data-lang="CLI" markdown="1"> -**Command** -{% highlight bash %} -docker-compose run --no-deps client flink stop <job-id> -{% endhighlight %} -**Expected Output** -{% highlight bash %} -Suspending job "<job-id>" with a savepoint. -Suspended job "<job-id>" with a savepoint. -{% endhighlight %} - -The Savepoint has been stored to the `state.savepoint.dir` configured in the *flink-conf.yaml*, -which is mounted under */tmp/flink-savepoints-directory/* on your local machine. You will need the -path to this Savepoint in the next step. In case of the REST API, this path is already part of the -response. In case of the CLI, you will need to have a look at the filesystem directly. - -**Command** -{% highlight bash %} -ls -lia /tmp/flink-savepoints-directory -{% endhighlight %} - -**Expected Output** -{% highlight bash %} -total 0 - 17 drwxr-xr-x 3 root root 60 17 jul 17:05 . - 2 drwxrwxrwt 135 root root 3420 17 jul 17:09 .. -1002 drwxr-xr-x 2 root root 140 17 jul 17:05 savepoint-<short-job-id>-<uuid> -{% endhighlight %} -</div> - <div data-lang="REST API" markdown="1"> - - **Request** -{% highlight bash %} -# triggering stop -curl -X POST localhost:8081/jobs/<job-id>/stop -d '{"drain": false}' -{% endhighlight %} - -**Expected Response (pretty-printed)** -{% highlight json %} -{ - "request-id": "<trigger-id>" -} -{% endhighlight %} - -**Request** -{% highlight bash %} -# check status of stop action and retrieve savepoint path - curl localhost:8081/jobs/<job-id>/savepoints/<trigger-id> -{% endhighlight %} - -**Expected Response (pretty-printed)** -{% highlight json %} -{ - "status": { - "id": "COMPLETED" - }, - "operation": { - "location": "<savepoint-path>" - } -} -{% endhighlight %} -</div> -</div> - -#### Step 2a: Restart Job without Changes - -You can now restart the upgraded Job from this Savepoint. For simplicity, you can start by -restarting it without any changes.
- -<div class="codetabs" markdown="1"> -<div data-lang="CLI" markdown="1"> -**Command** -{% highlight bash %} -docker-compose run --no-deps client flink run -s <savepoint-path> \ - -d /opt/flink/examples/streaming/ClickEventCount.jar \ - --bootstrap.servers kafka:9092 --checkpointing --event-time -{% endhighlight %} -**Expected Output** -{% highlight bash %} -Starting execution of program -Job has been submitted with JobID <job-id> -{% endhighlight %} -</div> -<div data-lang="REST API" markdown="1"> - -**Request** -{% highlight bash %} -# Uploading the JAR from the Client container -docker-compose run --no-deps client curl -X POST -H "Expect:" \ - -F "jarfile=@/opt/flink/examples/streaming/ClickEventCount.jar" http://jobmanager:8081/jars/upload -{% endhighlight %} - -**Expected Response (pretty-printed)** -{% highlight json %} -{ - "filename": "/tmp/flink-web-<uuid>/flink-web-upload/<jar-id>", - "status": "success" -} - -{% endhighlight %} - -**Request** -{% highlight bash %} -# Submitting the Job -curl -X POST http://localhost:8081/jars/<jar-id>/run \ - -d '{"programArgs": "--bootstrap.servers kafka:9092 --checkpointing --event-time", "savepointPath": "<savepoint-path>"}' -{% endhighlight %} -**Expected Response (pretty-printed)** -{% highlight json %} -{ - "jobid": "<job-id>" -} -{% endhighlight %} -</div> -</div> - -Once the Job is `RUNNING` again, you will see in the *output* Topic that records are produced at a -higher rate while the Job is processing the backlog accumulated during the outage. Additionally, -you will see that no data was lost during the upgrade: all windows are present with a count of -exactly one thousand. - -#### Step 2b: Restart Job with a Different Parallelism (Rescaling) - -Alternatively, you could also rescale the Job from this Savepoint by passing a different parallelism -during resubmission. 
- -<div class="codetabs" markdown="1"> -<div data-lang="CLI" markdown="1"> -**Command** -{% highlight bash %} -docker-compose run --no-deps client flink run -p 3 -s <savepoint-path> \ - -d /opt/flink/examples/streaming/ClickEventCount.jar \ - --bootstrap.servers kafka:9092 --checkpointing --event-time -{% endhighlight %} -**Expected Output** -{% highlight bash %} -Starting execution of program -Job has been submitted with JobID <job-id> -{% endhighlight %} -</div> -<div data-lang="REST API" markdown="1"> - -**Request** -{% highlight bash %} -# Uploading the JAR from the Client container -docker-compose run --no-deps client curl -X POST -H "Expect:" \ - -F "jarfile=@/opt/flink/examples/streaming/ClickEventCount.jar" http://jobmanager:8081/jars/upload -{% endhighlight %} - -**Expected Response (pretty-printed)** -{% highlight json %} -{ - "filename": "/tmp/flink-web-<uuid>/flink-web-upload/<jar-id>", - "status": "success" -} - -{% endhighlight %} - -**Request** -{% highlight bash %} -# Submitting the Job -curl -X POST http://localhost:8081/jars/<jar-id>/run \ - -d '{"parallelism": 3, "programArgs": "--bootstrap.servers kafka:9092 --checkpointing --event-time", "savepointPath": "<savepoint-path>"}' -{% endhighlight %} -**Expected Response (pretty-printed)** -{% highlight json %} -{ - "jobid": "<job-id>" -} -{% endhighlight %} -</div> -</div> -Now, the Job has been resubmitted, but it will not start as there are not enough TaskSlots to -execute it with the increased parallelism (2 available, 3 needed). With -{% highlight bash %} -docker-compose scale taskmanager=2 -{% endhighlight %} -you can add a second TaskManager with two TaskSlots to the Flink Cluster, which will automatically register with the -Flink Master. Shortly after adding the TaskManager the Job should start running again. - -Once the Job is "RUNNING" again, you will see in the *output* Topic that no data was lost during -rescaling: all windows are present with a count of exactly one thousand.
- -### Querying the Metrics of a Job - -The Flink Master exposes system and user [metrics]({{ site.baseurl }}/monitoring/metrics.html) -via its REST API. - -The endpoint depends on the scope of these metrics. Metrics scoped to a Job can be listed via -`jobs/<job-id>/metrics`. The actual value of a metric can be queried via the `get` query parameter. - -**Request** -{% highlight bash %} -curl "localhost:8081/jobs/<job-id>/metrics?get=lastCheckpointSize" -{% endhighlight %} -**Expected Response (pretty-printed; no placeholders)** -{% highlight json %} -[ - { - "id": "lastCheckpointSize", - "value": "9378" - } -] -{% endhighlight %} - -The REST API can be used not only to query metrics but also to retrieve detailed information -about the status of a running Job. - -**Request** -{% highlight bash %} -# find the vertex-id of the vertex of interest -curl localhost:8081/jobs/<job-id> -{% endhighlight %} - -**Expected Response (pretty-printed)** -{% highlight json %} -{ - "jid": "<job-id>", - "name": "Click Event Count", - "isStoppable": false, - "state": "RUNNING", - "start-time": 1564467066026, - "end-time": -1, - "duration": 374793, - "now": 1564467440819, - "timestamps": { - "CREATED": 1564467066026, - "FINISHED": 0, - "SUSPENDED": 0, - "FAILING": 0, - "CANCELLING": 0, - "CANCELED": 0, - "RECONCILING": 0, - "RUNNING": 1564467066126, - "FAILED": 0, - "RESTARTING": 0 - }, - "vertices": [ - { - "id": "<vertex-id>", - "name": "ClickEvent Source", - "parallelism": 2, - "status": "RUNNING", - "start-time": 1564467066423, - "end-time": -1, - "duration": 374396, - "tasks": { - "CREATED": 0, - "FINISHED": 0, - "DEPLOYING": 0, - "RUNNING": 2, - "CANCELING": 0, - "FAILED": 0, - "CANCELED": 0, - "RECONCILING": 0, - "SCHEDULED": 0 - }, - "metrics": { - "read-bytes": 0, - "read-bytes-complete": true, - "write-bytes": 5033461, - "write-bytes-complete": true, - "read-records": 0, - "read-records-complete": true, - "write-records": 166351, - "write-records-complete": true - }
}, - { - "id": "<vertex-id>", - "name": "Timestamps/Watermarks", - "parallelism": 2, - "status": "RUNNING", - "start-time": 1564467066441, - "end-time": -1, - "duration": 374378, - "tasks": { - "CREATED": 0, - "FINISHED": 0, - "DEPLOYING": 0, - "RUNNING": 2, - "CANCELING": 0, - "FAILED": 0, - "CANCELED": 0, - "RECONCILING": 0, - "SCHEDULED": 0 - }, - "metrics": { - "read-bytes": 5066280, - "read-bytes-complete": true, - "write-bytes": 5033496, - "write-bytes-complete": true, - "read-records": 166349, - "read-records-complete": true, - "write-records": 166349, - "write-records-complete": true - } - }, - { - "id": "<vertex-id>", - "name": "ClickEvent Counter", - "parallelism": 2, - "status": "RUNNING", - "start-time": 1564467066469, - "end-time": -1, - "duration": 374350, - "tasks": { - "CREATED": 0, - "FINISHED": 0, - "DEPLOYING": 0, - "RUNNING": 2, - "CANCELING": 0, - "FAILED": 0, - "CANCELED": 0, - "RECONCILING": 0, - "SCHEDULED": 0 - }, - "metrics": { - "read-bytes": 5085332, - "read-bytes-complete": true, - "write-bytes": 316, - "write-bytes-complete": true, - "read-records": 166305, - "read-records-complete": true, - "write-records": 6, - "write-records-complete": true - } - }, - { - "id": "<vertex-id>", - "name": "ClickEventStatistics Sink", - "parallelism": 2, - "status": "RUNNING", - "start-time": 1564467066476, - "end-time": -1, - "duration": 374343, - "tasks": { - "CREATED": 0, - "FINISHED": 0, - "DEPLOYING": 0, - "RUNNING": 2, - "CANCELING": 0, - "FAILED": 0, - "CANCELED": 0, - "RECONCILING": 0, - "SCHEDULED": 0 - }, - "metrics": { - "read-bytes": 20668, - "read-bytes-complete": true, - "write-bytes": 0, - "write-bytes-complete": true, - "read-records": 6, - "read-records-complete": true, - "write-records": 0, - "write-records-complete": true - } - } - ], - "status-counts": { - "CREATED": 0, - "FINISHED": 0, - "DEPLOYING": 0, - "RUNNING": 4, - "CANCELING": 0, - "FAILED": 0, - "CANCELED": 0, - "RECONCILING": 0, - "SCHEDULED": 0 - }, - "plan": { - "jid": 
"<job-id>", - "name": "Click Event Count", - "nodes": [ - { - "id": "<vertex-id>", - "parallelism": 2, - "operator": "", - "operator_strategy": "", - "description": "ClickEventStatistics Sink", - "inputs": [ - { - "num": 0, - "id": "<vertex-id>", - "ship_strategy": "FORWARD", - "exchange": "pipelined_bounded" - } - ], - "optimizer_properties": {} - }, - { - "id": "<vertex-id>", - "parallelism": 2, - "operator": "", - "operator_strategy": "", - "description": "ClickEvent Counter", - "inputs": [ - { - "num": 0, - "id": "<vertex-id>", - "ship_strategy": "HASH", - "exchange": "pipelined_bounded" - } - ], - "optimizer_properties": {} - }, - { - "id": "<vertex-id>", - "parallelism": 2, - "operator": "", - "operator_strategy": "", - "description": "Timestamps/Watermarks", - "inputs": [ - { - "num": 0, - "id": "<vertex-id>", - "ship_strategy": "FORWARD", - "exchange": "pipelined_bounded" - } - ], - "optimizer_properties": {} - }, - { - "id": "<vertex-id>", - "parallelism": 2, - "operator": "", - "operator_strategy": "", - "description": "ClickEvent Source", - "optimizer_properties": {} - } - ] - } -} -{% endhighlight %} - -Please consult the [REST API reference]({{ site.baseurl }}/monitoring/rest_api.html#api) -for a complete list of possible queries including how to query metrics of different scopes (e.g. -TaskManager metrics). - -{% top %} - -## Variants - -You might have noticed that the *Click Event Count* was always started with `--checkpointing` and -`--event-time` program arguments. By omitting these in the command of the *client* container in the -`docker-compose.yaml`, you can change the behavior of the Job. - -* `--checkpointing` enables [checkpointing]({{ site.baseurl }}/internals/stream_checkpointing.html), -which is Flink's fault-tolerance mechanism. If you run without it and go through -[failure and recovery](#observing-failure--recovery), you will see that data is actually -lost.
- -* `--event-time` enables [event time semantics]({{ site.baseurl }}/dev/event_time.html) for your -Job. When disabled, the Job will assign events to windows based on the wall-clock time instead of -the timestamp of the `ClickEvent`. Consequently, the number of events per window will not be exactly -one thousand anymore. diff --git a/docs/getting-started/docker-playgrounds/flink_cluster_playground.zh.md b/docs/getting-started/docker-playgrounds/flink_cluster_playground.zh.md deleted file mode 100644 index b6b299b..0000000 --- a/docs/getting-started/docker-playgrounds/flink_cluster_playground.zh.md +++ /dev/null @@ -1,774 +0,0 @@ ---- -title: "Flink Cluster Playground" -nav-title: 'Flink Cluster Playground' -nav-parent_id: docker-playgrounds -nav-pos: 1 ---- -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - -There are many ways to deploy and operate Apache Flink in various environments. Regardless of this -variety, the fundamental building blocks of a Flink Cluster remain the same, and similar -operational principles apply. - -In this playground, you will learn how to manage and run Flink Jobs. 
You will see how to deploy and -monitor an application, experience how Flink recovers from Job failure, and perform everyday -operational tasks like upgrades and rescaling. - -* This will be replaced by the TOC -{:toc} - -## Anatomy of this Playground - -This playground consists of a long living -[Flink Session Cluster]({{ site.baseurl }}/concepts/glossary.html#flink-session-cluster) and a Kafka -Cluster. - -A Flink Cluster always consists of a -[Flink Master]({{ site.baseurl }}/concepts/glossary.html#flink-master) and one or more -[Flink TaskManagers]({{ site.baseurl }}/concepts/glossary.html#flink-taskmanager). The Flink Master -is responsible for handling [Job]({{ site.baseurl }}/concepts/glossary.html#flink-job) submissions, -the supervision of Jobs as well as resource management. The Flink TaskManagers are the worker -processes and are responsible for the execution of the actual -[Tasks]({{ site.baseurl }}/concepts/glossary.html#task) which make up a Flink Job. In this -playground you will start with a single TaskManager, but scale out to more TaskManagers later. -Additionally, this playground comes with a dedicated *client* container, which we use to submit the -Flink Job initially and to perform various operational tasks later on. The *client* container is not -needed by the Flink Cluster itself but only included for ease of use. - -The Kafka Cluster consists of a Zookeeper server and a Kafka Broker. - -<img src="{{ site.baseurl }}/fig/flink-docker-playground.svg" alt="Flink Docker Playground" -class="offset" width="80%" /> - -When the playground is started a Flink Job called *Flink Event Count* will be submitted to the -Flink Master. Additionally, two Kafka Topics *input* and *output* are created. - -<img src="{{ site.baseurl }}/fig/click-event-count-example.svg" alt="Click Event Count Example" -class="offset" width="80%" /> - -The Job consumes `ClickEvent`s from the *input* topic, each with a `timestamp` and a `page`. 
The -events are then keyed by `page` and counted in 15 second -[windows]({{ site.baseurl }}/dev/stream/operators/windows.html). The results are written to the -*output* topic. - -There are six different pages and we generate 1000 click events per page and 15 seconds. Hence, the -output of the Flink job should show 1000 views per page and window. - -{% top %} - -## Starting the Playground - -{% if site.version contains "SNAPSHOT" %} -<p style="border-radius: 5px; padding: 5px" class="bg-danger"> - <b>Note</b>: The Apache Flink Docker images used for this playground are only available for - released versions of Apache Flink. Since you are currently looking at the latest SNAPSHOT - version of the documentation the branch referenced below will not exist. You can either change it - manually or switch to the released version of the documentation via the release picker. -</p> -{% endif %} - -The playground environment is set up in just a few steps. We will walk you through the necessary -commands and show how to validate that everything is running correctly. - -We assume that you have that you have [docker](https://docs.docker.com/) (1.12+) and -[docker-compose](https://docs.docker.com/compose/) (2.1+) installed on your machine. - -The required configuration files are available in the -[flink-playgrounds](https://github.com/apache/flink-playgrounds) repository. Check it out and spin -up the environment: - -{% highlight bash %} -git clone --branch release-{{ site.version }} g...@github.com:apache/flink-playgrounds.git -cd flink-cluster-playground -docker-compose up -d -{% endhighlight %} - -Afterwards, `docker-compose ps` should give you the following output: - -{% highlight bash %} - Name Command State Ports --------------------------------------------------------------------------------------------------------------------------------- -flink-cluster-playground_clickevent-generator_1 /docker-entrypoint.sh java ... 
Up 6123/tcp, 8081/tcp -flink-cluster-playground_client_1 /docker-entrypoint.sh flin ... Exit 0 -flink-cluster-playground_jobmanager_1 /docker-entrypoint.sh jobm ... Up 6123/tcp, 0.0.0.0:8081->8081/tcp -flink-cluster-playground_kafka_1 start-kafka.sh Up 0.0.0.0:9094->9094/tcp -flink-cluster-playground_taskmanager_1 /docker-entrypoint.sh task ... Up 6123/tcp, 8081/tcp -flink-cluster-playground_zookeeper_1 /bin/sh -c /usr/sbin/sshd ... Up 2181/tcp, 22/tcp, 2888/tcp, 3888/tcp -{% endhighlight %} - -This indicates that the client container has successfully submitted the Flink Job ("Exit 0") and all -cluster components as well as the data generator are running ("Up"). - -You can stop the playground environment by calling `docker-compose down -v`. - -## Entering the Playground - -There are many things you can try and check out in this playground. In the following two sections we -will show you how to interact with the Flink Cluster and demonstrate some of Flink's key features. - -### Flink WebUI - -The most natural starting point to observe your Flink Cluster is the Web UI exposed under -http://localhost:8081. If everything went well, you'll see that the cluster initially consists of -one TaskManager and one Job called *Click Event Count* is in "RUNNING" state. - -<img src="{{ site.baseurl }}/fig/playground-webui.png" alt="Playground Flink WebUI" -class="offset" width="100%" /> - -The Flink WebUI contains a lot of useful and interesting information about your Flink Cluster and -its Jobs (JobGraph, Metrics, Checkpointing Statistics, TaskManager Status,...). - -### Logs - -**JobManager** - -The JobManager logs can be tailed via `docker-compose`. - -{% highlight bash %} -docker-compose logs -f jobmanager -{% endhighlight %} - -After the initial startup you should mainly see log messages for every checkpoint completion. - -**TaskManager** - -The TaskManager log can be tailed in the same way. 
-{% highlight bash %} -docker-compose logs -f taskmanager -{% endhighlight %} - -After the initial startup you should mainly see log messages for every checkpoint completion. - -### Flink CLI - -The [Flink CLI]({{ site.baseurl }}/ops/cli.html) can be used from within the client container. For -example, to print the `help` message of the Flink CLI you can run -{% highlight bash%} -docker-compose run --no-deps client flink --help -{% endhighlight %} - -### Flink REST API - -The [Flink REST API]({{ site.baseurl }}/monitoring/rest_api.html#api) is exposed via -`localhost:8081` on the host or via `jobmanager:8081` from the client container, e.g. to list all -currently running jobs, you can run: -{% highlight bash%} -curl localhost:8081/jobs -{% endhighlight %} - -{% if site.version contains "SNAPSHOT" %} -<p style="border-radius: 5px; padding: 5px" class="bg-info"> - <b>Note</b>: If `curl` is not available on your machine, you can run it from the *client* - container (similar to the Flink CLI): - {% highlight bash%} - docker-compose run --no-deps client curl jobmanager:8081/jobs - {% endhighlight %} -</p> -{% endif %} - -### Kafka Topics - -To manually look at the records in the Kakfa Topics, you can run -{% highlight bash%} -//input topic (1000 records/s) -docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic input -//output topic (24 records/min) -docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output -{% endhighlight %} - -{% top %} - -## Time to Play! - -This section describes some prototypical operational activities in the context of this playground. -They do not need to be executed in any particular order. Most of these tasks can be performed either -via the [CLI](#flink-cli) or the [REST API](#flink-rest-api). 
- -### Listing Running Jobs - -<div class="codetabs" markdown="1"> -<div data-lang="CLI" markdown="1"> -**Command** -{% highlight bash %} -docker-compose run --no-deps client flink list -{% endhighlight %} -**Expected Output** -{% highlight plain %} -Waiting for response... ------------------- Running/Restarting Jobs ------------------- -16.07.2019 16:37:55 : <job-id> : Click Event Count (RUNNING) --------------------------------------------------------------- -No scheduled jobs. -{% endhighlight %} -</div> -<div data-lang="REST API" markdown="1"> -**Request** -{% highlight bash %} -curl localhost:8081/jobs -{% endhighlight %} -**Expected Response (pretty-printed)** -{% highlight bash %} -{ - "jobs": [ - { - "id": "<job-id>", - "status": "RUNNING" - } - ] -} -{% endhighlight %} -</div> -</div> - -The JobID is assinged to a Job upon submission and is needed to perform actions on the Job via the -CLI or REST API. - -### Observing Failure & Recovery - -Flink provides exactly-once processing guarantees under (partial) failure. In this playground you -can observe and - to some extent - verify this behavior. - -#### Step 1: Observing the Output - -As described [above](#anatomy-of-this-playground), the events in this playground are generate such -that each window contains exactly one thousand records. So, in order to verify that Flink -successfully recovers from a TaskManager failure without data loss or duplication you can tail the -output topic and check that - after recovery - all windows are present and the count is correct. - -For this, start reading from the *output* topic and leave this command running until after -recovery (Step 3). - -{% highlight bash%} -docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output -{% endhighlight %} - -#### Step 2: Introducing a Fault - -In order to simulate a partial failure you can kill a TaskManager. 
In a production setup, this -could correspond to a loss of the TaskManager process, the TaskManager machine or simply a transient -exception being thrown from the framework or user code (e.g. due to the temporary unavailability of -an external resource). - -{% highlight bash%} -docker-compose kill taskmanager -{% endhighlight %} - -After a few seconds, you will see in the Flink WebUI that the Job failed, and has been -automatically resubmitted. At this point, it can not be restarted though due to the lack of -resources (no TaskSlots provided by TaskManagers) and will go through a cycle of cancellations and -resubmissions until resources become available again. - -<img src="{{ site.baseurl }}/fig/playground-webui-failure.png" alt="Playground Flink WebUI" -class="offset" width="100%" /> - -In the meantime, the data generator keeps pushing `ClickEvent`s into the *input* topic. - -#### Step 3: Recovery - -Once you restart the TaskManager the Job will recover from its last successful -[checkpoint]({{ site.baseurl }}/internals/stream_checkpointing.html) prior to the failure. - -{% highlight bash%} -docker-compose up -d taskmanager -{% endhighlight %} - -Once the new TaskManager has registered itself with the Flink Master, the Job will start "RUNNING" -again. It will then quickly process the full backlog of input events (accumulated during the outage) -from Kafka and produce output at a much higher rate (> 24 records/minute) until it has caught up to -the head of the queue. In the *output* you will see that all keys (`page`s) are present for all time -windows and the count is exactly one thousand. Since we are using the -[FlinkKafkaProducer]({{ site.baseurl }}/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance) -in its "at-least-once" mode, there is a chance that you will see some output records twice. 
- -### Upgrading & Rescaling the Job - -Upgrading a Flink Job always involves two steps: First, the Flink Job is gracefully stopped with a -[Savepoint]({{site.base_url}}/ops/state/savepoints.html). A Savepoint is a consistent snapshot of -the complete application state at a well-defined, globally consistent point in time (similar to a -checkpoint). Second, the upgraded Flink Job is started from the Savepoint. In this context "upgrade" -can mean different things including the following: - -* An upgrade to the configuration (incl. the parallelism of the Job) -* An upgrade to the topology of the Job (added/removed Operators) -* An upgrade to the user-defined functions of the Job - -Before starting with the upgrade you might want to start tailing the *output* topic, in order to -observe that no data is lost or corrupted in the course the upgrade. - -{% highlight bash%} -docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output -{% endhighlight %} - -#### Step 1: Stopping the Job - -To gracefully stop the Job, you need to use the "stop" command of either the CLI or the REST API. -For this you will need the JobID of the Job, which you can obtain by -[listing all running Jobs](#listing-running-jobs) or from the WebUI. With the JobID you can proceed -to stopping the Job: - -<div class="codetabs" markdown="1"> -<div data-lang="CLI" markdown="1"> -**Command** -{% highlight bash %} -docker-compose run --no-deps client flink stop <job-id> -{% endhighlight %} -**Expected Output** -{% highlight bash %} -Suspending job "<job-id>" with a savepoint. -Suspended job "<job-id>" with a savepoint. 
-{% endhighlight %} -</div> - <div data-lang="REST API" markdown="1"> - - **Request** -{% highlight bash %} -# triggering stop -curl -X POST localhost:8081/jobs/<job-id>/stop -d '{"drain": false}' -{% endhighlight %} - -**Expected Response (pretty-printed)** -{% highlight json %} -{ - "request-id": "<trigger-id>" -} -{% endhighlight %} - -**Request** -{% highlight bash %} -# check status of stop action - curl localhost:8081/jobs/<job-id>/savepoints/<trigger-id> -{% endhighlight %} - -**Expected Response (pretty-printed)** -{% highlight json %} -{ - "status": { - "id": "COMPLETED" - }, - "operation": { - "location": "<savepoint-path>" - } - -{% endhighlight %} -</div> -</div> - -The Savepoint has been stored to the `state.savepoint.dir` configured in the *flink-conf.yaml*, -which is mounted under */tmp/flink-savepoints-directory/* on your local machine. You will need the -path to this Savepoint in the next step. In case of the REST API this path was already part of the -response, you will need to have a look at the filesystem directly. - -**Command** -{% highlight bash %} -ls -lia /tmp/flink-savepoints-directory -{% endhighlight %} - -**Expected Output** -{% highlight bash %} -total 0 - 17 drwxr-xr-x 3 root root 60 17 jul 17:05 . - 2 drwxrwxrwt 135 root root 3420 17 jul 17:09 .. -1002 drwxr-xr-x 2 root root 140 17 jul 17:05 savepoint-<short-job-id>-<uuid> -{% endhighlight %} - -#### Step 2a: Restart Job without Changes - -You can now restart the upgraded Job from this Savepoint. For simplicity, you can start by -restarting it without any changes. 
- -<div class="codetabs" markdown="1"> -<div data-lang="CLI" markdown="1"> -**Command** -{% highlight bash %} -docker-compose run --no-deps client flink run -s <savepoint-path> -d /opt/flink/examples/streaming/ClickEventCount.jar --bootstrap.servers kafka:9092 --checkpointing --event-time -{% endhighlight %} -**Expected Output** -{% highlight bash %} -Starting execution of program -Job has been submitted with JobID <job-id> -{% endhighlight %} -</div> -<div data-lang="REST API" markdown="1"> - -**Request** -{% highlight bash %} -# Uploading the JAR -curl -X POST -H "Expect:" -F "jarfile=@/opt/flink/examples/streaming/ClickEventCount.jar" http://localhost:8081/jars/upload -{% endhighlight %} - -**Expected Response (pretty-printed)** -{% highlight json %} -{ - "filename": "/tmp/flink-web-<uuid>/flink-web-upload/<jar-id>", - "status": "success" -} - -{% endhighlight %} - -**Request** -{% highlight bash %} -# Submitting the Job -curl -X POST http://localhost:8081/jars/<jar-id>/run -d {"programArgs": "--bootstrap.servers kafka:9092 --checkpointing --event-time", "savepointPath": "<savepoint-path>"} -{% endhighlight %} -**Expected Response (pretty-printed)** -{% highlight json %} -{ - "jobid": "<job-id>" -} -{% endhighlight %} -</div> -</div> - -Once the Job is "RUNNING" again, you will see in the *output* Topic that records are produced at a -higher rate while the Job is processing the backlog accumulated during the outage. Additionally, -you will see that no data was lost during the upgrade: all windows are present with a count of -exactly one thousand. - -#### Step 2b: Restart Job with a Different Parallelism (Rescaling) - -Alternatively, you could also rescale the Job from this Savepoint by passing a different parallelism -during resubmission. 
- -<div class="codetabs" markdown="1"> -<div data-lang="CLI" markdown="1"> -**Command** -{% highlight bash %} -docker-compose run --no-deps client flink run -p 3 -s <savepoint-path> -d /opt/flink/examples/streaming/ClickEventCount.jar --bootstrap.servers kafka:9092 --checkpointing --event-time -{% endhighlight %} -**Expected Output** -{% highlight bash %} -Starting execution of program -Job has been submitted with JobID <job-id> -{% endhighlight %} -</div> -<div data-lang="REST API" markdown="1"> - -**Request** -{% highlight bash %} -# Uploading the JAR -curl -X POST -H "Expect:" -F "jarfile=@/opt/flink/examples/streaming/ClickEventCount.jar" http://localhost:8081/jars/upload -{% endhighlight %} - -**Expected Response (pretty-printed)** -{% highlight json %} -{ - "filename": "/tmp/flink-web-<uuid>/flink-web-upload/<jar-id>", - "status": "success" -} - -{% endhighlight %} - -**Request** -{% highlight bash %} -# Submitting the Job -curl -X POST http://localhost:8081/jars/<jar-id>/run -d {"parallelism": 3, "programArgs": "--bootstrap.servers kafka:9092 --checkpointing --event-time", "savepointPath": "<savepoint-path>"} -{% endhighlight %} -**Expected Response (pretty-printed** -{% highlight json %} -{ - "jobid": "<job-id>" -} -{% endhighlight %} -</div> -</div> -Now, the Job has been resubmitted, but it will not start as there are not enough TaskSlots to -execute it with the increased parallelism (1 available, 3 needed). With -{% highlight bash %} -docker-compose scale taskmanager=2 -{% endhighlight %} -you can add a second TaskManager to the Flink Cluster, which will automatically register with the -Flink Master. Shortly after adding the TaskManager the Job should start running again. - -Once the Job is "RUNNING" again, you will see in the *output* Topic that now data was lost during -rescaling: all windows are present with a count of exactly one thousand. 
- -### Querying the Metrics of a Job - -The Flink Master exposes system and user [metrics]({{ site.baseurl }}/monitoring/metrics.html) -via its REST API. - -The endpoint depends on the scope of these metrics. Metrics scoped to a Job can be listed via -`jobs/<job-id>/metrics`. The actual value of a metric can be queried via the `get` query parameter. - -**Request** -{% highlight bash %} -curl "localhost:8081/jobs/<jod-id>/metrics?get=lastCheckpointSize" -{% endhighlight %} -**Expected Response (pretty-printed; no placeholders)** -{% highlight json %} -[ - { - "id": "lastCheckpointSize", - "value": "9378" - } -] -{% endhighlight %} - -The REST API can not only be used to query metrics, but you can also retrieve detailed information -about the status of a running Job. - -**Request** -{% highlight bash %} -# find the vertex-id of the vertex of interest -curl localhost:8081/jobs/<jod-id> -{% endhighlight %} - -**Expected Response (pretty-printed)** -{% highlight json %} -{ - "jid": "<job-id>", - "name": "Click Event Count", - "isStoppable": false, - "state": "RUNNING", - "start-time": 1564467066026, - "end-time": -1, - "duration": 374793, - "now": 1564467440819, - "timestamps": { - "CREATED": 1564467066026, - "FINISHED": 0, - "SUSPENDED": 0, - "FAILING": 0, - "CANCELLING": 0, - "CANCELED": 0, - "RECONCILING": 0, - "RUNNING": 1564467066126, - "FAILED": 0, - "RESTARTING": 0 - }, - "vertices": [ - { - "id": "<vertex-id>", - "name": "ClickEvent Source", - "parallelism": 2, - "status": "RUNNING", - "start-time": 1564467066423, - "end-time": -1, - "duration": 374396, - "tasks": { - "CREATED": 0, - "FINISHED": 0, - "DEPLOYING": 0, - "RUNNING": 2, - "CANCELING": 0, - "FAILED": 0, - "CANCELED": 0, - "RECONCILING": 0, - "SCHEDULED": 0 - }, - "metrics": { - "read-bytes": 0, - "read-bytes-complete": true, - "write-bytes": 5033461, - "write-bytes-complete": true, - "read-records": 0, - "read-records-complete": true, - "write-records": 166351, - "write-records-complete": true - } - 
}, - { - "id": "<vertex-id>", - "name": "Timestamps/Watermarks", - "parallelism": 2, - "status": "RUNNING", - "start-time": 1564467066441, - "end-time": -1, - "duration": 374378, - "tasks": { - "CREATED": 0, - "FINISHED": 0, - "DEPLOYING": 0, - "RUNNING": 2, - "CANCELING": 0, - "FAILED": 0, - "CANCELED": 0, - "RECONCILING": 0, - "SCHEDULED": 0 - }, - "metrics": { - "read-bytes": 5066280, - "read-bytes-complete": true, - "write-bytes": 5033496, - "write-bytes-complete": true, - "read-records": 166349, - "read-records-complete": true, - "write-records": 166349, - "write-records-complete": true - } - }, - { - "id": "<vertex-id>", - "name": "ClickEvent Counter", - "parallelism": 2, - "status": "RUNNING", - "start-time": 1564467066469, - "end-time": -1, - "duration": 374350, - "tasks": { - "CREATED": 0, - "FINISHED": 0, - "DEPLOYING": 0, - "RUNNING": 2, - "CANCELING": 0, - "FAILED": 0, - "CANCELED": 0, - "RECONCILING": 0, - "SCHEDULED": 0 - }, - "metrics": { - "read-bytes": 5085332, - "read-bytes-complete": true, - "write-bytes": 316, - "write-bytes-complete": true, - "read-records": 166305, - "read-records-complete": true, - "write-records": 6, - "write-records-complete": true - } - }, - { - "id": "<vertex-id>", - "name": "ClickEventStatistics Sink", - "parallelism": 2, - "status": "RUNNING", - "start-time": 1564467066476, - "end-time": -1, - "duration": 374343, - "tasks": { - "CREATED": 0, - "FINISHED": 0, - "DEPLOYING": 0, - "RUNNING": 2, - "CANCELING": 0, - "FAILED": 0, - "CANCELED": 0, - "RECONCILING": 0, - "SCHEDULED": 0 - }, - "metrics": { - "read-bytes": 20668, - "read-bytes-complete": true, - "write-bytes": 0, - "write-bytes-complete": true, - "read-records": 6, - "read-records-complete": true, - "write-records": 0, - "write-records-complete": true - } - } - ], - "status-counts": { - "CREATED": 0, - "FINISHED": 0, - "DEPLOYING": 0, - "RUNNING": 4, - "CANCELING": 0, - "FAILED": 0, - "CANCELED": 0, - "RECONCILING": 0, - "SCHEDULED": 0 - }, - "plan": { - "jid": 
"<job-id>", - "name": "Click Event Count", - "nodes": [ - { - "id": "<vertex-id>", - "parallelism": 2, - "operator": "", - "operator_strategy": "", - "description": "ClickEventStatistics Sink", - "inputs": [ - { - "num": 0, - "id": "<vertex-id>", - "ship_strategy": "FORWARD", - "exchange": "pipelined_bounded" - } - ], - "optimizer_properties": {} - }, - { - "id": "<vertex-id>", - "parallelism": 2, - "operator": "", - "operator_strategy": "", - "description": "ClickEvent Counter", - "inputs": [ - { - "num": 0, - "id": "<vertex-id>", - "ship_strategy": "HASH", - "exchange": "pipelined_bounded" - } - ], - "optimizer_properties": {} - }, - { - "id": "<vertex-id>", - "parallelism": 2, - "operator": "", - "operator_strategy": "", - "description": "Timestamps/Watermarks", - "inputs": [ - { - "num": 0, - "id": "<vertex-id>", - "ship_strategy": "FORWARD", - "exchange": "pipelined_bounded" - } - ], - "optimizer_properties": {} - }, - { - "id": "<vertex-id>", - "parallelism": 2, - "operator": "", - "operator_strategy": "", - "description": "ClickEvent Source", - "optimizer_properties": {} - } - ] - } -} -{% endhighlight %} - -Please consult the [REST API reference](https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/rest_api.html#api) -for a complete list of possible queries including how to query metrics of different scopes (e.g. -TaskManager metrics); - -{% top %} - -## Variants - -You might have noticed that the *Click Event Count* was always started with `--checkpointing` and -`--event-time` program arguments. By omitting these in the command of the *client* container in the -`docker-compose.yaml`, you can change the behavior of the Job. - -* `--checkpointing` enables [checkpoint]({{ site.baseurl }}/internals/stream_checkpointing.html), -which is Flink's fault-tolerance mechanism. If you run without it and go through -[failure and recovery](#observing-failure--recovery), you should will see that data is actually -lost. 
- -* `--event-time` enables [event time semantics]({{ site.baseurl }}/dev/event_time.html) for your -Job. When disabled, the Job will assign events to windows based on the wall-clock time instead of -the timestamp of the `ClickEvent`. Consequently, the number of events per window will not be exactly -one thousand anymore. diff --git a/docs/getting-started/docker-playgrounds/index.md b/docs/getting-started/docker-playgrounds/index.md deleted file mode 100644 index 2051e46..0000000 --- a/docs/getting-started/docker-playgrounds/index.md +++ /dev/null @@ -1,25 +0,0 @@ ---- -title: Docker Playgrounds -nav-id: docker-playgrounds -nav-title: '<i class="fa fa-ship title appetizer" aria-hidden="true"></i> Docker Playgrounds' -nav-parent_id: getting-started -nav-pos: 3 ---- -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
---> diff --git a/docs/getting-started/docker-playgrounds/index.zh.md b/docs/getting-started/docker-playgrounds/index.zh.md deleted file mode 100644 index 2051e46..0000000 --- a/docs/getting-started/docker-playgrounds/index.zh.md +++ /dev/null @@ -1,25 +0,0 @@ ---- -title: Docker Playgrounds -nav-id: docker-playgrounds -nav-title: '<i class="fa fa-ship title appetizer" aria-hidden="true"></i> Docker Playgrounds' -nav-parent_id: getting-started -nav-pos: 3 ---- -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml index acec56d..5a27808 100644 --- a/flink-dist/pom.xml +++ b/flink-dist/pom.xml @@ -193,13 +193,6 @@ under the License. 
<dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-examples-streaming-click-event-count_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.apache.flink</groupId> <artifactId>flink-examples-streaming-twitter_${scala.binary.version}</artifactId> <version>${project.version}</version> <scope>provided</scope> diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml index a5af820..1b65486 100644 --- a/flink-dist/src/main/assemblies/bin.xml +++ b/flink-dist/src/main/assemblies/bin.xml @@ -239,17 +239,6 @@ under the License. <exclude>original-*.jar</exclude> </excludes> </fileSet> - <fileSet> - <directory>../flink-examples/flink-examples-build-helper/flink-examples-streaming-click-event-count/target</directory> - <outputDirectory>examples/streaming</outputDirectory> - <fileMode>0644</fileMode> - <includes> - <include>*.jar</include> - </includes> - <excludes> - <exclude>original-*.jar</exclude> - </excludes> - </fileSet> <!-- copy jar files of the gelly examples --> <fileSet> diff --git a/flink-examples/flink-examples-build-helper/flink-examples-streaming-click-event-count/pom.xml b/flink-examples/flink-examples-build-helper/flink-examples-streaming-click-event-count/pom.xml deleted file mode 100644 index c2c23eb..0000000 --- a/flink-examples/flink-examples-build-helper/flink-examples-streaming-click-event-count/pom.xml +++ /dev/null @@ -1,106 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <artifactId>flink-examples-build-helper</artifactId> - <groupId>org.apache.flink</groupId> - <version>1.10-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>flink-examples-streaming-click-event-count_${scala.binary.version}</artifactId> - <name>flink-examples-streaming-click-event-count</name> - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-examples-streaming_${scala.binary.version}</artifactId> - <version>${project.version}</version> - </dependency> - </dependencies> - - <build> - <finalName>ClickEventCount</finalName> - - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-deploy-plugin</artifactId> - <configuration> - <skip>true</skip> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <executions> - <execution> - <id>shade-flink</id> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <shadeTestJar>false</shadeTestJar> - <shadedArtifactAttached>false</shadedArtifactAttached> - <createDependencyReducedPom>false</createDependencyReducedPom> - <transformers> - <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> - 
<mainClass>org.apache.flink.streaming.examples.windowing.clickeventcount.ClickEventCount</mainClass> - </transformer> - </transformers> - <artifactSet> - <includes> - <include>org.apache.flink:flink-connector-kafka*</include> - <include>org.apache.flink:flink-examples-streaming*</include> - <include>org.apache.kafka:*</include> - </includes> - </artifactSet> - - <filters> - <filter> - <artifact>org.apache.flink:flink-examples-streaming_*</artifact> - <includes> - <include>org/apache/flink/streaming/examples/windowing/clickeventcount/**</include> - </includes> - </filter> - <filter> - <artifact>org.apache.kafka:*</artifact> - <excludes> - <exclude>LICENSE</exclude> - <!-- Does not contain anything relevant. - Cites a binary dependency on jersey, but this is neither reflected in the - dependency graph, nor are any jersey files bundled. --> - <exclude>NOTICE</exclude> - </excludes> - </filter> - </filters> - </configuration> - </execution> - </executions> - </plugin> - </plugins> - </build> - -</project> diff --git a/flink-examples/flink-examples-build-helper/flink-examples-streaming-click-event-count/src/main/resources/META-INF/NOTICE b/flink-examples/flink-examples-build-helper/flink-examples-streaming-click-event-count/src/main/resources/META-INF/NOTICE deleted file mode 100644 index 137e0c6..0000000 --- a/flink-examples/flink-examples-build-helper/flink-examples-streaming-click-event-count/src/main/resources/META-INF/NOTICE +++ /dev/null @@ -1,9 +0,0 @@ -flink-examples-streaming-click-event-count -Copyright 2014-2019 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). - -This project bundles the following dependencies under the Apache Software License 2.0. 
(http://www.apache.org/licenses/LICENSE-2.0.txt) - -- org.apache.kafka:kafka-clients:2.2.0 diff --git a/flink-examples/flink-examples-build-helper/flink-examples-streaming-state-machine/src/main/resources/META-INF/NOTICE b/flink-examples/flink-examples-build-helper/flink-examples-streaming-state-machine/src/main/resources/META-INF/NOTICE index 9161b8b..af71222 100644 --- a/flink-examples/flink-examples-build-helper/flink-examples-streaming-state-machine/src/main/resources/META-INF/NOTICE +++ b/flink-examples/flink-examples-build-helper/flink-examples-streaming-state-machine/src/main/resources/META-INF/NOTICE @@ -6,4 +6,4 @@ The Apache Software Foundation (http://www.apache.org/). This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) -- org.apache.kafka:kafka-clients:2.2.0 +- org.apache.kafka:kafka-clients:0.10.2.1 diff --git a/flink-examples/flink-examples-build-helper/pom.xml b/flink-examples/flink-examples-build-helper/pom.xml index 6684ef4..6dd769d 100644 --- a/flink-examples/flink-examples-build-helper/pom.xml +++ b/flink-examples/flink-examples-build-helper/pom.xml @@ -36,7 +36,6 @@ under the License. <module>flink-examples-streaming-twitter</module> <module>flink-examples-streaming-state-machine</module> <module>flink-examples-streaming-gcp-pubsub</module> - <module>flink-examples-streaming-click-event-count</module> </modules> </project> diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml index fe96e9a..d96df1f 100644 --- a/flink-examples/flink-examples-streaming/pom.xml +++ b/flink-examples/flink-examples-streaming/pom.xml @@ -58,7 +58,7 @@ under the License. 
<dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> + <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> @@ -364,6 +364,7 @@ under the License. </includes> </configuration> </execution> + </executions> </plugin> diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java index aa030a7..059b2c0 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java @@ -20,7 +20,7 @@ package org.apache.flink.streaming.examples.statemachine; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; import org.apache.flink.streaming.examples.statemachine.generator.EventsGeneratorSource; import org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializer; @@ -46,7 +46,7 @@ public class KafkaEventsGeneratorJob { env .addSource(new EventsGeneratorSource(errorRate, sleep)) - .addSink(new FlinkKafkaProducer<>(brokers, kafkaTopic, new EventDeSerializer())); + .addSink(new FlinkKafkaProducer010<>(brokers, kafkaTopic, new EventDeSerializer())); // trigger program execution env.execute("State machine example Kafka events generator job"); diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java index 00e2b0e..054ed0a 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java @@ -29,7 +29,7 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; import org.apache.flink.streaming.examples.statemachine.dfa.State; import org.apache.flink.streaming.examples.statemachine.event.Alert; import org.apache.flink.streaming.examples.statemachine.event.Event; @@ -82,7 +82,7 @@ public class StateMachineExample { Properties kafkaProps = new Properties(); kafkaProps.setProperty("bootstrap.servers", brokers); - FlinkKafkaConsumer<Event> kafka = new FlinkKafkaConsumer<>(kafkaTopic, new EventDeSerializer(), kafkaProps); + FlinkKafkaConsumer010<Event> kafka = new FlinkKafkaConsumer010<>(kafkaTopic, new EventDeSerializer(), kafkaProps); kafka.setStartFromLatest(); kafka.setCommitOffsetsOnCheckpoints(false); source = kafka; diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/ClickEventCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/ClickEventCount.java deleted file mode 100644 index 2c02292..0000000 --- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/ClickEventCount.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.examples.windowing.clickeventcount; - -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; -import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; -import org.apache.flink.streaming.examples.windowing.clickeventcount.functions.ClickEventStatisticsCollector; -import org.apache.flink.streaming.examples.windowing.clickeventcount.functions.CountingAggregator; -import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEvent; -import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEventDeserializationSchema; -import 
org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEventStatistics; -import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEventStatisticsSerializationSchema; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; - -import java.util.Properties; -import java.util.concurrent.TimeUnit; - -/** - * A simple streaming job reading {@link ClickEvent}s from Kafka, counting events per 15 seconds and - * writing the resulting {@link ClickEventStatistics} back to Kafka. - * - * <p> It can be run with or without checkpointing and with event time or processing time semantics. - * </p> - * - * <p>The Job can be configured via the command line:</p> - * * "--checkpointing": enables checkpointing - * * "--event-time": set the StreamTimeCharacteristic to EventTime - * * "--input-topic": the name of the Kafka Topic to consume {@link ClickEvent}s from - * * "--output-topic": the name of the Kafka Topic to produce {@link ClickEventStatistics} to - * * "--bootstrap.servers": comma-separated list of Kafka brokers - * - */ -public class ClickEventCount { - - public static final String CHECKPOINTING_OPTION = "checkpointing"; - public static final String EVENT_TIME_OPTION = "event-time"; - - public static final Time WINDOW_SIZE = Time.of(15, TimeUnit.SECONDS); - - public static void main(String[] args) throws Exception { - final ParameterTool params = ParameterTool.fromArgs(args); - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - configureEnvironment(params, env); - - String inputTopic = params.get("input-topic", "input"); - String outputTopic = params.get("output-topic", "output"); - String brokers = params.get("bootstrap.servers", "localhost:9092"); - Properties kafkaProps = new Properties(); - kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); - kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"click-event-count"); - - env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps)) - .name("ClickEvent Source") - .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ClickEvent>(Time.of(200, TimeUnit.MILLISECONDS)) { - @Override - public long extractTimestamp(final ClickEvent element) { - return element.getTimestamp().getTime(); - } - }) - .keyBy(ClickEvent::getPage) - .timeWindow(WINDOW_SIZE) - .aggregate(new CountingAggregator(), - new ClickEventStatisticsCollector()) - .name("ClickEvent Counter") - .addSink(new FlinkKafkaProducer<>( - outputTopic, - new ClickEventStatisticsSerializationSchema(outputTopic), - kafkaProps, - FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)) - .name("ClickEventStatistics Sink"); - - env.execute("Click Event Count"); - } - - private static void configureEnvironment( - final ParameterTool params, - final StreamExecutionEnvironment env) { - - boolean checkpointingEnabled = params.has(CHECKPOINTING_OPTION); - boolean eventTimeSemantics = params.has(EVENT_TIME_OPTION); - - if (checkpointingEnabled) { - env.enableCheckpointing(1000); - } - - if (eventTimeSemantics) { - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - } - - //disabling Operator chaining to make it easier to follow the Job in the WebUI - env.disableOperatorChaining(); - } -} diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/ClickEventGenerator.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/ClickEventGenerator.java deleted file mode 100644 index 17943bf..0000000 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/ClickEventGenerator.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.examples.windowing.clickeventcount; - -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEvent; -import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEventSerializationSchema; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.ByteArraySerializer; - -import java.util.Arrays; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import static org.apache.flink.streaming.examples.windowing.clickeventcount.ClickEventCount.WINDOW_SIZE; - -/** - * A generator which pushes {@link ClickEvent}s into a Kafka Topic configured via `--topic` and - * `--bootstrap.servers`. - * - * <p> The generator creates the same number of {@link ClickEvent}s for all pages. The delay between - * events is chosen such that processing time and event time roughly align. The generator always - * creates the same sequence of events. 
</p> - * - */ -public class ClickEventGenerator { - - public static final int EVENTS_PER_WINDOW = 1000; - - private static final List<String> pages = Arrays.asList("/help", "/index", "/shop", "/jobs", "/about", "/news"); - - //this calculation is only accurate as long as pages.size() * EVENTS_PER_WINDOW divides the - //window size - public static final long DELAY = WINDOW_SIZE.toMilliseconds() / pages.size() / EVENTS_PER_WINDOW; - - public static void main(String[] args) throws Exception { - - final ParameterTool params = ParameterTool.fromArgs(args); - - String topic = params.get("topic", "input"); - - Properties kafkaProps = createKafkaProperties(params); - - KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(kafkaProps); - - ClickIterator clickIterator = new ClickIterator(); - - while (true) { - - ProducerRecord<byte[], byte[]> record = new ClickEventSerializationSchema(topic).serialize( - clickIterator.next(), - null); - - producer.send(record); - - Thread.sleep(DELAY); - } - } - - private static Properties createKafkaProperties(final ParameterTool params) { - String brokers = params.get("bootstrap.servers", "localhost:9092"); - Properties kafkaProps = new Properties(); - kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); - kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); - kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName()); - return kafkaProps; - } - - static class ClickIterator { - - private Map<String, Long> nextTimestampPerKey; - private int nextPageIndex; - - ClickIterator() { - nextTimestampPerKey = new HashMap<>(); - nextPageIndex = 0; - } - - ClickEvent next() { - String page = nextPage(); - return new ClickEvent(nextTimestamp(page), page); - } - - private Date nextTimestamp(String page) { - long nextTimestamp = nextTimestampPerKey.getOrDefault(page, 0L); - nextTimestampPerKey.put(page, nextTimestamp + 
WINDOW_SIZE.toMilliseconds() / EVENTS_PER_WINDOW); - return new Date(nextTimestamp); - } - - private String nextPage() { - String nextPage = pages.get(nextPageIndex); - if (nextPageIndex == pages.size() - 1) { - nextPageIndex = 0; - } else { - nextPageIndex++; - } - return nextPage; - } - } -} diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/functions/ClickEventStatisticsCollector.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/functions/ClickEventStatisticsCollector.java deleted file mode 100644 index 76af954..0000000 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/functions/ClickEventStatisticsCollector.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.examples.windowing.clickeventcount.functions; - -import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; -import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEvent; -import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEventStatistics; -import org.apache.flink.util.Collector; - -import java.util.Date; - -/** - * A simple {@link ProcessWindowFunction}, which wraps a count of {@link ClickEvent}s into an - * instance of {@link ClickEventStatistics}. - * - **/ -public class ClickEventStatisticsCollector - extends ProcessWindowFunction<Long, ClickEventStatistics, String, TimeWindow> { - - @Override - public void process( - final String page, - final Context context, - final Iterable<Long> elements, - final Collector<ClickEventStatistics> out) throws Exception { - - Long count = elements.iterator().next(); - - out.collect(new ClickEventStatistics(new Date(context.window().getStart()), new Date(context.window().getEnd()), page, count)); - } -} diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/functions/CountingAggregator.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/functions/CountingAggregator.java deleted file mode 100644 index 86f8190..0000000 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/functions/CountingAggregator.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.examples.windowing.clickeventcount.functions; - -import org.apache.flink.api.common.functions.AggregateFunction; -import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEvent; - -/** - * An {@link AggregateFunction} which simply counts {@link ClickEvent}s. - * - */ -public class CountingAggregator implements AggregateFunction<ClickEvent, Long, Long> { - @Override - public Long createAccumulator() { - return 0L; - } - - @Override - public Long add(final ClickEvent value, final Long accumulator) { - return accumulator + 1; - } - - @Override - public Long getResult(final Long accumulator) { - return accumulator; - } - - @Override - public Long merge(final Long a, final Long b) { - return a + b; - } -} diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEvent.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEvent.java deleted file mode 100644 index 47e0882..0000000 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEvent.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.examples.windowing.clickeventcount.records; - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonFormat; - -import java.util.Date; -import java.util.Objects; - -/** - * A simple event recording a click on a {@link ClickEvent#page} at time {@link ClickEvent#timestamp}. 
- * - */ -public class ClickEvent { - - //using java.util.Date for better readability in Flink Cluster Playground - @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MM-yyyy hh:mm:ss:SSS") - private Date timestamp; - private String page; - - public ClickEvent() { - } - - public ClickEvent(final Date timestamp, final String page) { - this.timestamp = timestamp; - this.page = page; - } - - public Date getTimestamp() { - return timestamp; - } - - public void setTimestamp(final Date timestamp) { - this.timestamp = timestamp; - } - - public String getPage() { - return page; - } - - public void setPage(final String page) { - this.page = page; - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final ClickEvent that = (ClickEvent) o; - return Objects.equals(timestamp, that.timestamp) && Objects.equals(page, that.page); - } - - @Override - public int hashCode() { - return Objects.hash(timestamp, page); - } - - @Override - public String toString() { - final StringBuilder sb = new StringBuilder("ClickEvent{"); - sb.append("timestamp=").append(timestamp); - sb.append(", page='").append(page).append('\''); - sb.append('}'); - return sb.toString(); - } -} diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEventDeserializationSchema.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEventDeserializationSchema.java deleted file mode 100644 index 8da3ad1..0000000 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEventDeserializationSchema.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.examples.windowing.clickeventcount.records; - -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.typeinfo.TypeInformation; - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; - -import java.io.IOException; - -/** - * A Kafka {@link DeserializationSchema} to deserialize {@link ClickEvent}s from JSON. 
- * - */ -public class ClickEventDeserializationSchema implements DeserializationSchema<ClickEvent> { - - private static final long serialVersionUID = 1L; - - private static final ObjectMapper objectMapper = new ObjectMapper(); - - @Override - public ClickEvent deserialize(byte[] message) throws IOException { - return objectMapper.readValue(message, ClickEvent.class); - } - - @Override - public boolean isEndOfStream(ClickEvent nextElement) { - return false; - } - - @Override - public TypeInformation<ClickEvent> getProducedType() { - return TypeInformation.of(ClickEvent.class); - } -} diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEventSerializationSchema.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEventSerializationSchema.java deleted file mode 100644 index fda0d05..0000000 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEventSerializationSchema.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.examples.windowing.clickeventcount.records; - -import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; - -import org.apache.kafka.clients.producer.ProducerRecord; - -import javax.annotation.Nullable; - -/** - * A Kafka {@link KafkaSerializationSchema} to serialize {@link ClickEvent}s as JSON. - * - */ -public class ClickEventSerializationSchema implements KafkaSerializationSchema<ClickEvent> { - - private static final ObjectMapper objectMapper = new ObjectMapper(); - private String topic; - - public ClickEventSerializationSchema(){ - } - - public ClickEventSerializationSchema(String topic) { - this.topic = topic; - } - - @Override - public ProducerRecord<byte[], byte[]> serialize( - final ClickEvent message, @Nullable final Long timestamp) { - try { - //if topic is null, default topic will be used - return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message)); - } catch (JsonProcessingException e) { - throw new IllegalArgumentException("Could not serialize record: " + message, e); - } - } -} diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEventStatistics.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEventStatistics.java deleted file mode 100644 index ade3911..0000000 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEventStatistics.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.examples.windowing.clickeventcount.records; - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonFormat; - -import java.util.Date; -import java.util.Objects; - -/** - * A small wrapper class for windowed page counts. - * - */ -public class ClickEventStatistics { - - //using java.util.Date for better readability in Flink Cluster Playground - @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MM-yyyy hh:mm:ss:SSS") - private Date windowStart; - //using java.util.Date for better readability in Flink Cluster Playground - @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MM-yyyy hh:mm:ss:SSS") - private Date windowEnd; - private String page; - private long count; - - public ClickEventStatistics() { - } - - public ClickEventStatistics( - final Date windowStart, - final Date windowEnd, - final String page, - final long count) { - this.windowStart = windowStart; - this.windowEnd = windowEnd; - this.page = page; - this.count = count; - } - - public Date getWindowStart() { - return windowStart; - } - - public void setWindowStart(final Date windowStart) { - this.windowStart = windowStart; - } - - public Date getWindowEnd() { - return windowEnd; - } - - public void setWindowEnd(final Date windowEnd) { - 
this.windowEnd = windowEnd; - } - - public String getPage() { - return page; - } - - public void setPage(final String page) { - this.page = page; - } - - public long getCount() { - return count; - } - - public void setCount(final long count) { - this.count = count; - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final ClickEventStatistics that = (ClickEventStatistics) o; - return count == that.count && - Objects.equals(windowStart, that.windowStart) && - Objects.equals(windowEnd, that.windowEnd) && - Objects.equals(page, that.page); - } - - @Override - public int hashCode() { - return Objects.hash(windowStart, windowEnd, page, count); - } - - @Override - public String toString() { - final StringBuilder sb = new StringBuilder("ClickEventStatistics{"); - sb.append("windowStart=").append(windowStart); - sb.append(", windowEnd=").append(windowEnd); - sb.append(", page='").append(page).append('\''); - sb.append(", count=").append(count); - sb.append('}'); - return sb.toString(); - } -} diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEventStatisticsSerializationSchema.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEventStatisticsSerializationSchema.java deleted file mode 100644 index 897691f..0000000 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/clickeventcount/records/ClickEventStatisticsSerializationSchema.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.examples.windowing.clickeventcount.records; - -import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; - -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; - -import org.apache.kafka.clients.producer.ProducerRecord; - -import javax.annotation.Nullable; - -/** - * A Kafka {@link KafkaSerializationSchema} to serialize {@link ClickEventStatistics}s as JSON. - * - */ -public class ClickEventStatisticsSerializationSchema implements KafkaSerializationSchema<ClickEventStatistics> { - - private static final ObjectMapper objectMapper = new ObjectMapper(); - private String topic; - - public ClickEventStatisticsSerializationSchema(){ - } - - public ClickEventStatisticsSerializationSchema(String topic) { - this.topic = topic; - } - - @Override - public ProducerRecord<byte[], byte[]> serialize( - final ClickEventStatistics message, @Nullable final Long timestamp) { - try { - //if topic is null, default topic will be used - return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message)); - } catch (JsonProcessingException e) { - throw new IllegalArgumentException("Could not serialize record: " + message, e); - } - } -}