Hi Ray, Andrew,

As discussed, I have fixed the issue with IgniteSink when running in cluster mode.
Please review the PR below and share your feedback.

PR: https://github.com/apache/ignite/pull/4398
Review: https://reviews.ignite.apache.org/ignite/review/IGNT-CR-695

Regards,
Saikat

On Mon, Jul 16, 2018 at 10:47 PM, Saikat Maitra <[email protected]> wrote:

> Hi Ray,
>
> Thank you for validating the changes. I see that in cluster mode the
> IgniteSink works as desired. In standalone mode we get the exception
> class org.apache.ignite.IgniteException: Default Ignite instance has
> already been started.
>
> Please take a look at this sample application, which I used to run it
> with Flink in cluster mode:
>
> https://github.com/samaitra/streamers
>
> I am considering whether I should change the IgniteSink to run in client
> mode, similar to the way the Flink connectors for Redis and Flume were
> implemented in Apache Bahir:
>
> https://github.com/apache/bahir-flink
>
> I will share an update soon.
>
> Regards,
> Saikat
>
> On Sun, Jul 15, 2018 at 10:07 PM, Ray <[email protected]> wrote:
>
>> Hello Saikat,
>>
>> I tried your newest code and wrote a simple word count application to
>> test the sink.
>> It appears there are still problems.
>> Here's my code.
>>
>> import org.apache.flink.api.scala._
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> import org.apache.flink.streaming.api.scala.extensions._
>> import org.apache.flink.configuration.Configuration
>> import org.apache.ignite.Ignition
>> import org.apache.ignite.configuration.CacheConfiguration
>>
>> import scala.collection.JavaConverters._
>>
>> object WordCount {
>>
>>   def main(args: Array[String]) {
>>
>>     val ignite = Ignition.start("ignite.xml")
>>     val cacheConfig = new CacheConfiguration[Any, Any]()
>>     ignite.destroyCache("aaa")
>>     cacheConfig.setName("aaa")
>>     cacheConfig.setSqlSchema("PUBLIC")
>>     ignite.createCache(cacheConfig)
>>     ignite.close()
>>
>>     // set up the execution environment
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>
>>     val igniteSink = new IgniteSink[java.util.Map[String, Int]]("aaa", "ignite.xml")
>>
>>     igniteSink.setAllowOverwrite(false)
>>     igniteSink.setAutoFlushFrequency(1)
>>
>>     igniteSink.open(new Configuration)
>>
>>     // get input data
>>     val text = env.fromElements(
>>       "To be, or not to be,--that is the question:--",
>>       "Whether 'tis nobler in the mind to suffer",
>>       "The slings and arrows of outrageous fortune",
>>       "Or to take arms against a sea of troubles,")
>>
>>     val counts = text
>>       // split up the lines in pairs (2-tuples) containing: (word,1)
>>       .flatMap(_.toLowerCase.split("\\W+"))
>>       .filter(_.nonEmpty)
>>       .map((_, 1))
>>       // group by the tuple field "0" and sum up tuple field "1"
>>       .keyBy(0)
>>       .sum(1)
>>       // Convert to key/value format before ingesting to Ignite
>>       .mapWith { case (k: String, v: Int) => Map(k -> v).asJava }
>>       .addSink(igniteSink)
>>
>>     try
>>       env.execute("Streaming WordCount1")
>>     catch {
>>       case e: Exception =>
>>
>>         // Exception handling.
>>     } finally igniteSink.close()
>>
>>   }
>> }
>>
>> I tried running this application in IDEA, and the error log snippet is
>> as follows:
>>
>> 07/16/2018 11:05:30 aggregation -> Map -> Sink: Unnamed(4/8) switched to FAILED
>> class org.apache.ignite.IgniteException: Default Ignite instance has already been started.
>>     at org.apache.ignite.internal.util.IgniteUtils.convertException(IgniteUtils.java:990)
>>     at org.apache.ignite.Ignition.start(Ignition.java:355)
>>     at IgniteSink.open(IgniteSink.java:135)
>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite instance has already been started.
>>     at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1134)
>>     at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1069)
>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:955)
>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:854)
>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:724)
>>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:693)
>>     at org.apache.ignite.Ignition.start(Ignition.java:352)
>>     ... 7 more
>>
>> 07/16/2018 11:05:30 Job execution switched to status FAILING.
>>
>> --
>> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
