RE: Apache Flink Sink + Ignite: Ouch! Argument is invalid
Hi guys,

Thanks for helping with the fix! As this is now a development topic rather than a usage one, I'm BCC'ing the user list and replacing it with the dev list. Please continue the discussion there.

Andrey, Dmitry, please help with the review.

Thanks,
Stan

From: Saikat Maitra
Sent: 22 July 2018 8:28
To: u...@ignite.apache.org; ray...@cisco.com
Subject: Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Hi Ray, Andrew,

As discussed, I have fixed the issue with IgniteSink when running in cluster mode. Please review the PR below and share feedback.

PR: https://github.com/apache/ignite/pull/4398
Review: https://reviews.ignite.apache.org/ignite/review/IGNT-CR-695

Regards,
Saikat

On Mon, Jul 16, 2018 at 10:47 PM, Saikat Maitra wrote:

Hi Ray,

Thank you for validating the changes. When I check the IgniteSink in cluster mode, it works as desired. In standalone mode we get the exception:

class org.apache.ignite.IgniteException: Default Ignite instance has already been started.

Please take a look at this sample application, https://github.com/samaitra/streamers, which I used to run it with Flink in cluster mode.

I am considering whether I should change IgniteSink to run in client mode, similar to the way the Flink connectors for Redis and Flume were implemented in Apache Bahir: https://github.com/apache/bahir-flink

I will share an update soon.

Regards,
Saikat

On Sun, Jul 15, 2018 at 10:07 PM, Ray wrote:

Hello Saikat,

I tried your newest code and wrote a simple word count application to test the sink. It appears there are still problems. Here is my code.
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.extensions._
import org.apache.flink.configuration.Configuration
import org.apache.ignite.Ignition
import org.apache.ignite.configuration.CacheConfiguration
import scala.collection.JavaConverters._

object WordCount {
  def main(args: Array[String]) {
    val ignite = Ignition.start("ignite.xml")
    val cacheConfig = new CacheConfiguration[Any, Any]()
    ignite.destroyCache("aaa")
    cacheConfig.setName("aaa")
    cacheConfig.setSqlSchema("PUBLIC")
    ignite.createCache(cacheConfig)
    ignite.close()

    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val igniteSink = new IgniteSink[java.util.Map[String, Int]]("aaa", "ignite.xml")
    igniteSink.setAllowOverwrite(false)
    igniteSink.setAutoFlushFrequency(1)
    igniteSink.open(new Configuration)

    // get input data
    val text = env.fromElements(
      "To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer",
      "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,")

    val counts = text
      // split up the lines in pairs (2-tuples) containing: (word, 1)
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      // group by the tuple field "0" and sum up tuple field "1"
      .keyBy(0)
      .sum(1)
      // convert to key/value format before ingesting to Ignite
      .mapWith { case (k: String, v: Int) => Map(k -> v).asJava }
      .addSink(igniteSink)

    try env.execute("Streaming WordCount1")
    catch {
      case e: Exception => // exception handling
    }
    finally igniteSink.close()
  }
}

I tried running this application in IDEA and the error log snippet is as follows:

07/16/2018 11:05:30 aggregation -> Map -> Sink: Unnamed(4/8) switched to FAILED
class org.apache.ignite.IgniteException: Default Ignite instance has already been started.
at org.apache.ignite.internal.util.IgniteUtils.convertException(IgniteUtils.java:990)
at org.apache.ignite.Ignition.start(Ignition.java:355)
at IgniteSink.open(IgniteSink.java:135)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at
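The stack trace shows IgniteSink.open() calling Ignition.start() in the same JVM where the application's main method already started the default (unnamed) Ignite instance, and Ignite allows only one instance per name per JVM, so the second start fails. The JDK-only sketch below is a toy model of that rule: the InstanceRegistry class and its methods are invented for illustration and are not Ignite's API (although Ignite does expose a real Ignition.getOrStart for reusing a running instance).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Toy model of the one-instance-per-name-per-JVM rule (illustrative
 * only; not Ignite's actual implementation). The default instance has
 * a null name, so a second unnamed start() in the same JVM fails --
 * which is what IgniteSink.open() runs into when the job driver has
 * already started the default instance.
 */
public class InstanceRegistry {
    private static final Map<String, Object> STARTED = new ConcurrentHashMap<>();
    private static final String DEFAULT = "<default>";

    /** Start a named instance; throws if that name is already started. */
    public static Object start(String name) {
        String key = (name == null) ? DEFAULT : name;
        Object inst = new Object();
        Object prev = STARTED.putIfAbsent(key, inst);
        if (prev != null)
            throw new IllegalStateException(
                (name == null ? "Default Ignite instance" : "Ignite instance \"" + name + "\"")
                    + " has already been started.");
        return inst;
    }

    /** Reuse the running instance if present, else start it (get-or-start pattern). */
    public static Object getOrStart(String name) {
        String key = (name == null) ? DEFAULT : name;
        return STARTED.computeIfAbsent(key, k -> new Object());
    }
}
```

Under this model the sink avoids the collision either by reusing the already running instance (the get-or-start pattern) or by starting its own node under a distinct instance name or as a client node, which is the direction discussed in the thread above.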
[GitHub] ignite pull request #4401: IGNITE-9049 fix write SWITCH_SEGMENT_RECORD in th...
GitHub user DmitriyGovorukhin opened a pull request:

    https://github.com/apache/ignite/pull/4401

    IGNITE-9049 fix write SWITCH_SEGMENT_RECORD in the end of file

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gridgain/apache-ignite ignite-9049

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/ignite/pull/4401.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4401

commit cdf86420885ddbf60bd6e4d452c913ef722a6d89
Author: Dmitriy Govorukhin
Date: 2018-07-22T20:48:23Z

    IGNITE-9049 fix write SWITCH_SEGMENT_RECORD in the end of file

---
[GitHub] ignite pull request #4400: IGNITE-9049 fix write SWITCH_SEGMENT_RECORD in th...
Github user DmitriyGovorukhin closed the pull request at:

    https://github.com/apache/ignite/pull/4400

---
[GitHub] ignite pull request #4400: IGNITE-9047 fix write SWITCH_SEGMENT_RECORD in th...
GitHub user DmitriyGovorukhin opened a pull request:

    https://github.com/apache/ignite/pull/4400

    IGNITE-9047 fix write SWITCH_SEGMENT_RECORD in the end of file

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gridgain/apache-ignite ignite-9049

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/ignite/pull/4400.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4400

commit 752cb61a90586540d9bbca5caf97c7021aa8813d
Author: Dmitriy Govorukhin
Date: 2018-07-22T20:44:03Z

    IGNITE-9047 fix write SWITCH_SEGMENT_RECORD in the end of file

---
[GitHub] ignite pull request #4399: Ignite 8820 fix
GitHub user ivandasch opened a pull request:

    https://github.com/apache/ignite/pull/4399

    Ignite 8820 fix

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gridgain/apache-ignite ignite-8820-fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/ignite/pull/4399.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4399

commit b793c8c591ec79428a7dd85d5603a58d34707e65
Author: Ivan Daschinskiy
Date: 2018-07-19T10:47:53Z

    IGNITE-8990 My merge

commit 6026605e5691c458e54155478202afefe6bb9a01
Author: Ivan Daschinskiy
Date: 2018-07-22T12:48:06Z

    Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/ignite

commit ef796e79522389916b4d2b3eb28e31eb35f9e6fe
Author: Ivan Daschinskiy
Date: 2018-07-22T12:54:37Z

    IGNITE-8820 Fix rollback logic when tx is initiated from client.

---
[jira] [Created] (IGNITE-9049) Missed SWITCH_SEGMENT_RECORD at the end of WAL file but space enough
Dmitriy Govorukhin created IGNITE-9049:
--

Summary: Missed SWITCH_SEGMENT_RECORD at the end of WAL file but space enough
Key: IGNITE-9049
URL: https://issues.apache.org/jira/browse/IGNITE-9049
Project: Ignite
Issue Type: Improvement
Reporter: Dmitriy Govorukhin

There is a situation where several threads call addRecord just as the segment's free space runs out (a rollOver to the next WAL segment is needed) and no thread writes a SWITCH_SEGMENT_RECORD. As a result, the end of the file contains garbage. If we then iterate over this segment, the iterator stops when it tries to read the next record and stumbles on the garbage at the end of the file, so the log is not read in full. Any operation that relies on the iterator may be broken (crash recovery, delta rebalance, etc.).

Example:

File size: 1024 bytes
Current tail position: 768 (free space: 256)

1. Thread-1 calls addRecord (size 128) -> tail is updated to 896.
2. Thread-2 calls addRecord (size 128) -> tail is updated to 1024 (free space exhausted). Note that neither thread has written any data yet; addRecord only reserves a position for the write (SegmentedRingByteBuffer.offer).
3. Thread-3 calls addRecord (size 128) -> not enough space -> rollOver, and the stop flag is CAS'ed to TRUE.
4. Thread-1 and Thread-2 try to write their data and cannot. FileWriteHandle.addRecord:

{code}
if (buf == null || (stop.get() && rec.type() != SWITCH_SEGMENT_RECORD))
    return null; // Can not write to this segment, need to switch to the next one.
{code}

Thread-3 cannot write the SWITCH_SEGMENT_RECORD because there is not enough space. Thread-1 and Thread-2 cannot write their data because stop is TRUE. We are left with garbage from position 768 to 1024.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
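The four steps above can be sketched as a JDK-only toy model (the class and method names below are invented for illustration; this is not Ignite's actual implementation): reservation only bumps the tail, the stop flag refuses late writers, and the reserved-but-unwritten tail of the segment is left as garbage.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Toy model of the race in IGNITE-9049 (names like reserve() and the
 * stop flag are illustrative, not Ignite's real API). Writers first
 * reserve a region by bumping the tail, then write into it later; a
 * writer is refused once the stop flag is set, so regions that were
 * reserved but never written remain garbage.
 */
public class WalSwitchRace {
    static final int SEGMENT_SIZE = 1024;

    final AtomicInteger tail = new AtomicInteger(768);   // current tail position
    final AtomicBoolean stop = new AtomicBoolean(false); // set on rollover
    final boolean[] written = new boolean[SEGMENT_SIZE];

    /** Reserve 'size' bytes; returns the start offset, or -1 if the segment is full. */
    int reserve(int size) {
        while (true) {
            int cur = tail.get();
            if (cur + size > SEGMENT_SIZE) {
                stop.compareAndSet(false, true); // trigger rollover to the next segment
                return -1;
            }
            if (tail.compareAndSet(cur, cur + size))
                return cur;
        }
    }

    /** Try to write a previously reserved region; refused once stop is set. */
    boolean write(int offset, int size) {
        if (stop.get())
            return false; // mirrors the stop-flag check in FileWriteHandle.addRecord
        for (int i = offset; i < offset + size; i++)
            written[i] = true;
        return true;
    }

    public static void main(String[] args) {
        WalSwitchRace seg = new WalSwitchRace();

        // Steps 1-2: Thread-1 and Thread-2 reserve 128 bytes each (tail: 768 -> 896 -> 1024).
        int r1 = seg.reserve(128);
        int r2 = seg.reserve(128);

        // Step 3: Thread-3 finds no space and flips the stop flag.
        int r3 = seg.reserve(128);

        // Step 4: Thread-1 and Thread-2 can no longer write their reserved regions,
        // so all 256 bytes in [768, 1024) remain garbage.
        boolean w1 = seg.write(r1, 128);
        boolean w2 = seg.write(r2, 128);

        int garbage = 0;
        for (int i = 768; i < SEGMENT_SIZE; i++)
            if (!seg.written[i]) garbage++;

        System.out.println("r1=" + r1 + " r2=" + r2 + " r3=" + r3);
        System.out.println("w1=" + w1 + " w2=" + w2 + " garbage=" + garbage);
    }
}
```

Running the four steps in order reproduces the report: both reservations succeed, the third request trips the stop flag, both deferred writes are refused, and positions 768 to 1024 stay unwritten.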