Hi Ray, Andrew

As discussed, I have fixed the issue with IgniteSink when running in cluster
mode.

Please review the PR below and share your feedback.

PR : https://github.com/apache/ignite/pull/4398
Review : https://reviews.ignite.apache.org/ignite/review/IGNT-CR-695
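
For context on the standalone-mode error quoted below ("Default Ignite
instance has already been started"): the message suggests that a second start
was attempted under the same (unnamed) instance name inside one JVM, which is
likely what happens when several sink subtasks open in the same process. The
following is a minimal sketch of that behaviour in plain Java. It is a toy
model, not Ignite's code and not the change made in the PR; the class
InstanceRegistrySketch, its Node type, and the methods start and getOrStart
are hypothetical names chosen for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Toy model (NOT Ignite's implementation): one registry per JVM,
 * keyed by instance name. All names here are hypothetical.
 */
class InstanceRegistrySketch {

    /** Hypothetical stand-in for a started node. */
    static final class Node {
        final String name;

        Node(String name) {
            this.name = name;
        }
    }

    private static final String DEFAULT_NAME = "<default>";

    // Shared by every thread in the JVM, like a process-wide instance table.
    private static final Map<String, Node> STARTED = new ConcurrentHashMap<>();

    /** Strict start: fails if the name is already registered in this JVM. */
    static Node start(String name) {
        String key = (name == null) ? DEFAULT_NAME : name;
        Node fresh = new Node(key);
        if (STARTED.putIfAbsent(key, fresh) != null) {
            throw new IllegalStateException("Instance has already been started: " + key);
        }
        return fresh;
    }

    /** Lenient variant: returns the existing instance instead of failing. */
    static Node getOrStart(String name) {
        String key = (name == null) ? DEFAULT_NAME : name;
        return STARTED.computeIfAbsent(key, Node::new);
    }

    public static void main(String[] args) {
        // Two callers in the same JVM both start the unnamed instance.
        start(null);
        try {
            start(null);
        } catch (IllegalStateException e) {
            System.out.println("second start failed: " + e.getMessage());
        }
        // Reusing the registered instance avoids the clash.
        System.out.println(getOrStart(null) == getOrStart(null));
    }
}
```

In Ignite itself, Ignition.getOrStart(IgniteConfiguration) and a distinct name
set through IgniteConfiguration.setIgniteInstanceName(String) are the closest
counterparts to the two ways out shown here. Whether either is the right
choice for IgniteSink is an assumption on my side, not something this thread
confirms.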

Regards,
Saikat




On Mon, Jul 16, 2018 at 10:47 PM, Saikat Maitra <[email protected]>
wrote:

> Hi Ray,
>
> Thank you for validating the changes. In cluster mode, IgniteSink works as
> desired when I check it. In standalone mode, however, we get the exception
> class org.apache.ignite.IgniteException: Default Ignite instance has already
> been started.
>
> Please take a look at this sample application, which I used to run the sink
> with Flink in cluster mode: https://github.com/samaitra/streamers
>
> I am considering whether I should change IgniteSink to run in client mode,
> similar to the way the Flink connectors for Redis and Flume were implemented
> in Apache Bahir:
>
> https://github.com/apache/bahir-flink
>
> I will share an update soon.
>
> Regards,
> Saikat
>
>
>
> On Sun, Jul 15, 2018 at 10:07 PM, Ray <[email protected]> wrote:
>
>> Hello Saikat,
>>
>> I tried your newest code and wrote a simple word count application to test
>> the sink.
>> It appears there are still problems.
>> Here's my code.
>>
>>
>>
>> import org.apache.flink.api.scala._
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> import org.apache.flink.streaming.api.scala.extensions._
>> import org.apache.flink.configuration.Configuration
>> import org.apache.ignite.Ignition
>> import org.apache.ignite.configuration.CacheConfiguration
>>
>> import scala.collection.JavaConverters._
>>
>>
>> object WordCount {
>>
>>         def main(args: Array[String]) {
>>
>>                 val ignite = Ignition.start("ignite.xml")
>>                 val cacheConfig = new CacheConfiguration[Any, Any]()
>>                 ignite.destroyCache("aaa")
>>                 cacheConfig.setName("aaa")
>>                 cacheConfig.setSqlSchema("PUBLIC")
>>                 ignite.createCache(cacheConfig)
>>                 ignite.close()
>>
>>
>>                 // set up the execution environment
>>                 val env = StreamExecutionEnvironment.getExecutionEnvironment
>>
>>                 val igniteSink = new IgniteSink[java.util.Map[String, Int]]("aaa", "ignite.xml")
>>
>>                 igniteSink.setAllowOverwrite(false)
>>                 igniteSink.setAutoFlushFrequency(1)
>>
>>                 igniteSink.open(new Configuration)
>>
>>
>>                 // get input data
>>                 val text = env.fromElements(
>>                         "To be, or not to be,--that is the question:--",
>>                         "Whether 'tis nobler in the mind to suffer",
>>                         "The slings and arrows of outrageous fortune",
>>                         "Or to take arms against a sea of troubles,")
>>
>>
>>                 val counts = text
>>                         // split up the lines in pairs (2-tuples) containing: (word,1)
>>                         .flatMap(_.toLowerCase.split("\\W+"))
>>                         .filter(_.nonEmpty)
>>                         .map((_, 1))
>>                         // group by the tuple field "0" and sum up tuple field "1"
>>                         .keyBy(0)
>>                         .sum(1)
>>                         // Convert to key/value format before ingesting to Ignite
>>                         .mapWith { case (k: String, v: Int) => Map(k -> v).asJava }
>>                         .addSink(igniteSink)
>>
>>                 try
>>                         env.execute("Streaming WordCount1")
>>                 catch {
>>                         case e: Exception =>
>>
>>                         // Exception handling.
>>                 } finally igniteSink.close()
>>
>>         }
>> }
>>
>> I tried running this application in IDEA, and the error log snippet is as
>> follows:
>>
>> 07/16/2018 11:05:30     aggregation -> Map -> Sink: Unnamed(4/8) switched to FAILED
>> class org.apache.ignite.IgniteException: Default Ignite instance has already been started.
>>         at org.apache.ignite.internal.util.IgniteUtils.convertException(IgniteUtils.java:990)
>>         at org.apache.ignite.Ignition.start(Ignition.java:355)
>>         at IgniteSink.open(IgniteSink.java:135)
>>         at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>         at java.lang.Thread.run(Thread.java:745)
>> Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite instance has already been started.
>>         at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1134)
>>         at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1069)
>>         at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:955)
>>         at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:854)
>>         at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:724)
>>         at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:693)
>>         at org.apache.ignite.Ignition.start(Ignition.java:352)
>>         ... 7 more
>>
>> 07/16/2018 11:05:30     Job execution switched to status FAILING.
>>
>>
>>
>>
>
>
