Re: [Dev] Issue in removing messages of non durable topics

2015-07-06 Thread Pamod Sylvester
+1 if we can partition contents by the queue name, we could delete it
directly.

On Mon, Jul 6, 2015 at 7:36 AM, Ramith Jayasinghe  wrote:

> I think we can simplify this one. and do a delete without even getting
> message ids.
>
>
> On Mon, Jul 6, 2015 at 5:05 PM, Pamod Sylvester  wrote:
>
>> Hi Hasitha,
>>
>> With reference to the discussion in [1], this was done to prevent
>> Tombstone overwhelming exception when Cassandra was used as the message
>> store.
>>
>> AFAIR The process of purging was as follows,
>>
>> 1) From MessageMetaData CF, retrieve the corresponding message ids
>> 2) Delete the corresponding messages from MessageMetaData and then delete
>> them from the MessageContent.
>>
>> When transacting > 1,000,000 messages and disconnection of the
>> subscription will go through the above mentioned process.
>>
>> The issue is,
>>
>> a) Message content is not partitioned based on the queue name as Message
>> Meta data. so we cannot delete the content row wise, which restricted us to
>> retrieve ids from Message Meta data and remove them from content.
>> b) When retrieving message meta data (let's say we used a range query to
>> select all the ids) there will be tombstones that will be selected with
>> that query that will hit the limit, causing the Tombstone overwhelming
>> exception.
>>
>> As a solution, when  removeMessagesOfDestinationForNode() is called we
>> get the current fresh slot, and the idea of getting the start id of the
>> slot is that it assures that the message ids that should be selected from
>> the message meta data should be greater than the value defined in the start
>> id (since messages with ids < start id are the messages that have being
>> delivered already that has caused tombstones).
>>
>> Also, I believe we could change the flow now since we discontinue
>> Cassandra. Since in RDBMS you could select all ids without any issue.
>>
>> Hope this explains your query.
>>
>> [1] [Dev][MB] TombstoneOverwhelmingException When Purge Operation is
>> Triggered When Subscription Disconnection/Deletion
>>
>> Thanks,
>> Pamod
>>
>> On Mon, Jul 6, 2015 at 7:00 AM, Hasitha Hiranya 
>> wrote:
>>
>>> Hi,
>>>
>>> Identified an issue regarding message purging when last subscriber for a
>>> topic is disconnected from broker cluster.
>>>
>>> @org.wso2.andes.kernel.OrphanedMessageHandler
>>>
>>>
>>> private void removeMessagesOfDestinationForNode(String destination,
>>> String ownerName,
>>> boolean isTopic) throws AndesException {
>>>
>>> try {
>>>
>>> Long startMessageID = null;
>>> Long endMessageID = null;
>>>
>>> //Will first retrieve the last unassigned slot id
>>> String nodeID =
>>> ClusterResourceHolder.getInstance().getClusterManager().getMyNodeID();
>>> //Will get the storage queue name
>>> *String storageQueue =
>>> AndesUtils.getStorageQueueForDestination(destination, nodeID, isTopic);*
>>> *//We get the relevant slot from the coordinator if running
>>> on cluster mode*
>>> *Slot unassignedSlot =
>>> MessagingEngine.getInstance().getSlotCoordinator().getSlot(storageQueue);*
>>> *//We need to get the starting message ID to inform the DB
>>> to start slicing the message from there*
>>> *//This step would be done in order to ensure that
>>> tombstones will not be fetched during the querying*
>>> *//operation*
>>> *startMessageID = unassignedSlot.getStartMessageId();*
>>> endMessageID = unassignedSlot.getEndMessageId();
>>>
>>> // This is a class used by AndesSubscriptionManager. Andes
>>> Subscription Manager is behind Disruptor layer.
>>> // Hence the call should be made to MessagingEngine NOT
>>> Andes.
>>> // Calling Andes methods from here will lead to probable
>>> deadlocks if Futures are used.
>>> // NOTE: purge call should be made to MessagingEngine not
>>> Andes
>>> if (0 < endMessageID) {
>>> //If the slot id is 0, which means for the given storage
>>> queue there're no unassigned slots which means
>>> //we don't need to purge messages in this case
>>> //The purpose of purge operation is to make sure that
>>> unassigned slots will be removed if no subs exists
>>> MessagingEngine.getInstance().purgeMessages(destination,
>>> ownerName, isTopic, startMessageID);
>>> }
>>> } catch (ConnectionException e) {
>>> String mesage = "Error while establishing a connection with
>>> the thrift server";
>>> log.error(mesage);
>>> throw new AndesException(mesage, e);
>>> }
>>>
>>> }
>>>
>>> Why we need a start message id here ?
>>> What about purging the whole internal queue (related to topic) ?
>>>
>>> *MessagingEngine.getInstance().purgeMessages(destination, ownerName,
>>> isTopic, 0);*
>>>

Re: [Dev] Issue in removing messages of non durable topics

2015-07-06 Thread Ramith Jayasinghe
I think we can simplify this one. and do a delete without even getting
message ids.


On Mon, Jul 6, 2015 at 5:05 PM, Pamod Sylvester  wrote:

> Hi Hasitha,
>
> With reference to the discussion in [1], this was done to prevent
> Tombstone overwhelming exception when Cassandra was used as the message
> store.
>
> AFAIR The process of purging was as follows,
>
> 1) From MessageMetaData CF, retrieve the corresponding message ids
> 2) Delete the corresponding messages from MessageMetaData and then delete
> them from the MessageContent.
>
> When transacting > 1,000,000 messages and disconnection of the
> subscription will go through the above mentioned process.
>
> The issue is,
>
> a) Message content is not partitioned based on the queue name as Message
> Meta data. so we cannot delete the content row wise, which restricted us to
> retrieve ids from Message Meta data and remove them from content.
> b) When retrieving message meta data (let's say we used a range query to
> select all the ids) there will be tombstones that will be selected with
> that query that will hit the limit, causing the Tombstone overwhelming
> exception.
>
> As a solution, when  removeMessagesOfDestinationForNode() is called we
> get the current fresh slot, and the idea of getting the start id of the
> slot is that it assures that the message ids that should be selected from
> the message meta data should be greater than the value defined in the start
> id (since messages with ids < start id are the messages that have being
> delivered already that has caused tombstones).
>
> Also, I believe we could change the flow now since we discontinue
> Cassandra. Since in RDBMS you could select all ids without any issue.
>
> Hope this explains your query.
>
> [1] [Dev][MB] TombstoneOverwhelmingException When Purge Operation is
> Triggered When Subscription Disconnection/Deletion
>
> Thanks,
> Pamod
>
> On Mon, Jul 6, 2015 at 7:00 AM, Hasitha Hiranya  wrote:
>
>> Hi,
>>
>> Identified an issue regarding message purging when last subscriber for a
>> topic is disconnected from broker cluster.
>>
>> @org.wso2.andes.kernel.OrphanedMessageHandler
>>
>>
>> private void removeMessagesOfDestinationForNode(String destination,
>> String ownerName,
>> boolean isTopic) throws AndesException {
>>
>> try {
>>
>> Long startMessageID = null;
>> Long endMessageID = null;
>>
>> //Will first retrieve the last unassigned slot id
>> String nodeID =
>> ClusterResourceHolder.getInstance().getClusterManager().getMyNodeID();
>> //Will get the storage queue name
>> *String storageQueue =
>> AndesUtils.getStorageQueueForDestination(destination, nodeID, isTopic);*
>> *//We get the relevant slot from the coordinator if running
>> on cluster mode*
>> *Slot unassignedSlot =
>> MessagingEngine.getInstance().getSlotCoordinator().getSlot(storageQueue);*
>> *//We need to get the starting message ID to inform the DB to
>> start slicing the message from there*
>> *//This step would be done in order to ensure that tombstones
>> will not be fetched during the querying*
>> *//operation*
>> *startMessageID = unassignedSlot.getStartMessageId();*
>> endMessageID = unassignedSlot.getEndMessageId();
>>
>> // This is a class used by AndesSubscriptionManager. Andes
>> Subscription Manager is behind Disruptor layer.
>> // Hence the call should be made to MessagingEngine NOT Andes.
>> // Calling Andes methods from here will lead to probable
>> deadlocks if Futures are used.
>> // NOTE: purge call should be made to MessagingEngine not
>> Andes
>> if (0 < endMessageID) {
>> //If the slot id is 0, which means for the given storage
>> queue there're no unassigned slots which means
>> //we don't need to purge messages in this case
>> //The purpose of purge operation is to make sure that
>> unassigned slots will be removed if no subs exists
>> MessagingEngine.getInstance().purgeMessages(destination,
>> ownerName, isTopic, startMessageID);
>> }
>> } catch (ConnectionException e) {
>> String mesage = "Error while establishing a connection with
>> the thrift server";
>> log.error(mesage);
>> throw new AndesException(mesage, e);
>> }
>>
>> }
>>
>> Why we need a start message id here ?
>> What about purging the whole internal queue (related to topic) ?
>>
>> *MessagingEngine.getInstance().purgeMessages(destination, ownerName,
>> isTopic, 0);*
>>
>> Thanks
>> --
>> *Hasitha Abeykoon*
>> Senior Software Engineer; WSO2, Inc.; http://wso2.com
>> *cell:* *+94 719363063*
>> *blog: **abeykoon.blogspot.com* 
>>
>>
>
>
> --
> *Pamod Sylvester *
>
> *WSO2 Inc.; http://wso2.com 

Re: [Dev] Issue in removing messages of non durable topics

2015-07-06 Thread Pamod Sylvester
Hi Hasitha,

With reference to the discussion in [1], this was done to prevent Tombstone
overwhelming exception when Cassandra was used as the message store.

AFAIR The process of purging was as follows,

1) From MessageMetaData CF, retrieve the corresponding message ids
2) Delete the corresponding messages from MessageMetaData and then delete
them from the MessageContent.

When transacting > 1,000,000 messages and disconnection of the subscription
will go through the above mentioned process.

The issue is,

a) Message content is not partitioned based on the queue name as Message
Meta data. so we cannot delete the content row wise, which restricted us to
retrieve ids from Message Meta data and remove them from content.
b) When retrieving message meta data (let's say we used a range query to
select all the ids) there will be tombstones that will be selected with
that query that will hit the limit, causing the Tombstone overwhelming
exception.

As a solution, when  removeMessagesOfDestinationForNode() is called we get
the current fresh slot, and the idea of getting the start id of the slot is
that it assures that the message ids that should be selected from the
message meta data should be greater than the value defined in the start id
(since messages with ids < start id are the messages that have being
delivered already that has caused tombstones).

Also, I believe we could change the flow now since we discontinue
Cassandra. Since in RDBMS you could select all ids without any issue.

Hope this explains your query.

[1] [Dev][MB] TombstoneOverwhelmingException When Purge Operation is
Triggered When Subscription Disconnection/Deletion

Thanks,
Pamod

On Mon, Jul 6, 2015 at 7:00 AM, Hasitha Hiranya  wrote:

> Hi,
>
> Identified an issue regarding message purging when last subscriber for a
> topic is disconnected from broker cluster.
>
> @org.wso2.andes.kernel.OrphanedMessageHandler
>
>
> private void removeMessagesOfDestinationForNode(String destination,
> String ownerName,
> boolean isTopic) throws AndesException {
>
> try {
>
> Long startMessageID = null;
> Long endMessageID = null;
>
> //Will first retrieve the last unassigned slot id
> String nodeID =
> ClusterResourceHolder.getInstance().getClusterManager().getMyNodeID();
> //Will get the storage queue name
> *String storageQueue =
> AndesUtils.getStorageQueueForDestination(destination, nodeID, isTopic);*
> *//We get the relevant slot from the coordinator if running on
> cluster mode*
> *Slot unassignedSlot =
> MessagingEngine.getInstance().getSlotCoordinator().getSlot(storageQueue);*
> *//We need to get the starting message ID to inform the DB to
> start slicing the message from there*
> *//This step would be done in order to ensure that tombstones
> will not be fetched during the querying*
> *//operation*
> *startMessageID = unassignedSlot.getStartMessageId();*
> endMessageID = unassignedSlot.getEndMessageId();
>
> // This is a class used by AndesSubscriptionManager. Andes
> Subscription Manager is behind Disruptor layer.
> // Hence the call should be made to MessagingEngine NOT Andes.
> // Calling Andes methods from here will lead to probable
> deadlocks if Futures are used.
> // NOTE: purge call should be made to MessagingEngine not Andes
> if (0 < endMessageID) {
> //If the slot id is 0, which means for the given storage
> queue there're no unassigned slots which means
> //we don't need to purge messages in this case
> //The purpose of purge operation is to make sure that
> unassigned slots will be removed if no subs exists
> MessagingEngine.getInstance().purgeMessages(destination,
> ownerName, isTopic, startMessageID);
> }
> } catch (ConnectionException e) {
> String mesage = "Error while establishing a connection with
> the thrift server";
> log.error(mesage);
> throw new AndesException(mesage, e);
> }
>
> }
>
> Why we need a start message id here ?
> What about purging the whole internal queue (related to topic) ?
>
> *MessagingEngine.getInstance().purgeMessages(destination, ownerName,
> isTopic, 0);*
>
> Thanks
> --
> *Hasitha Abeykoon*
> Senior Software Engineer; WSO2, Inc.; http://wso2.com
> *cell:* *+94 719363063*
> *blog: **abeykoon.blogspot.com* 
>
>


-- 
*Pamod Sylvester *

*WSO2 Inc.; http://wso2.com *
cell: +94 77 7779495
___
Dev mailing list
Dev@wso2.org
http://wso2.org/cgi-bin/mailman/listinfo/dev


Re: [Dev] Issue in removing messages of non durable topics

2015-07-06 Thread Hasitha Hiranya
@Pamod,

Any idea on this?

Thanks

On Mon, Jul 6, 2015 at 4:30 PM, Hasitha Hiranya  wrote:

> Hi,
>
> Identified an issue regarding message purging when last subscriber for a
> topic is disconnected from broker cluster.
>
> @org.wso2.andes.kernel.OrphanedMessageHandler
>
>
> private void removeMessagesOfDestinationForNode(String destination,
> String ownerName,
> boolean isTopic) throws AndesException {
>
> try {
>
> Long startMessageID = null;
> Long endMessageID = null;
>
> //Will first retrieve the last unassigned slot id
> String nodeID =
> ClusterResourceHolder.getInstance().getClusterManager().getMyNodeID();
> //Will get the storage queue name
> *String storageQueue =
> AndesUtils.getStorageQueueForDestination(destination, nodeID, isTopic);*
> *//We get the relevant slot from the coordinator if running on
> cluster mode*
> *Slot unassignedSlot =
> MessagingEngine.getInstance().getSlotCoordinator().getSlot(storageQueue);*
> *//We need to get the starting message ID to inform the DB to
> start slicing the message from there*
> *//This step would be done in order to ensure that tombstones
> will not be fetched during the querying*
> *//operation*
> *startMessageID = unassignedSlot.getStartMessageId();*
> endMessageID = unassignedSlot.getEndMessageId();
>
> // This is a class used by AndesSubscriptionManager. Andes
> Subscription Manager is behind Disruptor layer.
> // Hence the call should be made to MessagingEngine NOT Andes.
> // Calling Andes methods from here will lead to probable
> deadlocks if Futures are used.
> // NOTE: purge call should be made to MessagingEngine not Andes
> if (0 < endMessageID) {
> //If the slot id is 0, which means for the given storage
> queue there're no unassigned slots which means
> //we don't need to purge messages in this case
> //The purpose of purge operation is to make sure that
> unassigned slots will be removed if no subs exists
> MessagingEngine.getInstance().purgeMessages(destination,
> ownerName, isTopic, startMessageID);
> }
> } catch (ConnectionException e) {
> String mesage = "Error while establishing a connection with
> the thrift server";
> log.error(mesage);
> throw new AndesException(mesage, e);
> }
>
> }
>
> Why we need a start message id here ?
> What about purging the whole internal queue (related to topic) ?
>
> *MessagingEngine.getInstance().purgeMessages(destination, ownerName,
> isTopic, 0);*
>
> Thanks
> --
> *Hasitha Abeykoon*
> Senior Software Engineer; WSO2, Inc.; http://wso2.com
> *cell:* *+94 719363063*
> *blog: **abeykoon.blogspot.com* 
>
>


-- 
*Hasitha Abeykoon*
Senior Software Engineer; WSO2, Inc.; http://wso2.com
*cell:* *+94 719363063*
*blog: **abeykoon.blogspot.com* 
___
Dev mailing list
Dev@wso2.org
http://wso2.org/cgi-bin/mailman/listinfo/dev


[Dev] Issue in removing messages of non durable topics

2015-07-06 Thread Hasitha Hiranya
Hi,

Identified an issue regarding message purging when last subscriber for a
topic is disconnected from broker cluster.

@org.wso2.andes.kernel.OrphanedMessageHandler


private void removeMessagesOfDestinationForNode(String destination,
String ownerName,
boolean isTopic) throws AndesException {

try {

Long startMessageID = null;
Long endMessageID = null;

//Will first retrieve the last unassigned slot id
String nodeID =
ClusterResourceHolder.getInstance().getClusterManager().getMyNodeID();
//Will get the storage queue name
*String storageQueue =
AndesUtils.getStorageQueueForDestination(destination, nodeID, isTopic);*
*//We get the relevant slot from the coordinator if running on
cluster mode*
*Slot unassignedSlot =
MessagingEngine.getInstance().getSlotCoordinator().getSlot(storageQueue);*
*//We need to get the starting message ID to inform the DB to
start slicing the message from there*
*//This step would be done in order to ensure that tombstones
will not be fetched during the querying*
*//operation*
*startMessageID = unassignedSlot.getStartMessageId();*
endMessageID = unassignedSlot.getEndMessageId();

// This is a class used by AndesSubscriptionManager. Andes
Subscription Manager is behind Disruptor layer.
// Hence the call should be made to MessagingEngine NOT Andes.
// Calling Andes methods from here will lead to probable
deadlocks if Futures are used.
// NOTE: purge call should be made to MessagingEngine not Andes
if (0 < endMessageID) {
//If the slot id is 0, which means for the given storage
queue there're no unassigned slots which means
//we don't need to purge messages in this case
//The purpose of purge operation is to make sure that
unassigned slots will be removed if no subs exists
MessagingEngine.getInstance().purgeMessages(destination,
ownerName, isTopic, startMessageID);
}
} catch (ConnectionException e) {
String mesage = "Error while establishing a connection with the
thrift server";
log.error(mesage);
throw new AndesException(mesage, e);
}

}

Why we need a start message id here ?
What about purging the whole internal queue (related to topic) ?

*MessagingEngine.getInstance().purgeMessages(destination, ownerName,
isTopic, 0);*

Thanks
-- 
*Hasitha Abeykoon*
Senior Software Engineer; WSO2, Inc.; http://wso2.com
*cell:* *+94 719363063*
*blog: **abeykoon.blogspot.com* 
___
Dev mailing list
Dev@wso2.org
http://wso2.org/cgi-bin/mailman/listinfo/dev