I have a service that automatically transfers data from one database to
another. I'm using Rhino.ETL for the data transfer and RSB with RQ (Rhino
Queues) as the mechanism to keep the ETL running. Conceptually it works like this:
1. Install the app as a service using RSB Hosting. During install, send a
   message to run the ETL process.
2. Start the service, which will consume the message to run the ETL process.
3. Consuming the message will:
   3.1. Run the ETL process.
   3.2. Check for additional transfers that may have arrived.
        3.2.1. If another transfer exists, immediately send a message to
               run the ETL process.
        3.2.2. If no additional transfers are pending, send a message in
               10 minutes to run the ETL process.
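
The loop in steps 3.1 to 3.2.2 can be sketched in C# as follows. This is a minimal sketch, not the actual consumer: `RunEtlProcess`, `IEtlBus`, and `RunEtlProcessConsumer` are hypothetical names, and `IEtlBus` is a local stand-in that only mirrors the shape of the `Send` and `DelaySend` calls on RSB's `IServiceBus`. The ETL run, the pending-transfer check, and the clock are passed in as delegates because their real implementations are not shown here.

```csharp
using System;

// Hypothetical message type; the real message class is not shown in this post.
public class RunEtlProcess { }

// Hypothetical stand-in for the two bus calls the loop relies on.
public interface IEtlBus
{
    void Send(params object[] messages);
    void DelaySend(DateTime time, params object[] messages);
}

public class RunEtlProcessConsumer
{
    private readonly IEtlBus bus;
    private readonly Action runEtl;                  // step 3.1
    private readonly Func<bool> hasPendingTransfers; // step 3.2
    private readonly Func<DateTime> now;             // injectable clock

    public RunEtlProcessConsumer(
        IEtlBus bus,
        Action runEtl,
        Func<bool> hasPendingTransfers,
        Func<DateTime> now)
    {
        this.bus = bus;
        this.runEtl = runEtl;
        this.hasPendingTransfers = hasPendingTransfers;
        this.now = now;
    }

    public void Consume(RunEtlProcess message)
    {
        runEtl();

        if (hasPendingTransfers())
        {
            // Step 3.2.1: more work is waiting, so run again right away.
            bus.Send(new RunEtlProcess());
        }
        else
        {
            // Step 3.2.2: nothing pending, so check again in 10 minutes.
            bus.DelaySend(now().AddMinutes(10), new RunEtlProcess());
        }
    }
}
```

Every successful run ends by sending exactly one follow-up message, which is what keeps the loop alive. If `runEtl` throws, neither branch executes and no follow-up message is sent.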
As part of the ETL operations I implemented my own
ConventionInput/OutputCommands that do not open their own transactions and
instead rely on the System.Transactions transaction opened by RhinoQueues.
I also implemented a message module to verify that the databases are available:
public class VerifyDatabaseAvailability
    : WithLoggingMixin
    , IMessageModule
{
    private IServiceBus theBus;

    public Action TestDatabaseAvailability { get; set; }

    public VerifyDatabaseAvailability()
    {
        // Default check: open and dispose a connection to each database.
        // Throws if either one cannot be reached.
        TestDatabaseAvailability = () =>
        {
            using (Use.Connection("source"))
            using (Use.Connection("destination"))
            {
            }
        };
    }

    public void Init(ITransport transport, IServiceBus bus)
    {
        theBus = bus;
        transport.MessageArrived += CanConnectToDatabase;
    }

    public void Stop(ITransport transport, IServiceBus bus)
    {
        transport.MessageArrived -= CanConnectToDatabase;
    }

    private bool CanConnectToDatabase(CurrentMessageInformation arg)
    {
        try
        {
            TestDatabaseAvailability();
            // false: the module did not consume the message, so normal
            // processing continues.
            return false;
        }
        catch
        {
            // Databases unavailable: re-queue the messages for one hour from now.
            var oneHourFromNow = SystemTime.Now().AddHours(1);
            theBus.DelaySend(oneHourFromNow, arg.AllMessages);
            Info("Databases are not available. Will try again on {0:F}",
                oneHourFromNow);
            // true: the module consumed the message.
            return true;
        }
    }
}
This handles the situations where either database may be offline for
maintenance. I know this is working because I see evidence of it in the
logs when we take the databases offline. However, there are some instances
where a TransactionAbortedException or SqlException ('Timeout expired' or
'A transport-level error') is thrown. When this occurs RSB makes 3 attempts
to consume the message, which is the default behavior. The message cannot
be consumed because of the exception and is moved to the error queue. Once
it is in the error queue, the system stops receiving messages to run the ETL
process. The service is still running, but no messages are being sent.
The TransactionAbortedException is thrown within the message module above:
Rhino.ServiceBus.RhinoQueues.RhinoQueuesTransport [(null)] - Failed to process message
System.Transactions.TransactionAbortedException: The transaction has aborted.
   at System.Transactions.TransactionStatePromotedAborted.CreateAbortingClone(InternalTransaction tx)
   at System.Transactions.DependentTransaction..ctor(IsolationLevel isoLevel, InternalTransaction internalTransaction, Boolean blocking)
   at System.Transactions.Transaction.DependentClone(DependentCloneOption cloneOption)
   at System.Transactions.TransactionScope.SetCurrent(Transaction newCurrent)
   at System.Transactions.TransactionScope.PushScope()
   at System.Transactions.TransactionScope..ctor(TransactionScopeOption scopeOption, TransactionOptions transactionOptions)
   at Rhino.ServiceBus.RhinoQueues.RhinoQueuesTransport.SendInternal(Object[] msgs, Endpoint destination, Action`1 customizeHeaders)
   at Rhino.ServiceBus.RhinoQueues.RhinoQueuesTransport.Send(Endpoint endpoint, DateTime processAgainAt, Object[] msgs)
   at Rhino.ServiceBus.Impl.DefaultServiceBus.DelaySend(DateTime time, Object[] msgs)
   at MyProject.VerifyDatabaseAvailability.CanConnectToDatabase(CurrentMessageInformation arg)
   at Rhino.ServiceBus.Transport.TransportUtil.ProcessSingleMessage(CurrentMessageInformation currentMessageInformation, Func`2 messageRecieved)
   at Rhino.ServiceBus.RhinoQueues.RhinoQueuesTransport.ProcessMessage(Message message, TransactionScope tx, Func`2 messageRecieved, Action`2 messageCompleted, Action`1 beforeTransactionCommit)
The SqlExceptions occur within the ETL process:
Rhino.Etl.Core.Pipelines.SingleThreadedPipelineExecuter [(null)] - Failed to execute operation
System.Data.SqlClient.SqlException (0x80131904): A transport-level error has occurred when receiving results from the server. (provider: TCP Provider, error: 0 - The specified network name is no longer available.)
   at System.Data.SqlClient.SqlConnection.OnError(SqlException exception, Boolean breakConnection)
   at System.Data.SqlClient.SqlInternalConnection.OnError(SqlException exception, Boolean breakConnection)
   at System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning()
   at System.Data.SqlClient.TdsParserStateObject.ReadSniError(TdsParserStateObject stateObj, UInt32 error)
   at System.Data.SqlClient.TdsParserStateObject.ReadSni(DbAsyncResult asyncResult, TdsParserStateObject stateObj)
   at System.Data.SqlClient.TdsParserStateObject.ReadNetworkPacket()
   at System.Data.SqlClient.TdsParserStateObject.ReadBuffer()
   at System.Data.SqlClient.TdsParserStateObject.ReadByte()
   at System.Data.SqlClient.TdsParser.Run(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj)
   at System.Data.SqlClient.SqlCommand.RunExecuteNonQueryTds(String methodName, Boolean async)
   at System.Data.SqlClient.SqlCommand.InternalExecuteNonQuery(DbAsyncResult result, String methodName, Boolean sendToPipe)
   at System.Data.SqlClient.SqlCommand.ExecuteNonQuery()
   at MyProject.DatabaseExtensions.SetArithabortOn(IDbConnection connection)
   at MyProject.MyOutputOperation.<Execute>d__0.MoveNext()
   at Rhino.Etl.Core.Enumerables.SingleRowEventRaisingEnumerator.MoveNext()
   at Rhino.Etl.Core.Enumerables.EventRaisingEnumerator.MoveNext()
   at Rhino.Etl.Core.Enumerables.CachingEnumerable`1.MoveNext()
   at Rhino.Etl.Core.Operations.BranchingOperation.<Execute>d__2.MoveNext()
   at Rhino.Etl.Core.Enumerables.SingleRowEventRaisingEnumerator.MoveNext()
   at Rhino.Etl.Core.Enumerables.EventRaisingEnumerator.MoveNext()
   at Rhino.Etl.Core.Enumerables.CachingEnumerable`1.MoveNext()
   at Rhino.Etl.Core.Operations.BranchingOperation.<Execute>d__2.MoveNext()
   at Rhino.Etl.Core.Enumerables.SingleRowEventRaisingEnumerator.MoveNext()
   at Rhino.Etl.Core.Enumerables.EventRaisingEnumerator.MoveNext()
   at Rhino.Etl.Core.Enumerables.CachingEnumerable`1.MoveNext()
   at Rhino.Etl.Core.Pipelines.AbstractPipelineExecuter.ExecutePipeline(IEnumerable`1 pipeline)
I receive logs that inform me of the errors, and I then run a reboot script
that re-installs the service. It's a brute-force approach, but it gets the
service up and running again.
I would like to add better error handling to account for the
TransactionAbortedException and SqlException and tell RSB to send a message
in 1 hour to try again. Conceptually, I want to keep sending messages at
a later time even after a message fails completely.
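
A rough sketch of the desired behavior, under stated assumptions: `RetryLater`, `RunOrReschedule`, and the `scheduleRetry` delegate are hypothetical names, and `scheduleRetry` stands in for a call such as `bus.DelaySend(retryAt, message)`. `DbException` is used because it is the base class of `SqlException`. The first stack trace above shows `DelaySend` failing while it creates a `TransactionScope` inside the already aborted transaction, so the sketch schedules the retry inside a `TransactionScopeOption.Suppress` scope. Whether that is enough with RhinoQueues is an untested assumption.

```csharp
using System;
using System.Data.Common;
using System.Transactions;

public static class RetryLater
{
    // Runs `work`. If it fails with one of the two exception types from the
    // logs, asks `scheduleRetry` to try again one hour from `now()`.
    // Returns true when the work succeeded, false when a retry was scheduled.
    public static bool RunOrReschedule(
        Action work,
        Action<DateTime> scheduleRetry,
        Func<DateTime> now)
    {
        try
        {
            work();
            return true;
        }
        catch (Exception ex) when (ex is TransactionAbortedException || ex is DbException)
        {
            var retryAt = now().AddHours(1);

            // Assumption: scheduling outside the ambient transaction avoids
            // enlisting the retry message in a transaction that has aborted.
            using (var scope = new TransactionScope(TransactionScopeOption.Suppress))
            {
                scheduleRetry(retryAt);
                scope.Complete();
            }

            return false;
        }
    }
}
```

Any other exception type still propagates to the caller, so in RSB it would go through the normal retry and error-queue handling.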
How can I handle this scenario with RSB? Or should I remove the ESB and
handle the continued running differently?
--
You received this message because you are subscribed to the Google Groups
"Rhino Tools Dev" group.
To view this discussion on the web visit
https://groups.google.com/d/msg/rhino-tools-dev/-/J3vaQKSgLGkJ.