Hi Ray,

Thank you for validating the changes. When I test the IgniteSink in cluster mode, it works as expected. In standalone mode, however, I can see that we get this exception:

class org.apache.ignite.IgniteException: Default Ignite instance has already been started.
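As far as I can tell from the trace, in standalone mode every sink subtask runs in the same JVM, so a second Ignition.start call for the default (unnamed) instance fails. Below is a minimal sketch of one possible guard, not the current IgniteSink implementation. The SharedIgnite object and its getOrStart helper are hypothetical names, and the configuration is built programmatically here as an assumption, whereas the sink reads ignite.xml.

```scala
import org.apache.ignite.{Ignite, Ignition}
import org.apache.ignite.configuration.IgniteConfiguration

// Hypothetical helper, not part of IgniteSink: returns the Ignite
// instance already running in this JVM, or starts one if none exists.
object SharedIgnite {
  def getOrStart(): Ignite = {
    // Assumption: a default programmatic configuration stands in for ignite.xml.
    val cfg = new IgniteConfiguration()
    // Unlike Ignition.start, Ignition.getOrStart does not throw when an
    // instance with the same name has already been started.
    Ignition.getOrStart(cfg)
  }
}

object SharedIgniteDemo {
  def main(args: Array[String]): Unit = {
    // Two calls in one JVM, as two sink subtasks would make in standalone mode.
    val first = SharedIgnite.getOrStart()
    val second = SharedIgnite.getOrStart()
    println(s"cluster nodes seen by first: ${first.cluster().nodes().size()}")
    println(s"cluster nodes seen by second: ${second.cluster().nodes().size()}")
    first.close()
  }
}
```

With this approach the sink would have to be careful about who closes the shared instance, since closing it in one subtask stops it for all of them.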
Please take a look at this sample application, https://github.com/samaitra/streamers, which I used to run the sink with Flink in cluster mode. I am considering whether I should change the IgniteSink to run in client mode, similar to the way the Flink connectors for Redis and Flume were implemented in Apache Bahir: https://github.com/apache/bahir-flink

I will share an update soon.

Regards,
Saikat

On Sun, Jul 15, 2018 at 10:07 PM, Ray <[email protected]> wrote:

> Hello Saikat,
>
> I tried your newest code and wrote a simple word count application to test
> the sink.
> It appears there are still problems.
> Here's my code.
>
> import org.apache.flink.api.scala._
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.scala.extensions._
> import org.apache.flink.configuration.Configuration
> import org.apache.ignite.Ignition
> import org.apache.ignite.configuration.CacheConfiguration
>
> import scala.collection.JavaConverters._
>
> object WordCount {
>
>   def main(args: Array[String]) {
>
>     val ignite = Ignition.start("ignite.xml")
>     val cacheConfig = new CacheConfiguration[Any, Any]()
>     ignite.destroyCache("aaa")
>     cacheConfig.setName("aaa")
>     cacheConfig.setSqlSchema("PUBLIC")
>     ignite.createCache(cacheConfig)
>     ignite.close()
>
>     // set up the execution environment
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>
>     val igniteSink = new IgniteSink[java.util.Map[String, Int]]("aaa", "ignite.xml")
>
>     igniteSink.setAllowOverwrite(false)
>     igniteSink.setAutoFlushFrequency(1)
>
>     igniteSink.open(new Configuration)
>
>     // get input data
>     val text = env.fromElements(
>       "To be, or not to be,--that is the question:--",
>       "Whether 'tis nobler in the mind to suffer",
>       "The slings and arrows of outrageous fortune",
>       "Or to take arms against a sea of troubles,")
>
>     val counts = text
>       // split up the lines in pairs (2-tuples) containing: (word,1)
>       .flatMap(_.toLowerCase.split("\\W+"))
>       .filter(_.nonEmpty)
>       .map((_, 1))
>       // group by the tuple field "0" and sum up tuple field "1"
>       .keyBy(0)
>       .sum(1)
>       // Convert to key/value format before ingesting to Ignite
>       .mapWith { case (k: String, v: Int) => Map(k -> v).asJava }
>       .addSink(igniteSink)
>
>     try
>       env.execute("Streaming WordCount1")
>     catch {
>       case e: Exception =>
>         // Exception handling.
>     } finally igniteSink.close()
>
>   }
> }
>
> I tried running this application in Idea and the error log snippet is as
> follows
>
> 07/16/2018 11:05:30 aggregation -> Map -> Sink: Unnamed(4/8) switched to FAILED
> class org.apache.ignite.IgniteException: Default Ignite instance has already been started.
>     at org.apache.ignite.internal.util.IgniteUtils.convertException(IgniteUtils.java:990)
>     at org.apache.ignite.Ignition.start(Ignition.java:355)
>     at IgniteSink.open(IgniteSink.java:135)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite instance has already been started.
>     at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1134)
>     at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1069)
>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:955)
>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:854)
>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:724)
>     at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:693)
>     at org.apache.ignite.Ignition.start(Ignition.java:352)
>     ... 7 more
>
> 07/16/2018 11:05:30 Job execution switched to status FAILING.
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
