This is an automated email from the ASF dual-hosted git repository.

felixcheung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
     new 13fd50e  [ZEPPELIN-3914] upgrade Flink to 1.7.1 (#3266)
13fd50e is described below

commit 13fd50ee535cd28b237cb4af740d0f893dbff3ae
Author: Xue Yu <278006...@qq.com>
AuthorDate: Tue Jan 1 09:22:42 2019 +0800

    [ZEPPELIN-3914] upgrade Flink to 1.7.1 (#3266)

    ### What is this PR for?
    This PR upgrades the Flink interpreter from Flink 1.5.2 to 1.7.1.

    ### What type of PR is it?
    [Improvement]

    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-3914

    ### How should this be tested?
    * manual

    ### Screenshots (if appropriate)

    ### Questions:
    * Do the license files need an update? no
    * Are there breaking changes for older versions? no
    * Does this need documentation? no
---
 docs/interpreter/flink.md                                 |  2 +-
 flink/pom.xml                                             |  2 +-
 .../apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala  |  2 +-
 .../org/apache/zeppelin/flink/FlinkScalaInterpreter.scala | 13 ++++---------
 4 files changed, 7 insertions(+), 12 deletions(-)

diff --git a/docs/interpreter/flink.md b/docs/interpreter/flink.md
index 2cf3125..d3f2223 100644
--- a/docs/interpreter/flink.md
+++ b/docs/interpreter/flink.md
@@ -50,7 +50,7 @@ At the "Interpreters" menu, you have to create a new Flink interpreter and provi
   </tr>
 </table>
 
-For more information about Flink configuration, you can find it [here](https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html).
+For more information about Flink configuration, you can find it [here](https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/config.html).
 
 ## How to test it's working
 You can find an example of Flink usage in the Zeppelin Tutorial folder or try the following word count example, by using the [Zeppelin notebook](https://www.zeppelinhub.com/viewer/notebooks/aHR0cHM6Ly9yYXcuZ2l0aHVidXNlcmNvbnRlbnQuY29tL05GTGFicy96ZXBwZWxpbi1ub3RlYm9va3MvbWFzdGVyL25vdGVib29rcy8yQVFFREs1UEMvbm90ZS5qc29u) from Till Rohrmann's presentation [Interactive data analysis with Apache Flink](http://www.slideshare.net/tillrohrmann/data-analysis-49806564) for Apache Flink Meetup.
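(Context, not part of this commit: the docs hunk above points readers at a word count example. A minimal sketch of that kind of job against Flink's batch Scala API follows; the object name and sample sentences are illustrative only.)

```scala
import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "to be or not to be",
      "that is the question")

    val counts = text
      .flatMap(_.toLowerCase.split("\\W+")) // break each line into words
      .filter(_.nonEmpty)                   // drop empty tokens
      .map((_, 1))                          // pair every word with a 1
      .groupBy(0)                           // group on the word field
      .sum(1)                               // add up the 1s per word

    counts.print()
  }
}
```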
diff --git a/flink/pom.xml b/flink/pom.xml
index 7a374f2..331e19c 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -36,7 +36,7 @@
   <properties>
     <!--library versions-->
     <interpreter.name>flink</interpreter.name>
-    <flink.version>1.5.2</flink.version>
+    <flink.version>1.7.1</flink.version>
     <flink.akka.version>2.3.7</flink.akka.version>
     <scala.macros.version>2.0.1</scala.macros.version>
     <scala.binary.version>2.11</scala.binary.version>
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala
index 1694a44..b2d8d16 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkSQLScalaInterpreter.scala
@@ -30,7 +30,7 @@ class FlinkSQLScalaInterpreter(scalaInterpreter: FlinkScalaInterpreter,
 
   def interpret(code: String, context: InterpreterContext): InterpreterResult = {
     try {
-      val table: Table = this.btenv.sql(code)
+      val table: Table = this.btenv.sqlQuery(code)
       val result = z.showData(table)
       return new InterpreterResult(InterpreterResult.Code.SUCCESS, result)
     } catch {
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 14f8959..1d8b27e 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -26,7 +26,7 @@ import org.apache.flink.api.scala.FlinkShell._
 import org.apache.flink.api.scala.{ExecutionEnvironment, FlinkILoop}
 import org.apache.flink.client.program.ClusterClient
 import org.apache.flink.configuration.GlobalConfiguration
-import org.apache.flink.runtime.minicluster.{MiniCluster, StandaloneMiniCluster}
+import org.apache.flink.runtime.minicluster.MiniCluster
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
@@ -45,8 +45,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
   lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass)
 
   private var flinkILoop: FlinkILoop = _
-  private var cluster: Option[Either[Either[StandaloneMiniCluster, MiniCluster],
-    ClusterClient[_]]] = _
+  private var cluster: Option[Either[MiniCluster, ClusterClient[_]]] = _
   private var scalaCompleter: ScalaCompleter = _
   private val interpreterOutput = new InterpreterOutputStream(LOGGER)
 
@@ -68,8 +67,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
     val (iLoop, cluster) = try {
       val (host, port, cluster) = fetchConnectionInfo(configuration, config)
       val conf = cluster match {
-        case Some(Left(Left(miniCluster))) => miniCluster.getConfiguration
-        case Some(Left(Right(_))) => configuration
+        case Some(Left(miniCluster)) => configuration
         case Some(Right(yarnCluster)) => yarnCluster.getFlinkConfiguration
         case None => configuration
       }
@@ -213,10 +211,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
     }
     if (cluster != null) {
       cluster match {
-        case Some(Left(Left(legacyMiniCluster))) =>
-          LOGGER.info("Shutdown LegacyMiniCluster")
-          legacyMiniCluster.close()
-        case Some(Left(Right(newMiniCluster))) =>
+        case Some(Left(newMiniCluster)) =>
           LOGGER.info("Shutdown NewMiniCluster")
           newMiniCluster.close()
         case Some(Right(yarnCluster)) =>
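(Context, not part of this commit: the FlinkSQLScalaInterpreter hunk tracks the Table API rename in which `TableEnvironment.sql(...)` gave way to `sqlQuery(...)` for queries, with `sqlUpdate(...)` covering statements. A standalone sketch of the 1.7-era usage is below; the object, table, and field names are illustrative only.)

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.{Table, TableEnvironment}
import org.apache.flink.table.api.scala._

object SqlQueryExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // For a Scala ExecutionEnvironment this returns the Scala
    // BatchTableEnvironment -- the kind of environment behind the
    // interpreter's btenv field.
    val btenv = TableEnvironment.getTableEnvironment(env)

    val words = env.fromElements(("flink", 1), ("zeppelin", 2))
    btenv.registerDataSet("words", words, 'word, 'frequency)

    // Pre-1.7 interpreter code called btenv.sql(code); on 1.7 the same
    // SELECT goes through sqlQuery instead.
    val table: Table = btenv.sqlQuery("SELECT word, frequency FROM words")
    btenv.toDataSet[(String, Int)](table).print()
  }
}
```

The simplified `Option[Either[MiniCluster, ClusterClient[_]]]` cluster type in FlinkScalaInterpreter follows from the same upgrade: as the removed import and the collapsed pattern matches show, the `FlinkShell` connection info in Flink 1.7 no longer carries a legacy `StandaloneMiniCluster` branch, so the nested `Either` that distinguished it from `MiniCluster` is no longer needed.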