Hi Ray,

Thank you for validating the changes, I see that in cluster mode when I am
checking the IgniteSink it is working as desired. In stand alone mode I can
see we are getting the exception class org.apache.ignite.IgniteException:
Default Ignite instance has already been started.

Please take a look into this sample application
https://github.com/samaitra/streamers which I used to run it with flink in
cluster mode.

I am considering if I should make changes to run the IgniteSink in client
mode similar to the ways flink connector for redis and flume were
implemented in Apache Bahir

https://github.com/apache/bahir-flink

I will share update soon.

Regards,
Saikat



On Sun, Jul 15, 2018 at 10:07 PM, Ray <[email protected]> wrote:

> Hello Saikat,
>
> I tried your newest code and wrote a simple word count application to test
> the sink.
> It appears there's still problems.
> Here's my code.
>
>
>
> import org.apache.flink.api.scala._
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.scala.extensions._
> import org.apache.flink.configuration.Configuration
> import org.apache.ignite.Ignition
> import org.apache.ignite.configuration.CacheConfiguration
>
> import scala.collection.JavaConverters._
>
>
> object WordCount {
>
>         def main(args: Array[String]) {
>
>                 val ignite = Ignition.start("ignite.xml")
>                 val cacheConfig = new CacheConfiguration[Any, Any]()
>                 ignite.destroyCache("aaa")
>                 cacheConfig.setName("aaa")
>                 cacheConfig.setSqlSchema("PUBLIC")
>                 ignite.createCache(cacheConfig)
>                 ignite.close()
>
>
>                 // set up the execution environment
>                 val env = StreamExecutionEnvironment.
> getExecutionEnvironment
>
>                 val igniteSink = new IgniteSink[java.util.Map[String,
> Int]]("aaa",
> "ignite.xml")
>
>                 igniteSink.setAllowOverwrite(false)
>                 igniteSink.setAutoFlushFrequency(1)
>
>                 igniteSink.open(new Configuration)
>
>
>                 // get input data
>                 val text = env.fromElements(
>                         "To be, or not to be,--that is the question:--",
>                         "Whether 'tis nobler in the mind to suffer",
>                         "The slings and arrows of outrageous fortune",
>                         "Or to take arms against a sea of troubles,")
>
>
>                 val counts = text
>                         // split up the lines in pairs (2-tuples)
> containing: (word,1)
>                         .flatMap(_.toLowerCase.split("\\W+"))
>                         .filter(_.nonEmpty)
>                         .map((_, 1))
>                         // group by the tuple field "0" and sum up tuple
> field "1"
>                         .keyBy(0)
>                         .sum(1)
>                         // Convert to key/value format before ingesting to
> Ignite
>                         .mapWith { case (k: String, v: Int) => Map(k ->
> v).asJava }
>                         .addSink(igniteSink)
>
>                 try
>                         env.execute("Streaming WordCount1")
>                 catch {
>                         case e: Exception =>
>
>                         // Exception handling.
>                 } finally igniteSink.close()
>
>         }
> }
>
> I tried running this application in Idea and the error log snippet is as
> follows
>
> 07/16/2018 11:05:30     aggregation -> Map -> Sink: Unnamed(4/8) switched
> to
> FAILED
> class org.apache.ignite.IgniteException: Default Ignite instance has
> already
> been started.
>         at
> org.apache.ignite.internal.util.IgniteUtils.convertException(IgniteUtils.
> java:990)
>         at org.apache.ignite.Ignition.start(Ignition.java:355)
>         at IgniteSink.open(IgniteSink.java:135)
>         at
> org.apache.flink.api.common.functions.util.FunctionUtils.
> openFunction(FunctionUtils.java:36)
>         at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(
> AbstractUdfStreamOperator.java:111)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.
> openAllOperators(StreamTask.java:376)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.
> invoke(StreamTask.java:253)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite
> instance has already been started.
>         at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.
> java:1134)
>         at
> org.apache.ignite.internal.IgnitionEx.startConfigurations(
> IgnitionEx.java:1069)
>         at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.
> java:955)
>         at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.
> java:854)
>         at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.
> java:724)
>         at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.
> java:693)
>         at org.apache.ignite.Ignition.start(Ignition.java:352)
>         ... 7 more
>
> 07/16/2018 11:05:30     Job execution switched to status FAILING.
>
>
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
>

Reply via email to