Author: mriou
Date: Thu Jun 14 11:55:30 2007
New Revision: 547363

URL: http://svn.apache.org/viewvc?view=rev&rev=547363
Log:
Who spat in the soup?! Adjusted it for a proper sodium concentration, better for our heart.
Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Modified: incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java?view=diff&rev=547363&r1=547362&r2=547363 ============================================================================== --- incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java (original) +++ incubator/ode/trunk/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelRuntimeContextImpl.java Thu Jun 14 11:55:30 2007 @@ -113,7 +113,7 @@ private long _maxReductionTimeMs = 2000000; public BpelRuntimeContextImpl(BpelProcess bpelProcess, ProcessInstanceDAO dao, PROCESS PROCESS, - MyRoleMessageExchangeImpl instantiatingMessageExchange) { + MyRoleMessageExchangeImpl instantiatingMessageExchange) { _bpelProcess = bpelProcess; _dao = dao; _iid = dao.getInstanceId(); @@ -121,31 +121,30 @@ _vpu = new JacobVPU(); _vpu.registerExtension(BpelRuntimeContext.class, this); + _soup = new ExecutionQueueImpl(null); + _soup.setReplacementMap(_bpelProcess.getReplacementMap()); + _outstandingRequests = new OutstandingRequestManager(); + _vpu.setContext(_soup); + if (bpelProcess.isInMemory()) { ProcessInstanceDaoImpl inmem = (ProcessInstanceDaoImpl) _dao; if (inmem.getSoup() != null) { _soup = (ExecutionQueueImpl) inmem.getSoup(); + _outstandingRequests = (OutstandingRequestManager) _soup.getGlobalData(); + _vpu.setContext(_soup); } } else { - byte[] daoState = dao.getExecutionState(); - if (daoState != null) { - ByteArrayInputStream iis = new ByteArrayInputStream(daoState); - try { + byte[] daoState = dao.getExecutionState(); + if (daoState != null) { + ByteArrayInputStream iis = new ByteArrayInputStream(daoState); + try { _soup.read(iis); - } catch (Exception ex) { - throw new 
RuntimeException(ex); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + _outstandingRequests = (OutstandingRequestManager) _soup.getGlobalData(); } } - } - - if (_soup == null) { - _soup = new ExecutionQueueImpl(null); - _soup.setReplacementMap(_bpelProcess.getReplacementMap()); - _outstandingRequests = new OutstandingRequestManager(); - } else { - _outstandingRequests = (OutstandingRequestManager) _soup.getGlobalData(); - } - _vpu.setContext(_soup); if (PROCESS != null) { _vpu.inject(PROCESS); @@ -275,7 +274,7 @@ } public void select(PickResponseChannel pickResponseChannel, Date timeout, boolean createInstance, - Selector[] selectors) throws FaultException { + Selector[] selectors) throws FaultException { if (BpelProcess.__log.isTraceEnabled()) BpelProcess.__log.trace(ObjectPrinter.stringifyMethodEnter("select", new Object[] { "pickResponseChannel", pickResponseChannel, "timeout", timeout, "createInstance", createInstance, @@ -518,7 +517,7 @@ } public void reply(final PartnerLinkInstance plinkInstnace, final String opName, final String mexId, Element msg, - QName fault) throws FaultException { + QName fault) throws FaultException { String mexRef = _outstandingRequests.release(plinkInstnace, opName, mexId); if (mexRef == null) { @@ -685,7 +684,7 @@ } public String invoke(PartnerLinkInstance partnerLink, Operation operation, Element outgoingMessage, - InvokeResponseChannel channel) throws FaultException { + InvokeResponseChannel channel) throws FaultException { PartnerLinkDAO plinkDAO = fetchPartnerLinkDAO(partnerLink); // The target (partner endpoint) -- if it has not been explicitly @@ -732,9 +731,9 @@ String partnerSessionId = plinkDAO.getPartnerSessionId(); if ( mySessionId != null ) - mexDao.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID, mySessionId); + mexDao.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID, mySessionId); if ( partnerSessionId != null ) - 
mexDao.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID, partnerSessionId); + mexDao.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID, partnerSessionId); if (__log.isDebugEnabled()) __log.debug("INVOKE PARTNER (SEP): sessionId=" + mySessionId + " partnerSessionId=" + partnerSessionId); @@ -750,7 +749,7 @@ .getInitialMyRoleEPR(partnerLink.partnerLink) : null; PartnerRoleMessageExchangeImpl mex = new PartnerRoleMessageExchangeImpl(_bpelProcess._engine, mexDao, partnerLink.partnerLink.partnerRolePortType, operation, partnerEpr, myRoleEndpoint, _bpelProcess - .getPartnerRoleChannel(partnerLink.partnerLink)); + .getPartnerRoleChannel(partnerLink.partnerLink)); BpelProcess p2pProcess = null; Endpoint partnerEndpoint = _bpelProcess.getInitialPartnerRoleEndpoint(partnerLink.partnerLink); @@ -775,9 +774,9 @@ + partnerSessionId + " - partnerSess " + mySessionId); } if ( partnerSessionId != null ) - myRoleMex.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID, partnerSessionId); + myRoleMex.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID, partnerSessionId); if ( mySessionId != null ) - myRoleMex.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID, mySessionId); + myRoleMex.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID, mySessionId); mex.setStatus(MessageExchange.Status.REQUEST); myRoleMex.invoke(odeRequest); @@ -809,19 +808,19 @@ // Check if there is a synchronous response, if so, we need to inject the // message on the response channel. 
switch (mex.getStatus()) { - case NEW: - throw new AssertionError("Impossible!"); - case ASYNC: - break; - case RESPONSE: - case FAULT: - case FAILURE: - invocationResponse(mex); - break; - default: - __log.error("Partner did not acknowledge message exchange: " + mex); - mex.setFailure(FailureType.NO_RESPONSE, "Partner did not acknowledge.", null); - invocationResponse(mex); + case NEW: + throw new AssertionError("Impossible!"); + case ASYNC: + break; + case RESPONSE: + case FAULT: + case FAILURE: + invocationResponse(mex); + break; + default: + __log.error("Partner did not acknowledge message exchange: " + mex); + mex.setFailure(FailureType.NO_RESPONSE, "Partner did not acknowledge.", null); + invocationResponse(mex); } return mexDao.getMessageExchangeId(); @@ -838,19 +837,19 @@ if (!ProcessState.isFinished(_dao.getState())) { if (__log.isDebugEnabled()) __log.debug("Setting execution state on instance " + _iid); _soup.setGlobalData(_outstandingRequests); - + if (_bpelProcess.isInMemory()) { // don't serialize in-memory processes ((ProcessInstanceDaoImpl) _dao).setSoup(_soup); } else { - ByteArrayOutputStream bos = new ByteArrayOutputStream(10000); - try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(10000); + try { _soup.write(bos); - bos.close(); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - _dao.setExecutionState(bos.toByteArray()); + bos.close(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + _dao.setExecutionState(bos.toByteArray()); } if (ProcessState.canExecute(_dao.getState()) && canReduce) { @@ -1039,7 +1038,7 @@ // received messages for optimization purposes. 
if (__log.isDebugEnabled()) __log.debug("Couldn't extract property '" + property.toString() - + "' in property pre-extraction: " + e.toString()); + + "' in property pre-extraction: " + e.toString()); } } } @@ -1053,19 +1052,19 @@ if (mexDao != null) { MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess._engine, mexDao); switch (mex.getStatus()) { - case ASYNC: - case RESPONSE: - mex.setStatus(MessageExchange.Status.COMPLETED_OK); - break; - case REQUEST: - if (mex.getPattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY)) { + case ASYNC: + case RESPONSE: mex.setStatus(MessageExchange.Status.COMPLETED_OK); break; - } - default: - mex.setFailure(FailureType.OTHER, "No response.", null); - _bpelProcess._engine._contexts.mexContext.onAsyncReply(mex); - mex.release(); + case REQUEST: + if (mex.getPattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY)) { + mex.setStatus(MessageExchange.Status.COMPLETED_OK); + break; + } + default: + mex.setFailure(FailureType.OTHER, "No response.", null); + _bpelProcess._engine._contexts.mexContext.onAsyncReply(mex); + mex.release(); } } } @@ -1124,24 +1123,24 @@ MessageExchange.Status status = MessageExchange.Status.valueOf(dao.getStatus()); switch (status) { - case ASYNC: - case REQUEST: - MessageDAO request = dao.getRequest(); - if (request == null) { - // this also should not happen - String msg = "Engine requested request for message exchange that did not have one: " + mexId; - __log.fatal(msg); - throw new BpelEngineException(msg); - } + case ASYNC: + case REQUEST: + MessageDAO request = dao.getRequest(); + if (request == null) { + // this also should not happen + String msg = "Engine requested request for message exchange that did not have one: " + mexId; + __log.fatal(msg); + throw new BpelEngineException(msg); + } - return request.getData(); + return request.getData(); - default: - // We should not be in any other state when requesting this. 
- String msg = "Engine requested response while the message exchange " + mexId + " was in the state " - + status; - __log.fatal(msg); - throw new BpelEngineException(msg); + default: + // We should not be in any other state when requesting this. + String msg = "Engine requested response while the message exchange " + mexId + " was in the state " + + status; + __log.fatal(msg); + throw new BpelEngineException(msg); } } @@ -1239,7 +1238,7 @@ } public void registerActivityForRecovery(ActivityRecoveryChannel channel, long activityId, String reason, - Date dateTime, Element details, String[] actions, int retries) { + Date dateTime, Element details, String[] actions, int retries) { if (reason == null) reason = "Unspecified"; if (dateTime == null)