RE: Apache Flink Sink + Ignite: Ouch! Argument is invalid

2018-07-22 Thread Stanislav Lukyanov
Hi guys,

Thanks for helping with the fix!

As this is now a development topic rather than a usage one, I'm BCC'ing the
user list and moving the discussion to the dev list.
Please continue the discussion there.

Andrey, Dmitry, please help with the review.

Thanks,
Stan

From: Saikat Maitra
Sent: 22 July 2018 8:28
To: u...@ignite.apache.org; ray...@cisco.com
Subject: Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Hi Ray, Andrew

As discussed, I have fixed the issue with IgniteSink when running in cluster
mode.

Please review the PR below and share your feedback.

PR : https://github.com/apache/ignite/pull/4398
Review : https://reviews.ignite.apache.org/ignite/review/IGNT-CR-695

Regards,
Saikat




On Mon, Jul 16, 2018 at 10:47 PM, Saikat Maitra  wrote:
Hi Ray,

Thank you for validating the changes. When I check the IgniteSink in cluster
mode, it works as desired. In standalone mode, however, we are getting the
exception: class org.apache.ignite.IgniteException: Default Ignite instance
has already been started.

Please take a look at this sample application,
https://github.com/samaitra/streamers, which I used to run the sink with Flink
in cluster mode.

I am considering whether I should change the IgniteSink to run in client mode,
similar to the way the Flink connectors for Redis and Flume are implemented in
Apache Bahir:

https://github.com/apache/bahir-flink
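The standalone-mode failure happens because each parallel sink subtask tries to start a fresh Ignite node in the same JVM. A minimal, self-contained sketch of the "get or start" guard that avoids this (plain-Java stand-ins only, not the real Ignite API; the actual fix would more likely use Ignition.getOrStart or a client-mode node):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy stand-in for a per-JVM Ignite node registry. Names here are
// illustrative; Ignite itself keys instances by grid name.
public class GetOrStartSketch {
    static final Map<String, Object> INSTANCES = new ConcurrentHashMap<>();

    // Naive open(): always starts a new instance, so the second parallel
    // subtask in the same JVM fails, mirroring the observed
    // "Default Ignite instance has already been started" exception.
    static Object startNew(String name) {
        if (INSTANCES.putIfAbsent(name, new Object()) != null)
            throw new IllegalStateException(
                "Default Ignite instance has already been started.");
        return INSTANCES.get(name);
    }

    // Guarded open(): reuse the instance if one is already running in this JVM.
    static Object getOrStart(String name) {
        return INSTANCES.computeIfAbsent(name, n -> new Object());
    }

    public static void main(String[] args) {
        Object first = getOrStart("default");
        Object second = getOrStart("default"); // reused, no exception
        System.out.println(first == second);   // prints "true"
    }
}
```

With this pattern, N parallel subtasks in one TaskManager JVM share one node instead of racing to start N of them.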

I will share an update soon.

Regards,
Saikat



On Sun, Jul 15, 2018 at 10:07 PM, Ray  wrote:
Hello Saikat,

I tried your newest code and wrote a simple word count application to test
the sink.
It appears there are still problems.
Here's my code.



import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.extensions._
import org.apache.flink.configuration.Configuration
import org.apache.ignite.Ignition
import org.apache.ignite.configuration.CacheConfiguration

import scala.collection.JavaConverters._


object WordCount {

  def main(args: Array[String]) {

    // Prepare the target cache, then close this Ignite instance.
    val ignite = Ignition.start("ignite.xml")
    val cacheConfig = new CacheConfiguration[Any, Any]()
    ignite.destroyCache("aaa")
    cacheConfig.setName("aaa")
    cacheConfig.setSqlSchema("PUBLIC")
    ignite.createCache(cacheConfig)
    ignite.close()

    // Set up the execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val igniteSink = new IgniteSink[java.util.Map[String, Int]]("aaa", "ignite.xml")

    igniteSink.setAllowOverwrite(false)
    igniteSink.setAutoFlushFrequency(1)

    igniteSink.open(new Configuration)

    // Get input data.
    val text = env.fromElements(
      "To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer",
      "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,")

    val counts = text
      // Split up the lines into pairs (2-tuples) containing: (word, 1).
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      // Group by the tuple field "0" and sum up tuple field "1".
      .keyBy(0)
      .sum(1)
      // Convert to key/value format before ingesting into Ignite.
      .mapWith { case (k: String, v: Int) => Map(k -> v).asJava }
      .addSink(igniteSink)

    try
      env.execute("Streaming WordCount1")
    catch {
      case e: Exception =>

      // Exception handling.
    } finally igniteSink.close()

  }
}

I tried running this application in IDEA, and the error log snippet is as
follows:

07/16/2018 11:05:30     aggregation -> Map -> Sink: Unnamed(4/8) switched to
FAILED 
class org.apache.ignite.IgniteException: Default Ignite instance has already
been started.
        at
org.apache.ignite.internal.util.IgniteUtils.convertException(IgniteUtils.java:990)
        at org.apache.ignite.Ignition.start(Ignition.java:355)
        at IgniteSink.open(IgniteSink.java:135)
        at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at 

[GitHub] ignite pull request #4401: IGNITE-9049 fix write SWITCH_SEGMENT_RECORD in th...

2018-07-22 Thread DmitriyGovorukhin
GitHub user DmitriyGovorukhin opened a pull request:

https://github.com/apache/ignite/pull/4401

IGNITE-9049 fix write SWITCH_SEGMENT_RECORD in the end of file



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gridgain/apache-ignite ignite-9049

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/ignite/pull/4401.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4401


commit cdf86420885ddbf60bd6e4d452c913ef722a6d89
Author: Dmitriy Govorukhin 
Date:   2018-07-22T20:48:23Z

IGNITE-9049 fix write SWITCH_SEGMENT_RECORD in the end of file




---


[GitHub] ignite pull request #4400: IGNITE-9049 fix write SWITCH_SEGMENT_RECORD in th...

2018-07-22 Thread DmitriyGovorukhin
Github user DmitriyGovorukhin closed the pull request at:

https://github.com/apache/ignite/pull/4400


---


[GitHub] ignite pull request #4400: IGNITE-9047 fix write SWITCH_SEGMENT_RECORD in th...

2018-07-22 Thread DmitriyGovorukhin
GitHub user DmitriyGovorukhin opened a pull request:

https://github.com/apache/ignite/pull/4400

IGNITE-9047 fix write SWITCH_SEGMENT_RECORD in the end of file



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gridgain/apache-ignite ignite-9049

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/ignite/pull/4400.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4400


commit 752cb61a90586540d9bbca5caf97c7021aa8813d
Author: Dmitriy Govorukhin 
Date:   2018-07-22T20:44:03Z

IGNITE-9047 fix write SWITCH_SEGMENT_RECORD in the end of file




---


[GitHub] ignite pull request #4399: Ignite 8820 fix

2018-07-22 Thread ivandasch
GitHub user ivandasch opened a pull request:

https://github.com/apache/ignite/pull/4399

Ignite 8820 fix



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gridgain/apache-ignite ignite-8820-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/ignite/pull/4399.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4399


commit b793c8c591ec79428a7dd85d5603a58d34707e65
Author: Ivan Daschinskiy 
Date:   2018-07-19T10:47:53Z

IGNITE-8990 My merge

commit 6026605e5691c458e54155478202afefe6bb9a01
Author: Ivan Daschinskiy 
Date:   2018-07-22T12:48:06Z

Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/ignite

commit ef796e79522389916b4d2b3eb28e31eb35f9e6fe
Author: Ivan Daschinskiy 
Date:   2018-07-22T12:54:37Z

IGNITE-8820 Fix rollback logic when tx is initiated from client.




---


[jira] [Created] (IGNITE-9049) Missed SWITCH_SEGMENT_RECORD at the end of WAL file but space enough

2018-07-22 Thread Dmitriy Govorukhin (JIRA)
Dmitriy Govorukhin created IGNITE-9049:
--

 Summary: Missed SWITCH_SEGMENT_RECORD at the end of WAL file but space enough
 Key: IGNITE-9049
 URL: https://issues.apache.org/jira/browse/IGNITE-9049
 Project: Ignite
  Issue Type: Improvement
Reporter: Dmitriy Govorukhin


There is a situation where several threads try addRecord as the free space runs
out (a rollOver to the next WAL segment is needed) and no thread writes
SWITCH_SEGMENT_RECORD. As a result, the end of the file contains garbage. If we
try to iterate over such a segment, the iterator stops when it tries to read
the next record and stumbles on the garbage at the end of the file, so the log
will not be fully read. Any operation that relies on the iterator may be broken
(crash recovery, delta rebalance, etc.).

Example:
File size: 1024 bytes
Current tail position: 768 (free space: 256)

1. Thread-1 tries addRecord (size 128) -> tail updated to 896.
2. Thread-2 tries addRecord (size 128) -> tail updated to 1024 (free space
exhausted).
At this point neither thread has actually written any data; addRecord only
reserves a position for writing (SegmentedRingByteBuffer.offer).

3. Thread-3 tries addRecord (size 128) -> not enough space -> rollOver, and the
stop flag is CAS'ed to TRUE.

4. Thread-1 and Thread-2 then try to write their data and cannot do it.

FileWriteHandle.addRecord
{code}
if (buf == null || (stop.get() && rec.type() != SWITCH_SEGMENT_RECORD))
    return null; // Cannot write to this segment, need to switch to the next one.
{code}

Thread-3 cannot write SWITCH_SEGMENT_RECORD because there is not enough space.
Thread-1 and Thread-2 cannot write their data because stop is TRUE.

As a result, we have garbage from position 768 to 1024.
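The race above can be modeled with a small self-contained sketch (class, field, and method names here are illustrative stand-ins, not the real Ignite internals such as SegmentedRingByteBuffer):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Toy model of WAL tail reservation: reserve() hands out positions with a
// CAS on the tail, but the actual bytes are written later by the caller.
public class SegmentRaceSketch {
    static final int SEGMENT_SIZE = 1024;
    final AtomicLong tail = new AtomicLong(768);       // current tail position
    final AtomicBoolean stop = new AtomicBoolean(false);

    // Reserve space for a record; returns the start position, or -1 when the
    // record does not fit (the caller must roll over to the next segment).
    long reserve(int size) {
        while (true) {
            long cur = tail.get();
            if (cur + size > SEGMENT_SIZE) {
                stop.compareAndSet(false, true); // segment closed for writes
                return -1;
            }
            if (tail.compareAndSet(cur, cur + size))
                return cur; // position reserved, but nothing written yet
        }
    }

    public static void main(String[] args) {
        SegmentRaceSketch seg = new SegmentRaceSketch();
        long p1 = seg.reserve(128); // Thread-1: tail 768 -> 896
        long p2 = seg.reserve(128); // Thread-2: tail 896 -> 1024, segment full
        long p3 = seg.reserve(128); // Thread-3: no space, sets stop = TRUE
        // Thread-1 and Thread-2 hold reservations at 768 and 896, but if the
        // write path rejects records once stop == true, bytes 768..1024 are
        // never filled and remain garbage for the iterator.
        System.out.println(p1 + " " + p2 + " " + p3 + " stop=" + seg.stop.get());
        // prints: 768 896 -1 stop=true
    }
}
```

The sketch shows why the bug needs three concurrent callers: the reservations succeed before the stop flag is raised, so the gap between the reserved positions and the end of the segment is never backfilled.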




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)