[jira] [Commented] (PHOENIX-6677) Parallelism within a batch of mutations

2022-06-09 Thread Istvan Toth (Jira)


[ 
https://issues.apache.org/jira/browse/PHOENIX-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17552531#comment-17552531
 ] 

Istvan Toth commented on PHOENIX-6677:
--

Thank you for the detailed explanation, [~kozdemir].

I indeed made the mistake of trusting the comments and not digging deep
enough into the code. Checking the call tree of getMutateBatchSize() confirms
that you are right.

Does having two sets of properties make any sense (i.e. is there a case where
the non-batch properties are used for anything)?

> Parallelism within a batch of mutations 
> 
>
> Key: PHOENIX-6677
> URL: https://issues.apache.org/jira/browse/PHOENIX-6677
> Project: Phoenix
>  Issue Type: Improvement
>Reporter: Kadir OZDEMIR
>Priority: Major
>
> Currently, the Phoenix client simply passes the batches of row mutations from 
> the application to the HBase client without any parallelism or intelligent 
> grouping (except grouping mutations for the same row). 
> Assume that the application creates batches of 1000 row mutations for a given 
> table. The Phoenix client divides these rows, in arrival order, into HBase 
> batches of n (e.g., 100) rows based on the configured batch size, i.e., 
> the number of rows and bytes. Then Phoenix calls the HBase batch API, one batch 
> at a time (i.e., serially). The HBase client further divides a given batch of 
> rows into smaller batches based on their regions. This means that a large 
> batch created by the application is divided into many tiny batches and 
> executed mostly serially. For salted tables, this results in even smaller 
> batches. 
> We can improve the current implementation greatly if we group the rows of the 
> batch prepared by the application into sub-batches based on table region 
> boundaries and then execute these sub-batches in parallel. 
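
A minimal sketch of the proposal in the description, assuming the HBase 2.x
client API; the class name, thread-pool size, and empty-batch guard are
illustrative, not actual Phoenix internals:
{code:java}
import java.util.*;
import java.util.concurrent.*;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class RegionParallelBatch {
    // Group a large application batch by owning region, then submit each
    // per-region sub-batch in parallel instead of serially.
    public static void submit(Connection conn, TableName table, List<Mutation> batch)
            throws Exception {
        if (batch.isEmpty()) {
            return;
        }
        Map<String, List<Row>> byRegion = new HashMap<>();
        try (RegionLocator locator = conn.getRegionLocator(table)) {
            for (Mutation m : batch) {
                byte[] start = locator.getRegionLocation(m.getRow()).getRegion().getStartKey();
                byRegion.computeIfAbsent(Bytes.toStringBinary(start), k -> new ArrayList<>()).add(m);
            }
        }
        ExecutorService pool = Executors.newFixedThreadPool(Math.min(8, byRegion.size()));
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (List<Row> subBatch : byRegion.values()) {
                futures.add(pool.submit(() -> {
                    try (Table t = conn.getTable(table)) {
                        // Each call touches a single region, so the HBase
                        // client has nothing further to split.
                        t.batch(subBatch, new Object[subBatch.size()]);
                    }
                    return null;
                }));
            }
            for (Future<?> f : futures) {
                f.get(); // surface any per-region failure
            }
        } finally {
            pool.shutdown();
        }
    }
}
{code}
Grouping by region start key makes the round trips overlap instead of running
back to back, which is the gain the description is after.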





[jira] [Commented] (PHOENIX-6677) Parallelism within a batch of mutations

2022-06-09 Thread Kadir Ozdemir (Jira)


[ 
https://issues.apache.org/jira/browse/PHOENIX-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17552339#comment-17552339
 ] 

Kadir Ozdemir commented on PHOENIX-6677:


[~stoty], here are the cluster-level config params and their defaults:
{code:java}
public static final String MAX_MUTATION_SIZE_ATTRIB = "phoenix.mutate.maxSize";
public static final String MAX_MUTATION_SIZE_BYTES_ATTRIB = "phoenix.mutate.maxSizeBytes";

public static final int DEFAULT_MAX_MUTATION_SIZE = 500000;
public static final int DEFAULT_MAX_MUTATION_SIZE_BYTES = 104857600; // 100 MB

public static final String MUTATE_BATCH_SIZE_ATTRIB = "phoenix.mutate.batchSize";
public static final String MUTATE_BATCH_SIZE_BYTES_ATTRIB = "phoenix.mutate.batchSizeBytes";

// Batch size (number of rows) for UPSERT SELECT and DELETE
public final static int DEFAULT_MUTATE_BATCH_SIZE = 100;
// Batch size in bytes for UPSERT SELECT and DELETE. By default, 2 MB.
public final static long DEFAULT_MUTATE_BATCH_SIZE_BYTES = 2097152;
{code}
 

The first set applies at the connection level, the second at the statement
level. As you can see, the default statement-level mutation batch size (the
number of rows) is 100. Even if we issue sendBatch and then commit, say, every
2000 rows, Phoenix breaks them into 100-row batches by default.
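
To illustrate the effect, here is a sketch of the chunking behavior described
above (not Phoenix's actual MutationState code; names are illustrative):
{code:java}
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Table;

class ChunkedCommit {
    // With the default batch size of 100, a commit of 2000 buffered rows
    // becomes 20 serial batch() calls.
    static void commitInChunks(Table hTable, List<Mutation> pending)
            throws IOException, InterruptedException {
        final int batchSize = 100; // DEFAULT_MUTATE_BATCH_SIZE
        for (int i = 0; i < pending.size(); i += batchSize) {
            List<Mutation> chunk = pending.subList(i, Math.min(i + batchSize, pending.size()));
            hTable.batch(chunk, new Object[chunk.size()]); // one serial round trip per chunk
        }
    }
}
{code}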

Please see the following methods in JDBCUtil:
{code:java}
public static int getMutateBatchSize(String url, Properties info, ReadOnlyProps props) throws SQLException {
    String batchSizeStr = findProperty(url, info, PhoenixRuntime.UPSERT_BATCH_SIZE_ATTRIB);
    return (batchSizeStr == null
            ? props.getInt(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE)
            : Integer.parseInt(batchSizeStr));
}

public static long getMutateBatchSizeBytes(String url, Properties info, ReadOnlyProps props) throws SQLException {
    String batchSizeStr = findProperty(url, info, PhoenixRuntime.UPSERT_BATCH_SIZE_BYTES_ATTRIB);
    return (batchSizeStr == null
            ? props.getLong(QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES)
            : Long.parseLong(batchSizeStr));
}
{code}
You can see that these methods check the per-connection attributes first and,
if those are not set, fall back to the cluster-level params. In other words,
the cluster-level parameters can be overridden with connection properties.

The following comments in the code are confusing:
{code:java}
/**
 * Use this connection property to control the number of rows that are
 * batched together on an UPSERT INTO table1... SELECT ... FROM table2.
 * It's only used when autoCommit is true and your source table is
 * different than your target table or your SELECT statement has a
 * GROUP BY clause.
 */
public final static String UPSERT_BATCH_SIZE_ATTRIB = "UpsertBatchSize";

/**
 * Use this connection property to control the number of bytes that are
 * batched together on an UPSERT INTO table1... SELECT ... FROM table2.
 * It's only used when autoCommit is true and your source table is
 * different than your target table or your SELECT statement has a
 * GROUP BY clause. Overrides the value of UpsertBatchSize.
 */
public final static String UPSERT_BATCH_SIZE_BYTES_ATTRIB = "UpsertBatchSizeBytes";
{code}
 

I used the following code with auto commit disabled, and UPSERT batches worked
as expected (i.e., Phoenix passed 2000-row batches to HBase):
{code:java}
Properties props = new Properties();
props.setProperty(UPSERT_BATCH_SIZE_BYTES_ATTRIB, "200");
props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, "2000");
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
    // ... upsert rows and commit every 2000 rows ...
}
{code}
 

More testing and investigation is needed to sort this out. I agree with you
that the default values (100 rows and 2097152 bytes) are too low and should be
increased.

 


[jira] [Commented] (PHOENIX-6677) Parallelism within a batch of mutations

2022-06-08 Thread Istvan Toth (Jira)


[ 
https://issues.apache.org/jira/browse/PHOENIX-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17551961#comment-17551961
 ] 

Istvan Toth commented on PHOENIX-6677:
--

I think that there are some incorrect assumptions above.

AFAICT UPSERT_BATCH_SIZE_ATTRIB is specifically for *upsert select*s, deletes
(and index updates), where the mutations are issued from the coprocessors, not
from the client.

For standard upserts MAX_MUTATION_SIZE_ATTRIB is used, which already defaults
to 500K records and 100 MB.

However, the default UPSERT_BATCH_SIZE_% values do indeed seem low. Should we
consider increasing those?



[jira] [Commented] (PHOENIX-6677) Parallelism within a batch of mutations

2022-06-08 Thread Kadir Ozdemir (Jira)


[ 
https://issues.apache.org/jira/browse/PHOENIX-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17551793#comment-17551793
 ] 

Kadir Ozdemir commented on PHOENIX-6677:


[~larsh], your suggestion works and covers my use cases too. We can actually
change the batch size using connection properties, for example:

{code:java}
Properties props = new Properties();
props.setProperty(UPSERT_BATCH_SIZE_BYTES_ATTRIB, "200");
props.setProperty(UPSERT_BATCH_SIZE_ATTRIB, "2000");
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
    // ... issue upserts on conn ...
}
{code}

There is no need to do more here.



[jira] [Commented] (PHOENIX-6677) Parallelism within a batch of mutations

2022-03-30 Thread Kadir OZDEMIR (Jira)


[ 
https://issues.apache.org/jira/browse/PHOENIX-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17515030#comment-17515030
 ] 

Kadir OZDEMIR commented on PHOENIX-6677:


[~larsh], you have a valid point. We do not want to needlessly reimplement
functionality that HBase already provides. I will try to find out if there is
any real reason for this chunking.



[jira] [Commented] (PHOENIX-6677) Parallelism within a batch of mutations

2022-03-30 Thread Lars Hofhansl (Jira)


[ 
https://issues.apache.org/jira/browse/PHOENIX-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17515013#comment-17515013
 ] 

Lars Hofhansl commented on PHOENIX-6677:


I never quite understood why Phoenix needs to break up a batch into chunks of
100; it probably has historical reasons. Hand the whole thing to HBase and let
it do the chunking, since it will parallelize at least per region.
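
A minimal sketch of that alternative, assuming the plain HBase client API (the
table name and mutation list are illustrative):
{code:java}
// Hand the entire application batch to one batch() call; the HBase client
// groups the mutations by region internally and sends the groups concurrently.
try (Table t = conn.getTable(TableName.valueOf("MY_TABLE"))) {
    t.batch(allMutations, new Object[allMutations.size()]);
}
{code}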

Of course we can do better, as [~kozdemir] suggests.

 



[jira] [Commented] (PHOENIX-6677) Parallelism within a batch of mutations

2022-03-29 Thread Geoffrey Jacoby (Jira)


[ 
https://issues.apache.org/jira/browse/PHOENIX-6677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17514132#comment-17514132
 ] 

Geoffrey Jacoby commented on PHOENIX-6677:
--

Great idea. I think it's really important that it be tunable, though. Some use
cases in my environment execute on clients that are heavily CPU-bound, so I
suspect that in some situations this might perform worse due to the extra
context switching. With less contention, or in a setting such as PQS, this
sounds like it would be very helpful, however.
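
One way to make it tunable (a hypothetical knob; neither the name nor the
default exists in Phoenix today) would be a config entry that preserves the
current serial behavior unless raised:
{code:java}
// Hypothetical property sketch, not an existing Phoenix config.
public static final String MUTATE_PARALLELISM_ATTRIB = "phoenix.mutate.batchParallelism";
// 1 keeps today's serial behavior for CPU-bound clients;
// PQS-style deployments could raise it to overlap per-region sub-batches.
public static final int DEFAULT_MUTATE_PARALLELISM = 1;
{code}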
