pdxcodemonkey commented on a change in pull request #920: URL: https://github.com/apache/geode-native/pull/920#discussion_r803908544
########## File path: cppcache/integration/test/FunctionExecutionTest.cpp ########## @@ -74,6 +99,9 @@ class TestResultCollector : public ResultCollector { } virtual void addResult(const std::shared_ptr<Cacheable> &result) override { + LOGINFO("Before mutex lock!"); + std::lock_guard<decltype(mutex_)> lock{mutex_}; + LOGINFO("Adding a new result!"); Review comment: Do we really wish to log this at INFO level? Looks more like a DEBUG-level message to me... ########## File path: cppcache/src/ExecutionImpl.hpp ########## @@ -100,29 +114,18 @@ class ExecutionImpl { m_allServer(allServer), m_pool(pool), m_authenticatedView(authenticatedView) {} + protected: + std::shared_ptr<CacheableVector> m_routingObj; Review comment: You've essentially rewritten the class already; it would be a shame not to go ahead and properly rename the member variables as well (e.g. `m_routingObj` -> `routingObj_`, matching the trailing-underscore style used elsewhere in this PR). ########## File path: cppcache/src/TcrMessage.cpp ########## @@ -1184,12 +1184,11 @@ void TcrMessage::handleByteArrayResponse( input.readInt32(); input.advanceCursor(1); // ignore byte - if (!m_functionAttributes) { - m_functionAttributes = std::make_shared<std::vector<int8_t>>(); - } - m_functionAttributes->push_back(input.read()); - m_functionAttributes->push_back(input.read()); - m_functionAttributes->push_back(input.read()); + bool hasResult = input.read() != 0; + bool isHA = input.read() != 0; + bool optimizeForWrite = input.read() != 0; + m_functionAttributes = + FunctionAttributes{isHA, hasResult, optimizeForWrite}; Review comment: Just pausing for a minute to think about how hateful it was to have this encoded as a vector<int8_t> with no indication of meaning whatsoever. This is a huge improvement in readability, thanks! 
########## File path: cppcache/integration/test/FunctionExecutionTest.cpp ########## @@ -74,6 +99,9 @@ class TestResultCollector : public ResultCollector { } virtual void addResult(const std::shared_ptr<Cacheable> &result) override { + LOGINFO("Before mutex lock!"); + std::lock_guard<decltype(mutex_)> lock{mutex_}; + LOGINFO("Adding a new result!"); Review comment: Actually no, we shouldn't be using any of the LOG* macros in integration test code at all. Please remove. ########## File path: cppcache/src/FunctionAttributes.hpp ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#ifndef GEODE_FUNCTIONATTRIBUTES_H_ +#define GEODE_FUNCTIONATTRIBUTES_H_ + +#include <cstdint> + +namespace apache { +namespace geode { +namespace client { + +class FunctionAttributes { + public: + enum : uint8_t { Review comment: Does this enum need to be public? You already have accessors for the individual values, so it seems like something you'd want to hide. 
########## File path: cppcache/src/ExecutionImpl.cpp ########## @@ -434,31 +384,15 @@ void ExecutionImpl::executeOnAllServers(const std::string& func, } } std::shared_ptr<CacheableVector> ExecutionImpl::executeOnPool( - const std::string& func, uint8_t getResult, int32_t retryAttempts, - std::chrono::milliseconds timeout) { + const std::string& func, FunctionAttributes funcAttrs, + int32_t retryAttempts, std::chrono::milliseconds timeout) { ThinClientPoolDM* tcrdm = dynamic_cast<ThinClientPoolDM*>(m_pool.get()); if (tcrdm == nullptr) { throw IllegalArgumentException( "Execute: pool cast to ThinClientPoolDM failed"); } int32_t attempt = 0; - // auto csArray = tcrdm->getServers(); - - // if (csArray != nullptr && csArray->length() != 0) { - // for (int i = 0; i < csArray->length(); i++) - // { - // auto cs = csArray[i]; - // TcrEndpoint *ep = nullptr; - // /* - // std::string endpointStr = - // Utils::convertHostToCanonicalForm(cs->value().c_str() - // ); - // */ - // ep = tcrdm->addEP(cs->value().c_str()); - // } - //} - // if pools retry attempts are not set then retry once on all available Review comment: Thanks for deleting commented code! ########## File path: cppcache/src/TcrEndpoint.cpp ########## @@ -754,9 +754,9 @@ GfErrType TcrEndpoint::sendRequestConn(const TcrMessage& request, // TcrMessage * req = const_cast<TcrMessage *>(&request); LOGDEBUG("TcrEndpoint::sendRequestConn = %p", m_baseDM); if (m_baseDM != nullptr) m_baseDM->beforeSendingRequest(request, conn); - if (((type == TcrMessage::EXECUTE_FUNCTION || - type == TcrMessage::EXECUTE_REGION_FUNCTION) && - (request.hasResult() & 2))) { + if ((type == TcrMessage::EXECUTE_FUNCTION || + type == TcrMessage::EXECUTE_REGION_FUNCTION) && + request.hasResult()) { Review comment: lolwut? I poked around briefly to attempt to understand the '& 2' that was here previously, and haven't found a clue. Was this some magic flag to indicate HA or something? 
########## File path: cppcache/src/FunctionExecution.cpp ########## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "FunctionExecution.hpp" + +#include <geode/ResultCollector.hpp> + +#include "CacheImpl.hpp" +#include "TcrConnectionManager.hpp" +#include "TcrMessage.hpp" +#include "ThinClientPoolDM.hpp" +#include "ThinClientRegion.hpp" + +namespace apache { +namespace geode { +namespace client { + +void FunctionExecution::setParameters( + const std::string& funcName, FunctionAttributes funcAttrs, + std::chrono::milliseconds timeout, std::shared_ptr<Cacheable> args, + TcrEndpoint* ep, ThinClientPoolDM* poolDM, + std::shared_ptr<std::recursive_mutex> resultCollectorMutex, + std::shared_ptr<ResultCollector> resultCollector, + std::shared_ptr<UserAttributes> userAttr) { + exceptionMsg_.clear(); + resultCollectorMutex_ = std::move(resultCollectorMutex); + resultCollector_ = resultCollector; + error_ = GF_NOTCON; + funcName_ = funcName; + funcAttrs_ = funcAttrs; + timeout_ = timeout; + args_ = args; + endpoint_ = ep; + pool_ = poolDM; + userAttrs_ = userAttr; +} + +GfErrType FunctionExecution::execute() { + GuardUserAttributes gua; + + if (userAttrs_) { + 
gua.setAuthenticatedView(userAttrs_->getAuthenticatedView()); + } + + TcrMessageExecuteFunction request( + new DataOutput( + pool_->getConnectionManager().getCacheImpl()->createDataOutput()), + funcName_, args_, funcAttrs_, pool_, timeout_); + TcrMessageReply reply(true, pool_); + + auto resultProcessor = std::unique_ptr<ChunkedFunctionExecutionResponse>( + new ChunkedFunctionExecutionResponse(reply, funcAttrs_.hasResult(), + resultCollector_, + resultCollectorMutex_)); + + reply.setChunkedResultHandler(resultProcessor.get()); + reply.setTimeout(timeout_); + reply.setDM(pool_); + + LOGDEBUG( + "ThinClientPoolDM::sendRequestToAllServer sendRequest on endpoint[%s]!", + endpoint_->name().c_str()); + + error_ = pool_->sendRequestToEP(request, reply, endpoint_); + error_ = pool_->handleEPError(endpoint_, reply, error_); + if (error_ != GF_NOERR) { + if (error_ == GF_NOTCON) { + return GF_NOERR; // if server is unavailable its not an error for + // functionexec OnServers() case + } + LOGDEBUG("FunctionExecution::execute failed on endpoint[%s]!. 
Error = %d ", + endpoint_->name().c_str(), error_); + if (reply.getMessageType() == TcrMessage::EXCEPTION) { + exceptionMsg_ = reply.getException(); + } + + return error_; + } else if (reply.getMessageType() == TcrMessage::EXCEPTION || + reply.getMessageType() == TcrMessage::EXECUTE_FUNCTION_ERROR) { + error_ = ThinClientRegion::handleServerException("Execute", + reply.getException()); + exceptionMsg_ = reply.getException(); + } + + return error_; +} + +OnRegionFunctionExecution::OnRegionFunctionExecution( + std::string funcName, const Region* region, std::shared_ptr<Cacheable> args, + std::shared_ptr<CacheableHashSet> routingObj, FunctionAttributes funcAttrs, + std::chrono::milliseconds timeout, ThinClientPoolDM* poolDM, + const std::shared_ptr<std::recursive_mutex>& collectorMutex, + std::shared_ptr<ResultCollector> collector, + std::shared_ptr<UserAttributes> userAttrs, bool isBGThread, + const std::shared_ptr<BucketServerLocation>& serverLocation, + bool allBuckets) + : serverLocation_{serverLocation}, + backgroundThread_{isBGThread}, + pool_{poolDM}, + funcName_{funcName}, + funcAttrs_{funcAttrs}, + timeout_{timeout}, + args_{args}, + routingObj_{routingObj}, + resultCollector_{std::move(collector)}, + resultCollectorMutex_{collectorMutex}, + userAttrs_{userAttrs}, + region_{region}, + allBuckets_{allBuckets} { + request_ = new TcrMessageExecuteRegionFunctionSingleHop( Review comment: Eek! new/delete? How about std::unique_ptr instead? 
########## File path: cppcache/src/TcrEndpoint.cpp ########## @@ -754,9 +754,9 @@ GfErrType TcrEndpoint::sendRequestConn(const TcrMessage& request, // TcrMessage * req = const_cast<TcrMessage *>(&request); LOGDEBUG("TcrEndpoint::sendRequestConn = %p", m_baseDM); if (m_baseDM != nullptr) m_baseDM->beforeSendingRequest(request, conn); - if (((type == TcrMessage::EXECUTE_FUNCTION || - type == TcrMessage::EXECUTE_REGION_FUNCTION) && - (request.hasResult() & 2))) { + if ((type == TcrMessage::EXECUTE_FUNCTION || + type == TcrMessage::EXECUTE_REGION_FUNCTION) && + request.hasResult()) { Review comment: Okay, I see the type changed from int8_t to bool, that explains why the test against 2 had to go away. Is this some byte in a protocol message, where 2 is a bitflag indicating a result? IIRC, the chunk flag in chunked messages does something very similar to indicate "this is the last chunk". Such a strange thing... ########## File path: cppcache/src/ThinClientRegion.cpp ########## @@ -2852,22 +2853,25 @@ void ThinClientRegion::executeFunction( if (reExecuteForServ) { msg = new TcrMessageExecuteRegionFunction( new DataOutput(m_cacheImpl->createDataOutput()), func, this, args, - routingObj, getResult, failedNodes, timeout, m_tcrdm.get(), + routingObj, funcAttrs, failedNodes, timeout, m_tcrdm.get(), static_cast<int8_t>(1)); } else { msg = new TcrMessageExecuteRegionFunction( new DataOutput(m_cacheImpl->createDataOutput()), func, this, args, - routingObj, getResult, failedNodes, timeout, m_tcrdm.get(), + routingObj, funcAttrs, failedNodes, timeout, m_tcrdm.get(), static_cast<int8_t>(0)); } TcrMessageReply reply(true, m_tcrdm.get()); // need to check ChunkedFunctionExecutionResponse* resultCollector( - new ChunkedFunctionExecutionResponse(reply, (getResult & 2) == 2, rc)); + new ChunkedFunctionExecutionResponse(reply, funcAttrs.hasResult(), rc)); reply.setChunkedResultHandler(resultCollector); reply.setTimeout(timeout); GfErrType err = GF_NOERR; - err = 
m_tcrdm->sendSyncRequest(*msg, reply, !(getResult & 1)); + + // Function failover logic is not handled in the network layer. That's why + // attemptFailover should be always be false when calling sendSyncRequest + err = m_tcrdm->sendSyncRequest(*msg, reply, false); Review comment: If the comment here is true, why do we still need the attemptFailover parameter? ########## File path: cppcache/src/TcrMessage.cpp ########## @@ -1184,12 +1184,11 @@ void TcrMessage::handleByteArrayResponse( input.readInt32(); input.advanceCursor(1); // ignore byte - if (!m_functionAttributes) { - m_functionAttributes = std::make_shared<std::vector<int8_t>>(); - } - m_functionAttributes->push_back(input.read()); - m_functionAttributes->push_back(input.read()); - m_functionAttributes->push_back(input.read()); + bool hasResult = input.read() != 0; + bool isHA = input.read() != 0; + bool optimizeForWrite = input.read() != 0; + m_functionAttributes = + FunctionAttributes{isHA, hasResult, optimizeForWrite}; Review comment: hasResult is now set to whether the byte read is nonzero, which is not the same as the old bit test `(byte & 2) != 0`. Is this the correct test? 
########## File path: cppcache/integration/test/FunctionExecutionTest.cpp ########## @@ -276,31 +318,164 @@ TEST(FunctionExecutionTest, FunctionExecutionWithIncompleteBucketLocations) { cache.close(); } -std::shared_ptr<CacheableVector> populateRegionReturnFilter( - const std::shared_ptr<Region> ®ion, const int numberOfPuts) { - auto routingObj = CacheableVector::create(); - for (int i = 0; i < numberOfPuts; i++) { - region->put("KEY--" + std::to_string(i), "VALUE--" + std::to_string(i)); - routingObj->push_back(CacheableKey::create("KEY--" + std::to_string(i))); - } - return routingObj; +TEST(FunctionExecutionTest, shNonHAFunctionExecServerTimeout) { + Cluster cluster{LocatorCount{1}, ServerCount{3}}; + + cluster.start([&]() { + cluster.getGfsh() + .deploy() + .jar(getFrameworkString(FrameworkVariable::JavaObjectJarPath)) + .execute(); + }); + + cluster.getGfsh() + .create() + .region() + .withName("partition_region") + .withType("PARTITION") + .withRedundantCopies("1") + .execute(); + + auto cache = CacheFactory().set("log-level", "none").create(); + auto poolFactory = cache.getPoolManager().createFactory(); + + cluster.applyLocators(poolFactory); + + auto pool = + poolFactory.setPRSingleHopEnabled(true).setRetryAttempts(2).create( + "pool"); + + auto region = cache.createRegionFactory(RegionShortcut::PROXY) + .setPoolName("pool") + .create("partition_region"); + + // Populate region in a way that not all buckets are created. + // Servers in this case will create 88 of possible 113 buckets. + populateRegionAllBuckets(region); + + // Check that PR metadata is updated. This is done to be sure + // that client will execute function in a non single hop manner + // because metadata doesn't contain all bucket locations. + // After metadata is refreshed, it will contain at least one + // bucket location. 
+ CacheImpl *cacheImpl = CacheRegionHelper::getCacheImpl(&cache); + waitUntilPRMetadataIsRefreshed(cacheImpl); + + auto functionService = FunctionService::onRegion(region); + auto resultCollector = std::make_shared<TestResultCollector>(); + + resultCollector->lock(); + + auto runner = + std::async(std::launch::async, [&functionService, resultCollector]() { + functionService.withCollector(resultCollector) + .execute("MultiGetAllFunctionTimeoutNonHA"); + }); + + std::this_thread::sleep_for(std::chrono::seconds{25}); Review comment: Please wait on a synchronization object, rather than introduce sleep_for calls into test code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@geode.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org