I'm not too familiar with what's happening here, but maybe Klou (cc'd) can help?

On Thu, Mar 30, 2017 at 6:50 PM, Andrea Spina <andrea.sp...@radicalbit.io> wrote:
> Dear Flink community,
>
> I started to use Async Functions in Scala, Flink 1.2.0, in order to retrieve
> enriching information from a MariaDB database. To do that, I first used the
> classic JDBC library (org.mariadb.jdbc) and it worked as expected.
>
> Due to the blocking behavior of JDBC, I'm trying to use this library
> https://github.com/mauricio/postgresql-async/tree/master/mysql-async
> which promises to offer a subset of features in a non-blocking fashion.
>
> Sadly I'm not able to use it.
>
> The async function code follows.
>
> *
> object AsyncEnricher {
>   case class OutputType(field1: FieldType, field2: FieldType)
> }
>
> class AsyncEnricher(configuration: MariaDBConfig)
>     extends AsyncFunction[InputType, OutputType]
>     with Serializable
>     with AutoCloseable
>     with LazyLogging {
>
>   private val queryString = s"SELECT <column> FROM [table] WHERE <column_name> = <value>;"
>
>   implicit lazy val executor =
>     ExecutionContext.fromExecutor(Executors.directExecutor())
>
>   private lazy val mariaDBClient: Connection = {
>     val config = createConfiguration(configuration)
>     val connection = new MySQLConnection(config)
>     Await.result(connection.connect, 5 seconds)
>   }
>
>   override def asyncInvoke(input: InputType, collector: AsyncCollector[OutputType]): Unit = {
>
>     val queryResult = mariaDBClient.sendPreparedStatement(queryString, Seq(input.fieldToSearch))
>
>     queryResult.map(_.rows) onSuccess {
>       case Some(resultSet) =>
>         Try {
>           resultSet.head(0).asInstanceOf[FieldType]
>         } match {
>           case Success(value) =>
>             collector.collect(Iterable(OutputType(value, value)))
>           case Failure(e) =>
>             logger.error(s"retrieving value from MariaDB raised $e: $queryString executed")
>         }
>       case _ => logger.error(s"value not found: $queryString executed")
>     }
>
>     queryResult onFailure {
>       case e: Throwable =>
>         logger.error(s"retrieving location volume from MariaDB raised $e: $queryString executed")
>     }
>
>   }
>
>   override def close(): Unit = {
>     Try(mariaDBClient.disconnect).recover {
>       case t: Throwable => logger.info(s"MariaDB cannot be closed - ${t.getMessage}")
>     }
>   }
>
> }
> *
>
> The stack trace follows.
>
> /
> TimerException{java.lang.IllegalStateException: Timer service is shut down}
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:220)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Timer service is shut down
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.registerTimer(SystemProcessingTimeService.java:118)
>     at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.onProcessingTime(TimestampsAndPeriodicWatermarksOperator.java:82)
>     at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:218)
>     ... 7 more
>
> java.lang.NullPointerException
>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:343)
>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:320)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:442)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:343)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>     at java.lang.Thread.run(Thread.java:745)
> /
>
> I think it involves connection.connect returning a Future, hence the Await.
> This differs from the JDBC driver, which worked like a charm. I tried moving
> the Await out of the lazy val.
>
> Can't wait for your opinion. Thank you so much in advance.
>
> Andrea
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Async-Functions-and-Scala-async-client-for-mySql-MariaDB-database-connection-tp12469.html
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
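
Not a diagnosis of the stack trace above, but the point raised in the quoted message (connection.connect returns a Future, which is then blocked on with Await inside a lazy val) can be illustrated with a minimal sketch. It uses only scala.concurrent. FakeConnection, connect, and Enricher are hypothetical stand-ins invented for illustration, not the mysql-async or Flink API. The idea shown is to keep the connection as a Future and chain each query onto it with flatMap, so nothing blocks while connecting.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object NonBlockingConnectSketch {

  // Hypothetical stand-in for an async client connection (NOT the mysql-async API).
  final class FakeConnection {
    // Pretends to look up a value; an empty key simulates "no row found".
    def query(key: String)(implicit ec: ExecutionContext): Future[Option[String]] =
      Future(if (key.nonEmpty) Some(s"value-for-$key") else None)
  }

  // Hypothetical async connect: returns a Future, as connection.connect does in the quoted code.
  def connect()(implicit ec: ExecutionContext): Future[FakeConnection] =
    Future(new FakeConnection)

  final class Enricher(ec: ExecutionContext) {
    private implicit val executor: ExecutionContext = ec

    // Keep the Future itself instead of calling Await.result inside the lazy val.
    // Note: a failed connect stays cached in this lazy val.
    private lazy val connection: Future[FakeConnection] = connect()

    // Each lookup is chained onto the (possibly still pending) connection.
    def enrich(key: String): Future[Option[String]] =
      connection.flatMap(_.query(key))
  }

  def main(args: Array[String]): Unit = {
    val enricher = new Enricher(ExecutionContext.global)
    // Await is used only here, at the edge of the demo, to print the result.
    println(Await.result(enricher.enrich("abc"), 5.seconds))
  }
}
```

In an AsyncFunction, the callback on the Future returned by enrich would be the place to hand the result to the collector. Whether this has any bearing on the exceptions above is an open question.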