Re: Is there a way to use Ignite optimization and Spark optimization together when using Spark Dataframe API?
Hello!

I suggest that you check out the following possibilities: Does performance increase dramatically if you run it on 10% of the data, i.e. ~1 million records? Does anything change when you have only one client connected?

Note that I was running this example on a single node, so it should not be hard to create an environment to check that. Note also that it is not recommended to run (thick) clients, and especially servers, over slow network links.

Regards,
--
Ilya Kasnacheev

Fri, 28 Sep 2018 at 17:24, Ray:
> Actually there's only one row in b.
>
> SELECT COUNT(*) FROM b where x = '1';
> COUNT(*) 1
> 1 row selected (0.003 seconds)
>
> Maybe the join performance drops dramatically when the data size is
> more than 10 million rows, or when the cluster has a lot of clients
> connected? My 6-node cluster has 10 clients connected to it, and some
> of them have slow network connectivity.
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
Re: Is there a way to use Ignite optimization and Spark optimization together when using Spark Dataframe API?
Actually there's only one row in b.

SELECT COUNT(*) FROM b where x = '1';
COUNT(*) 1
1 row selected (0.003 seconds)

Maybe the join performance drops dramatically when the data size is more than 10 million rows, or when the cluster has a lot of clients connected? My 6-node cluster has 10 clients connected to it, and some of them have slow network connectivity.

--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
Re: Is there a way to use Ignite optimization and Spark optimization together when using Spark Dataframe API?
Hello!

I have indeed tried a use case like yours:

0: jdbc:ignite:thin://127.0.0.1/> create index on b(x,y);
No rows affected (9,729 seconds)
0: jdbc:ignite:thin://127.0.0.1/> select count(*) from a;
COUNT(*) 1
1 row selected (0,017 seconds)
0: jdbc:ignite:thin://127.0.0.1/> select count(*) from b;
COUNT(*) 4194304
1 row selected (0,024 seconds)
0: jdbc:ignite:thin://127.0.0.1/> select a.x,a.y from a join b where a.y = b.y and a.x = b.x;
X 1 Y 1
1 row selected (0,005 seconds)
0: jdbc:ignite:thin://127.0.0.1/> explain select a.x,a.y from a join b where a.y = b.y and a.x = b.x;
PLAN SELECT __Z0.X AS __C0_0, __Z0.Y AS __C0_1 FROM PUBLIC.A __Z0 /* PUBLIC.A.__SCAN_ */ INNER JOIN PUBLIC.B __Z1 /* PUBLIC."b_x_asc_y_asc_idx": Y = __Z0.Y AND X = __Z0.X */ ON 1=1 WHERE (__Z0.Y = __Z1.Y) AND (__Z0.X = __Z1.X)
PLAN SELECT __C0_0 AS X, __C0_1 AS Y FROM PUBLIC.__T0 /* PUBLIC."merge_scan" */
2 rows selected (0,007 seconds)

^ very fast, compared to 1,598 seconds before the index was created.

My standing idea is that you have very low selectivity on b.x. I.e., if 10 million out of 14 million rows in b have x = 1, then the index will not be able to help and will only hurt.

Can you execute SELECT COUNT(*) FROM b WHERE x = 1; on your dataset?

Regards,

--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
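To illustrate the selectivity point with something runnable, here is a toy in-memory sketch in plain Java (hypothetical data, nothing Ignite-specific): when 90% of the rows share one value, an index lookup on that value still returns almost the whole table, so the remaining work is barely smaller than a full scan.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SelectivityDemo {
    // Build a secondary "index": column value -> row ids.
    static Map<Integer, List<Integer>> buildIndex(int[] column) {
        Map<Integer, List<Integer>> index = new HashMap<>();
        for (int row = 0; row < column.length; row++)
            index.computeIfAbsent(column[row], k -> new ArrayList<>()).add(row);
        return index;
    }

    public static void main(String[] args) {
        int n = 1_000_000;
        // Hypothetical column b.x: 90% of the rows hold the value 1.
        int[] x = new int[n];
        for (int i = 0; i < n; i++)
            x[i] = (i % 10 == 0) ? 2 + (i % 9) : 1;

        int matched = buildIndex(x).get(1).size();
        // The index lookup for x = 1 still yields 900,000 of 1,000,000 rows,
        // so the rest of the query does almost as much work as a full scan,
        // and the index maintenance cost buys nothing.
        System.out.println(matched); // prints 900000
    }
}
```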
Re: Is there a way to use Ignite optimization and Spark optimization together when using Spark Dataframe API?
Here's the detailed information for my join test.

0: jdbc:ignite:thin://sap-datanode6/> select * from a;
x 1 y 1 A bearbrick
1 row selected (0.002 seconds)
0: jdbc:ignite:thin://sap-datanode6/> select count(*) from b;
COUNT(*) 14337959
1 row selected (0.299 seconds)
0: jdbc:ignite:thin://sap-datanode6/> select x,y from b where _key = '1';
x 1 y 1
1 row selected (0.002 seconds)
0: jdbc:ignite:thin://sap-datanode6/> select a.x,a.y from a join b where a.x = b.x and a.y = b.y;
x 1 y 1
1 row selected (6.036 seconds)

It takes 6 seconds to join a table with one row to a 14-million-row table using affinity key x.

0: jdbc:ignite:thin://sap-datanode6/> explain select a.x,a.y from a join b where a.x = b.x and a.y = b.y;
PLAN SELECT A__Z0.x AS __C0_0, A__Z0.y AS __C0_1 FROM PUBLIC.B__Z1 /* PUBLIC.B.__SCAN_ */ INNER JOIN PUBLIC.T A__Z0 /* PUBLIC.AFFINITY_KEY: x = B__Z1.x */ ON 1=1 WHERE (A__Z0.y = B__Z1.y) AND (A__Z0.x = B__Z1.x)
PLAN SELECT __C0_0 AS x, __C0_1 AS y FROM PUBLIC.__T0 /* PUBLIC."merge_scan" */

If I create an index on table b on fields x and y, it takes 6.8 seconds to finish the join.
create index on b(x,y);
No rows affected (31.316 seconds)
0: jdbc:ignite:thin://sap-datanode6/> select a.x,a.y from a join b where a.y = b.y and a.x = b.x;
x 1 y 1
1 row selected (6.865 seconds)
0: jdbc:ignite:thin://sap-datanode6/> explain select a.x,a.y from a join b where a.y = b.y and a.x = b.x;
PLAN SELECT A__Z0.x AS __C0_0, A__Z0.y AS __C0_1 FROM PUBLIC.T A__Z0 /* PUBLIC.T.__SCAN_ */ INNER JOIN PUBLIC.B__Z1 /* PUBLIC."b_x_asc_y_asc_idx": y = A__Z0.y AND x = A__Z0.x */ ON 1=1 WHERE (A__Z0.y = B__Z1.y) AND (A__Z0.x = B__Z1.x)
PLAN SELECT __C0_0 AS x, __C0_1 AS y FROM PUBLIC.__T0 /* PUBLIC."merge_scan" */
2 rows selected (0.003 seconds)

Here's my configuration. (The Spring XML markup was stripped by the mailing list archive; only the spring-beans schema references and the discovery addresses survived.)

Discovery addresses:
node1:49500
node2:49500
node3:49500
node4:49500
node5:49500
node6:49500

--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
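For reference, a TCP discovery block using those six addresses would typically look like the following. This is a sketch built from standard Ignite class names, not Ray's actual configuration, whose markup did not survive:

```xml
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
    <property name="discoverySpi">
        <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
            <property name="ipFinder">
                <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                    <property name="addresses">
                        <list>
                            <value>node1:49500</value>
                            <value>node2:49500</value>
                            <value>node3:49500</value>
                            <value>node4:49500</value>
                            <value>node5:49500</value>
                            <value>node6:49500</value>
                        </list>
                    </property>
                </bean>
            </property>
        </bean>
    </property>
</bean>
```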
Re: Is there a way to use Ignite optimization and Spark optimization together when using Spark Dataframe API?
Ray,

This sounds suspicious. Please show your configuration and the execution plan for the query.

-Val

--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
Re: Is there a way to use Ignite optimization and Spark optimization together when using Spark Dataframe API?
Hello!

Can you show the index that you are creating here?

Regards,
--
Ilya Kasnacheev

Tue, 25 Sep 2018 at 8:23, Ray:
> Let's say I have two tables I want to join.
> Table a has around 10 million rows, and its primary key is (x, y).
> I have created an index on fields x and y for table a.
>
> Table b has one row, and its primary key is (x, y).
> That row in table b has a corresponding row in table a with the same
> primary key.
>
> When I execute the join "select a.*,b.* from a inner join b
> where (a.x=b.x) and (a.y = b.y);", it takes more than 4 seconds to return
> a single record.
> I also examined the plan for that SQL and confirmed the index I created
> is used.
>
> Ideally, a hash join should take less than half a second.
>
> --
> Sent from: http://apache-ignite-users.70518.x6.nabble.com/
Re: Is there a way to use Ignite optimization and Spark optimization together when using Spark Dataframe API?
Let's say I have two tables I want to join.

Table a has around 10 million rows, and its primary key is (x, y). I have created an index on fields x and y for table a.

Table b has one row, and its primary key is (x, y). That row in table b has a corresponding row in table a with the same primary key.

When I execute the join "select a.*,b.* from a inner join b where (a.x=b.x) and (a.y = b.y);", it takes more than 4 seconds to return a single record. I also examined the plan for that SQL and confirmed the index I created is used.

Ideally, a hash join should take less than half a second.

--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
Re: Is there a way to use Ignite optimization and Spark optimization together when using Spark Dataframe API?
If the join is indexed and collocated, it can still be pretty fast. Do you have a particular query that is slower with the optimization than without?

-Val

--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
Re: Is there a way to use Ignite optimization and Spark optimization together when using Spark Dataframe API?
Hi Val, thanks for the reply. I'll try again and let you know if I missed something.

By "Ignite is not optimized for join" I mean that Ignite currently only supports nested loop joins, which are very inefficient when joining two large tables. Please refer to these two tickets for details:

https://issues.apache.org/jira/browse/IGNITE-6201
https://issues.apache.org/jira/browse/IGNITE-6202

--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
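The asymptotic difference those tickets describe can be sketched in plain Java (toy data, not Ignite code): a nested loop join performs |a| * |b| comparisons, while a hash join builds a hash table on one side and then probes it once per row of the other side, for roughly |a| + |b| work.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JoinSketch {
    // Nested loop join: compare every row of a with every row of b.
    static List<int[]> nestedLoopJoin(int[] a, int[] b) {
        List<int[]> out = new ArrayList<>();
        for (int x : a)
            for (int y : b)
                if (x == y) out.add(new int[] {x, y});
        return out; // cost: a.length * b.length comparisons
    }

    // Hash join: index one side once, then probe it per row of the other.
    static List<int[]> hashJoin(int[] a, int[] b) {
        Map<Integer, Integer> index = new HashMap<>();
        for (int y : b) index.merge(y, 1, Integer::sum);
        List<int[]> out = new ArrayList<>();
        for (int x : a) {
            int copies = index.getOrDefault(x, 0);
            for (int i = 0; i < copies; i++) out.add(new int[] {x, x});
        }
        return out; // cost: roughly a.length + b.length operations
    }

    public static void main(String[] args) {
        int[] a = new int[1_000_000];
        for (int i = 0; i < a.length; i++) a[i] = i;
        int[] b = {42}; // one matching row, like table b in this thread

        // Both produce the same single match, but the nested loop performs
        // 1,000,000 comparisons where the hash join does a one-row build
        // phase plus one probe per row of a.
        System.out.println(nestedLoopJoin(a, b).size()); // prints 1
        System.out.println(hashJoin(a, b).size());       // prints 1
    }
}
```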
Re: Is there a way to use Ignite optimization and Spark optimization together when using Spark Dataframe API?
Ray,

Per my understanding, pushdown filters are propagated to Ignite either way; that is not related to the "optimization". The optimization affects joins, groupings, aggregations, etc. So, unless I'm missing something, the behavior you're looking for is achieved by setting OPTION_DISABLE_SPARK_SQL_OPTIMIZATION to true.

However, can you please clarify what you mean by "Ignite is not optimized for join"?

-Val

--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
Re: Is there a way to use Ignite optimization and Spark optimization together when using Spark Dataframe API?
Hi,

I am not sure that it will work, but you can try the following (the config path is only an example):

SparkSession spark = SparkSession
    .builder()
    .appName("SomeAppName")
    .master("spark://10.0.75.1:7077")
    .config(OPTION_DISABLE_SPARK_SQL_OPTIMIZATION, "false") // or true
    .getOrCreate();

JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
JavaIgniteContext igniteContext = new JavaIgniteContext(sparkContext, "ignite-config.xml");

JavaIgniteRDD igniteRdd1 = igniteContext.fromCache("CACHE1");
JavaIgniteRDD igniteRdd2 = igniteContext.fromCache("CACHE2");

// Here the Ignite SQL processor will be used, because the query runs inside a SqlFieldsQuery
Dataset ds1 = igniteRdd1.sql("select * from CACHE1");
Dataset ds2 = igniteRdd2.sql("select * from CACHE2");

// Here the Spark SQL processor will be used
ds1.join(ds2).where(/* join condition */);

BR,
Andrei

--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
Is there a way to use Ignite optimization and Spark optimization together when using Spark Dataframe API?
Currently, the OPTION_DISABLE_SPARK_SQL_OPTIMIZATION option can only be set at the Spark session level. That means I can have either Ignite optimization or Spark optimization for a given Spark job.

Let's say I want to load data into Spark memory with pushdown filters using Ignite optimization. For example, I want to load one day's data using the SQL "select * from tableA where date = '2018-09-01'". With Ignite optimization, this SQL is executed on Ignite, and the where-clause filter is applied on Ignite. But with Spark optimization, all the data in this table is loaded into Spark memory and filtered later.

Then I want to join filtered tableA with filtered tableB, which is also loaded from Ignite. But I want to use Spark's join feature to do the join, because both filtered tableA and filtered tableB contain millions of rows, and Ignite is not optimized for joins.

How can I do that?

--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
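For reference, the workflow described in the question would look roughly like this with Ignite's Spark DataFrame API. This is an untested sketch: the config path, table names, join key, and whether the filter is actually pushed down to Ignite in a given setup are all assumptions, not confirmed behavior.

```java
import org.apache.ignite.spark.IgniteDataFrameSettings;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class IgniteSparkJoinSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("ignite-spark-join")
            .getOrCreate();

        // "ignite-config.xml" is a placeholder for the client node configuration.
        Dataset<Row> tableA = spark.read()
            .format(IgniteDataFrameSettings.FORMAT_IGNITE())
            .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), "ignite-config.xml")
            .option(IgniteDataFrameSettings.OPTION_TABLE(), "tableA")
            .load()
            .where("date = '2018-09-01'"); // the filter one would want pushed down to Ignite

        Dataset<Row> tableB = spark.read()
            .format(IgniteDataFrameSettings.FORMAT_IGNITE())
            .option(IgniteDataFrameSettings.OPTION_CONFIG_FILE(), "ignite-config.xml")
            .option(IgniteDataFrameSettings.OPTION_TABLE(), "tableB")
            .load()
            .where("date = '2018-09-01'");

        // The join itself is executed by Spark, not Ignite.
        // "someKey" is a placeholder for the actual join column.
        Dataset<Row> joined = tableA.join(tableB, "someKey");
        joined.show();
    }
}
```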