Chen Luo created ASTERIXDB-1917:
-----------------------------------
Summary: FLUSH_LSN for disk components is not correctly set when a
NC holds multiple partitions
Key: ASTERIXDB-1917
URL: https://issues.apache.org/jira/browse/ASTERIXDB-1917
Project: Apache AsterixDB
Issue Type: Bug
Components: Hyracks, Storage
Reporter: Chen Luo
Assignee: Chen Luo
Attachments: asterix-build-configuration-lsm.xml, sample.zip
When we flush a memory component of an index, we set an LSN on the resulting
disk component. This LSN is that of the last operation that modified the memory
component. Thus, given an index, the FLUSH_LSNs of its flushed disk components
should be increasing, i.e., later flushed components get larger LSNs.
However, I have observed a bug where a later flushed disk component gets a
smaller LSN, which breaks this property.
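The ordering property can be checked mechanically. The following is only a minimal sketch, assuming the FLUSH_LSNs of one index's disk components are available as a list in flush order. The class and method names are hypothetical and not part of AsterixDB, and treating equal LSNs as acceptable is an assumption.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of AsterixDB.
class FlushLsnOrderCheck {

    /**
     * Returns the position of the first disk component whose FLUSH_LSN is
     * smaller than that of the component flushed just before it, or -1 if
     * the sequence never decreases.
     */
    static int firstViolation(List<Long> flushLsnsInFlushOrder) {
        for (int i = 1; i < flushLsnsInFlushOrder.size(); i++) {
            long previous = flushLsnsInFlushOrder.get(i - 1);
            long current = flushLsnsInFlushOrder.get(i);
            if (current < previous) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // A well-ordered sequence: no violation.
        System.out.println(firstViolation(Arrays.asList(1L, 2L, 3L)));
        // A later component with a smaller LSN: violation at position 1.
        System.out.println(firstViolation(Arrays.asList(3L, 2L)));
    }
}
```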
A brief explanation of this bug is as follows. Suppose we have one dataset D,
and two partitions on one NC. Suppose D only has a primary index P. Further
suppose P1 and P2 are two partitioned indexes for the two partitions on this
NC. This implies P1 and P2 share the same PrimaryIndexOperationTracker, and
they would always be flushed together.
The LSN for flushed disk components is tracked by ILSMIOOperationCallback. Now
suppose an index has two memory components. ILSMIOOperationCallback maintains
an array mutableLastLSNs of length 2 to track the FLUSH_LSNs of the two memory
components. Before scheduling each flush operation,
PrimaryIndexOperationTracker needs to call ILSMIOOperationCallback to set the
FLUSH_LSN.
Now consider the following scenario (which does happen, but very rarely). Initially,
P1.mutableLastLSNs=[0,0]
P2.mutableLastLSNs=[0,0]
Suppose dataset D needs to be flushed, and mutableLastLSNs is set as follows:
P1.mutableLastLSNs=[1, 0]
P2.mutableLastLSNs=[1, 0]
Then, suppose the flush operation of P2 is fast, and produces a disk component
P2.d1 (P2.d1.LSN = 1). Data continues to come into P2, and it needs to be
flushed again. However, P1 is still flushing the first memory component. Then
mutableLastLSNs becomes:
P1.mutableLastLSNs=[1, 2]
P2.mutableLastLSNs=[1, 2].
Surprisingly, the flush operation of P2 is again fast, and produces a disk
component P2.d2 (P2.d2.LSN = 2). Still, data continues to come into P2, and it
needs to be flushed again. But P1 is still flushing the first memory component.
Then mutableLastLSNs becomes:
P1.mutableLastLSNs=[3, 2]
P2.mutableLastLSNs=[3, 2].
At this time, P1 finishes its first flush operation and produces the disk
component P1.d1 (P1.d1.LSN = 3). This is incorrect, since P1.d1.LSN should be
1, not 3. Its original value was overwritten by the flush request of P2!
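The scenario can be replayed with a small stand-alone simulation. This is only a sketch under simplifying assumptions: the IndexCallback class below is a hypothetical stand-in for the per-index callback state (a length-2 mutableLastLSNs array plus a write and a read position), not the actual AsterixDB implementation, and threads are replaced by an explicit ordering of events.

```java
// Hypothetical, simplified model of the scenario; not AsterixDB code.
class FlushLsnOverwriteDemo {

    static final class IndexCallback {
        final long[] mutableLastLSNs = new long[2];
        int writeIndex = 0; // slot written by the next flush request
        int readIndex = 0;  // slot read when the next flush completes

        // Called for every index of the dataset when a flush is requested.
        void setFlushLsn(long lsn) {
            mutableLastLSNs[writeIndex] = lsn;
            writeIndex = (writeIndex + 1) % mutableLastLSNs.length;
        }

        // Called when this index finishes a flush; returns the component LSN.
        long completeFlush() {
            long lsn = mutableLastLSNs[readIndex];
            readIndex = (readIndex + 1) % mutableLastLSNs.length;
            return lsn;
        }
    }

    /** Returns {P2.d1.LSN, P2.d2.LSN, P1.d1.LSN} for the described schedule. */
    static long[] run() {
        IndexCallback p1 = new IndexCallback();
        IndexCallback p2 = new IndexCallback();

        // First flush request: both arrays become [1, 0].
        p1.setFlushLsn(1);
        p2.setFlushLsn(1);
        long p2d1 = p2.completeFlush(); // P2 is fast, P1 is still flushing

        // Second flush request: both arrays become [1, 2].
        p1.setFlushLsn(2);
        p2.setFlushLsn(2);
        long p2d2 = p2.completeFlush(); // P2 is fast again

        // Third flush request: both arrays become [3, 2]. Slot 0 of P1 is
        // overwritten although P1 has not yet read it.
        p1.setFlushLsn(3);
        p2.setFlushLsn(3);
        long p1d1 = p1.completeFlush(); // P1 finally finishes its first flush

        return new long[] { p2d1, p2d2, p1d1 };
    }

    public static void main(String[] args) {
        long[] lsns = run();
        System.out.println("P2.d1.LSN = " + lsns[0]);
        System.out.println("P2.d2.LSN = " + lsns[1]);
        System.out.println("P1.d1.LSN = " + lsns[2] + " (expected 1)");
    }
}
```

In this model P1.d1 ends up with LSN 3 because the third flush request reuses slot 0 before P1 has consumed the value 1 stored there.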
To reproduce this bug, one needs to change the codebase slightly (i.e.,
LSMBTreeIOOperationCallback). I added a member variable
<code>
private volatile long prevLSN = 0;
</code>
and added one check in the getComponentLSN method:
<code>
@Override
public long getComponentLSN(List<? extends ILSMComponent> diskComponents)
        throws HyracksDataException {
    if (diskComponents == null) {
        // Implies a flush IO operation. --> moves the flush pointer
        // Flush operations of an LSM index are executed sequentially.
        synchronized (this) {
            long lsn = mutableLastLSNs[readIndex];
            // Added check: the FLUSH_LSNs of one index must never decrease.
            if (!(prevLSN <= lsn)) {
                throw new IllegalStateException();
            }
            prevLSN = lsn;
            return lsn;
        }
    }
    // Get max LSN from the diskComponents. Implies a merge IO operation or
    // Recovery operation.
    long maxLSN = -1L;
    for (ILSMComponent c : diskComponents) {
        BTree btree = ((LSMBTreeDiskComponent) c).getBTree();
        maxLSN = Math.max(AbstractLSMIOOperationCallback.getTreeIndexLSN(btree), maxLSN);
    }
    return maxLSN;
}
</code>
Then, after starting AsterixDB using AsterixHyracksIntegrationUtil, you can
ingest the data and reproduce the bug using the following queries (you need to
replace path_to_sample_data with the path to the attached sample file):
<code>
drop dataverse twitter if exists;
create dataverse twitter if not exists;
use dataverse twitter
create type typeUser if not exists as open {
    id: int64,
    name: string,
    screen_name : string,
    lang : string,
    location: string,
    create_at: date,
    description: string,
    followers_count: int32,
    friends_count: int32,
    statues_count: int64
}
create type typePlace if not exists as open{
    country : string,
    country_code : string,
    full_name : string,
    id : string,
    name : string,
    place_type : string,
    bounding_box : rectangle
}
create type typeGeoTag if not exists as open {
    stateID: int32,
    stateName: string,
    countyID: int32,
    countyName: string,
    cityID: int32?,
    cityName: string?
}
create type typeTweet if not exists as open{
    create_at : datetime,
    id: int64,
    "text": string,
    in_reply_to_status : int64,
    in_reply_to_user : int64,
    favorite_count : int64,
    coordinate: point?,
    retweet_count : int64,
    lang : string,
    is_retweet: boolean,
    hashtags : {{ string }} ?,
    user_mentions : {{ int64 }} ? ,
    user : typeUser,
    place : typePlace?,
    geo_tag: typeGeoTag
}
create dataset ds_tweet(typeTweet) if not exists primary key id
    using compaction policy correlated-prefix
    (("max-mergable-component-size"="134217728"),("max-tolerance-component-count"="5"))
    with filter on create_at ;
// with filter on create_at;
//"using" "compaction" "policy" CompactionPolicy ( Configuration )? )?
create feed TweetFeed using localfs
(
    ("path"="localhost:///path_to_sample_data"),
    ("address-type"="nc"),
    ("type-name"="typeTweet"),
    ("format"="adm")
);
connect feed TweetFeed to dataset ds_tweet;
start feed TweetFeed;
</code>
I have attached the asterix-build-configuration-lsm.xml file and the sample
data file to this issue.