This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 58d5727 Support Pulsar proxy from C++/Python client library (#1124) 58d5727 is described below commit 58d572797ebd780fecb1c4f4478fad1d590f14c3 Author: Matteo Merli <mme...@apache.org> AuthorDate: Thu Feb 1 11:03:56 2018 -0800 Support Pulsar proxy from C++/Python client library (#1124) --- pulsar-client-cpp/lib/BinaryProtoLookupService.cc | 10 ++++++--- pulsar-client-cpp/lib/ClientConnection.cc | 25 +++++++++++++++-------- pulsar-client-cpp/lib/ClientConnection.h | 15 +++++++++----- pulsar-client-cpp/lib/ClientImpl.cc | 6 +++++- pulsar-client-cpp/lib/Commands.cc | 10 ++++++++- pulsar-client-cpp/lib/Commands.h | 3 ++- pulsar-client-cpp/lib/ConnectionPool.cc | 19 +++++++++-------- pulsar-client-cpp/lib/ConnectionPool.h | 19 ++++++++++++++++- pulsar-client-cpp/lib/HTTPLookupService.cc | 1 + pulsar-client-cpp/lib/LookupDataResult.h | 15 +++++++++++--- pulsar-client-cpp/lib/LookupService.h | 4 ++++ pulsar-client-cpp/lib/Url.cc | 6 ++++++ pulsar-client-cpp/lib/Url.h | 2 ++ pulsar-client-cpp/lib/lz4/lz4.h | 5 +++++ pulsar-client-cpp/python/test_producer.py | 4 ++-- 15 files changed, 110 insertions(+), 34 deletions(-) diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc index 6303449..e5b8d42 100644 --- a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc +++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc @@ -53,7 +53,7 @@ Future<Result, LookupDataResultPtr> BinaryProtoLookupService::lookupAsync( } std::string lookupName = dn->toString(); LookupDataResultPromisePtr promise = boost::make_shared<LookupDataResultPromise>(); - Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_); + Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_); future.addListener(boost::bind(&BinaryProtoLookupService::sendTopicLookupRequest, this, lookupName, false, _1, _2, promise)); return promise->getFuture(); @@ -71,7 +71,7 @@ Future<Result, LookupDataResultPtr> BinaryProtoLookupService::getPartitionMetada return promise->getFuture(); } std::string lookupName = dn->toString(); - Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_); + Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_); future.addListener(boost::bind(&BinaryProtoLookupService::sendPartitionMetadataLookupRequest, this, lookupName, _1, _2, promise)); return promise->getFuture(); @@ -100,8 +100,12 @@ void BinaryProtoLookupService::handleLookup(const std::string& destinationName, if (data->isRedirect()) { LOG_DEBUG("Lookup request is for " << destinationName << " redirected to " << data->getBrokerUrl()); + + const std::string& logicalAddress = data->getBrokerUrl(); + const std::string& physicalAddress = + data->shouldProxyThroughServiceUrl() ? serviceUrl_ : logicalAddress; Future<Result, ClientConnectionWeakPtr> future = - cnxPool_.getConnectionAsync(data->getBrokerUrl()); + cnxPool_.getConnectionAsync(logicalAddress, physicalAddress); future.addListener(boost::bind(&BinaryProtoLookupService::sendTopicLookupRequest, this, destinationName, data->isAuthoritative(), _1, _2, promise)); } else { diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc index 7e954cd..8bdd7a2 100644 --- a/pulsar-client-cpp/lib/ClientConnection.cc +++ b/pulsar-client-cpp/lib/ClientConnection.cc @@ -104,7 +104,8 @@ static Result getResult(ServerError serverError) { return ResultUnknownError; } -ClientConnection::ClientConnection(const std::string& endpoint, ExecutorServicePtr executor, +ClientConnection::ClientConnection(const std::string& logicalAddress, const std::string& physicalAddress, + ExecutorServicePtr executor, const ClientConfiguration& clientConfiguration, const AuthenticationPtr& authentication) : state_(Pending), @@ -114,8 +115,9 @@ ClientConnection::ClientConnection(const std::string& endpoint, ExecutorServiceP executor_(executor), resolver_(executor->createTcpResolver()), socket_(executor->createSocket()), - address_(endpoint), - cnxString_("[<none> -> " + endpoint + "] "), + logicalAddress_(logicalAddress), + physicalAddress_(physicalAddress), + cnxString_("[<none> -> " + physicalAddress + "] "), error_(boost::system::error_code()), incomingBuffer_(SharedBuffer::allocate(DefaultBufferSize)), incomingCmd_(), @@ -267,7 +269,11 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err, cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint() << "] "; cnxString_ = cnxStringStream.str(); - LOG_INFO(cnxString_ << "Connected to broker"); + if (logicalAddress_ == physicalAddress_) { + LOG_INFO(cnxString_ << "Connected to broker"); + } else { + LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_); + } state_ = TcpConnected; socket_->set_option(tcp::no_delay(true)); @@ -288,7 +294,7 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err, if (!isTlsAllowInsecureConnection_) { boost::system::error_code err; Url service_url; - if (!Url::parse(address_, service_url)) { + if (!Url::parse(physicalAddress_, service_url)) { LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message()); close(); return; @@ -315,7 +321,8 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err, } void ClientConnection::handleHandshake(const boost::system::error_code& err) { - SharedBuffer buffer = Commands::newConnect(authentication_); + bool connectingThroughProxy = logicalAddress_ != physicalAddress_; + SharedBuffer buffer = Commands::newConnect(authentication_, logicalAddress_, connectingThroughProxy); // Send CONNECT command to broker asyncWrite(buffer.const_asio_buffer(), boost::bind(&ClientConnection::handleSentPulsarConnect, shared_from_this(), @@ -343,7 +350,7 @@ void ClientConnection::handleSentPulsarConnect(const boost::system::error_code& void ClientConnection::tcpConnectAsync() { boost::system::error_code err; Url service_url; - if (!Url::parse(address_, service_url)) { + if (!Url::parse(physicalAddress_, service_url)) { LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message()); close(); return; @@ -788,6 +795,8 @@ void ClientConnection::handleIncomingCommand() { lookupResultPtr->setAuthoritative(lookupTopicResponse.authoritative()); lookupResultPtr->setRedirect(lookupTopicResponse.response() == CommandLookupTopicResponse::Redirect); + lookupResultPtr->setShouldProxyThroughServiceUrl( + lookupTopicResponse.proxy_through_service_url()); lookupDataPromise->setValue(lookupResultPtr); } @@ -1178,7 +1187,7 @@ void ClientConnection::removeConsumer(int consumerId) { consumers_.erase(consumerId); } -const std::string& ClientConnection::brokerAddress() const { return address_; } +const std::string& ClientConnection::brokerAddress() const { return physicalAddress_; } const std::string& ClientConnection::cnxString() const { return cnxString_; } diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h index 4fb4abd..6b0d884 100644 --- a/pulsar-client-cpp/lib/ClientConnection.h +++ b/pulsar-client-cpp/lib/ClientConnection.h @@ -87,12 +87,14 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection> typedef std::vector<ConnectionListener>::iterator ListenerIterator; /* - * endpoint - url of the service, for ex. pulsar://localhost:6650 - * connected - set when tcp connection is established + * logicalAddress - url of the service, for ex. pulsar://localhost:6650 + * physicalAddress - the address to connect to, it could be different from the logical address if proxy + * comes into play connected - set when tcp connection is established * */ - ClientConnection(const std::string& endpoint, ExecutorServicePtr executor, - const ClientConfiguration& clientConfiguration, const AuthenticationPtr& authentication); + ClientConnection(const std::string& logicalAddress, const std::string& physicalAddress, + ExecutorServicePtr executor, const ClientConfiguration& clientConfiguration, + const AuthenticationPtr& authentication); ~ClientConnection(); /* @@ -226,10 +228,13 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection> */ SocketPtr socket_; TlsSocketPtr tlsSocket_; + + const std::string logicalAddress_; + /* * stores address of the service, for ex. pulsar://localhost:6650 */ - const std::string address_; + const std::string physicalAddress_; // Represent both endpoint of the tcp connection. eg: [client:1234 -> server:6650] std::string cnxString_; diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc index a95cd49..b760135 100644 --- a/pulsar-client-cpp/lib/ClientImpl.cc +++ b/pulsar-client-cpp/lib/ClientImpl.cc @@ -258,7 +258,11 @@ void ClientImpl::handleLookup(Result result, LookupDataResultPtr data, Promise<Result, ClientConnectionWeakPtr> promise) { if (data) { LOG_DEBUG("Getting connection to broker: " << data->getBrokerUrl()); - Future<Result, ClientConnectionWeakPtr> future = pool_.getConnectionAsync(data->getBrokerUrl()); + const std::string& logicalAddress = data->getBrokerUrl(); + const std::string& physicalAddress = + data->shouldProxyThroughServiceUrl() ? serviceUrl_ : logicalAddress; + Future<Result, ClientConnectionWeakPtr> future = + pool_.getConnectionAsync(logicalAddress, physicalAddress); future.addListener(boost::bind(&ClientImpl::handleNewConnection, this, _1, _2, promise)); } else { promise.setFailed(result); diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc index 59cf2a2..0c3fdbd 100644 --- a/pulsar-client-cpp/lib/Commands.cc +++ b/pulsar-client-cpp/lib/Commands.cc @@ -22,6 +22,7 @@ #include "pulsar/MessageBuilder.h" #include "LogUtils.h" #include "Utils.h" +#include "Url.h" #include "checksum/ChecksumProvider.h" #include <algorithm> #include <boost/thread/mutex.hpp> @@ -160,13 +161,20 @@ PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, uint return composite; } -SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication) { +SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const std::string& logicalAddress, + bool connectingThroughProxy) { BaseCommand cmd; cmd.set_type(BaseCommand::CONNECT); CommandConnect* connect = cmd.mutable_connect(); connect->set_client_version(_PULSAR_VERSION_); connect->set_auth_method_name(authentication->getAuthMethodName()); connect->set_protocol_version(ProtocolVersion_MAX); + if (connectingThroughProxy) { + Url logicalAddressUrl; + Url::parse(logicalAddress, logicalAddressUrl); + connect->set_proxy_to_broker_url(logicalAddressUrl.hostPort()); + } + AuthenticationDataPtr authDataContent; if (authentication->getAuthData(authDataContent) == ResultOk && authDataContent->hasDataFromCommand()) { connect->set_auth_data(authDataContent->getCommandData()); diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h index 1fd9e48..bda48d2 100644 --- a/pulsar-client-cpp/lib/Commands.h +++ b/pulsar-client-cpp/lib/Commands.h @@ -64,7 +64,8 @@ class Commands { const static uint16_t magicCrc32c = 0x0e01; const static int checksumSize = 4; - static SharedBuffer newConnect(const AuthenticationPtr& authentication); + static SharedBuffer newConnect(const AuthenticationPtr& authentication, const std::string& logicalAddress, + bool connectingThroughProxy); static SharedBuffer newPartitionMetadataRequest(const std::string& topic, uint64_t requestId); diff --git a/pulsar-client-cpp/lib/ConnectionPool.cc b/pulsar-client-cpp/lib/ConnectionPool.cc index 6ee0e9d..36ce4a5 100644 --- a/pulsar-client-cpp/lib/ConnectionPool.cc +++ b/pulsar-client-cpp/lib/ConnectionPool.cc @@ -33,36 +33,37 @@ ConnectionPool::ConnectionPool(const ClientConfiguration& conf, ExecutorServiceP poolConnections_(poolConnections), mutex_() {} -Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(const std::string& endpoint) { +Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync( + const std::string& logicalAddress, const std::string& physicalAddress) { boost::unique_lock<boost::mutex> lock(mutex_); if (poolConnections_) { - PoolMap::iterator cnxIt = pool_.find(endpoint); + PoolMap::iterator cnxIt = pool_.find(logicalAddress); if (cnxIt != pool_.end()) { ClientConnectionPtr cnx = cnxIt->second.lock(); if (cnx && !cnx->isClosed()) { // Found a valid or pending connection in the pool - LOG_DEBUG("Got connection from pool for " << endpoint << " use_count: " // + LOG_DEBUG("Got connection from pool for " << logicalAddress << " use_count: " // << (cnx.use_count() - 1) << " @ " << cnx.get()); return cnx->getConnectFuture(); } else { // Deleting stale connection LOG_INFO("Deleting stale connection from pool for " - << endpoint << " use_count: " << (cnx.use_count() - 1) << " @ " << cnx.get()); - pool_.erase(endpoint); + << logicalAddress << " use_count: " << (cnx.use_count() - 1) << " @ " << cnx.get()); + pool_.erase(logicalAddress); } } } // No valid or pending connection found in the pool, creating a new one - ClientConnectionPtr cnx( - new ClientConnection(endpoint, executorProvider_->get(), clientConfiguration_, authentication_)); + ClientConnectionPtr cnx(new ClientConnection(logicalAddress, physicalAddress, executorProvider_->get(), + clientConfiguration_, authentication_)); - LOG_INFO("Created connection for " << endpoint); + LOG_INFO("Created connection for " << logicalAddress); Future<Result, ClientConnectionWeakPtr> future = cnx->getConnectFuture(); - pool_.insert(std::make_pair(endpoint, cnx)); + pool_.insert(std::make_pair(logicalAddress, cnx)); lock.unlock(); diff --git a/pulsar-client-cpp/lib/ConnectionPool.h b/pulsar-client-cpp/lib/ConnectionPool.h index 8a12890..471454b 100644 --- a/pulsar-client-cpp/lib/ConnectionPool.h +++ b/pulsar-client-cpp/lib/ConnectionPool.h @@ -36,7 +36,24 @@ class ConnectionPool { ConnectionPool(const ClientConfiguration& conf, ExecutorServiceProviderPtr executorProvider, const AuthenticationPtr& authentication, bool poolConnections = true); - Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& endpoint); + /** + * Get a connection from the pool. + * <p> + * The connection can either be created or be coming from the pool itself. + * <p> + * When specifying multiple addresses, the logicalAddress is used as a tag for the broker, + * while the physicalAddress is where the connection is actually happening. + * <p> + * These two addresses can be different when the client is forced to connect through + * a proxy layer. Essentially, the pool is using the logical address as a way to + * decide whether to reuse a particular connection. + * + * @param logicalAddress the address to use as the broker tag + * @param physicalAddress the real address where the TCP connection should be made + * @return a future that will produce the ClientCnx object + */ + Future<Result, ClientConnectionWeakPtr> getConnectionAsync(const std::string& logicalAddress, + const std::string& physicalAddress); private: ClientConfiguration clientConfiguration_; diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc b/pulsar-client-cpp/lib/HTTPLookupService.cc index 816a7e9..1a98f60 100644 --- a/pulsar-client-cpp/lib/HTTPLookupService.cc +++ b/pulsar-client-cpp/lib/HTTPLookupService.cc @@ -217,6 +217,7 @@ LookupDataResultPtr HTTPLookupService::parseLookupData(const std::string &json) LookupDataResultPtr lookupDataResultPtr = boost::make_shared<LookupDataResult>(); lookupDataResultPtr->setBrokerUrl(brokerUrl); lookupDataResultPtr->setBrokerUrlSsl(brokerUrlSsl); + LOG_INFO("parseLookupData = " << *lookupDataResultPtr); return lookupDataResultPtr; } diff --git a/pulsar-client-cpp/lib/LookupDataResult.h b/pulsar-client-cpp/lib/LookupDataResult.h index 698a21f..5c1387a 100644 --- a/pulsar-client-cpp/lib/LookupDataResult.h +++ b/pulsar-client-cpp/lib/LookupDataResult.h @@ -32,8 +32,8 @@ class LookupDataResult { public: void setBrokerUrl(const std::string& brokerUrl) { brokerUrl_ = brokerUrl; } void setBrokerUrlSsl(const std::string& brokerUrlSsl) { brokerUrlSsl_ = brokerUrlSsl; } - std::string getBrokerUrl() { return brokerUrl_; } - std::string getBrokerUrlSsl() { return brokerUrlSsl_; } + const std::string& getBrokerUrl() const { return brokerUrl_; } + const std::string& getBrokerUrlSsl() const { return brokerUrlSsl_; } bool isAuthoritative() const { return authoritative; } @@ -47,6 +47,12 @@ class LookupDataResult { void setRedirect(bool redirect) { this->redirect = redirect; } + bool shouldProxyThroughServiceUrl() const { return proxyThroughServiceUrl_; } + + void setShouldProxyThroughServiceUrl(bool proxyThroughServiceUrl) { + proxyThroughServiceUrl_ = proxyThroughServiceUrl; + } + private: friend inline std::ostream& operator<<(std::ostream& os, const LookupDataResult& b); std::string brokerUrl_; @@ -54,12 +60,15 @@ class LookupDataResult { int partitions; bool authoritative; bool redirect; + + bool proxyThroughServiceUrl_; }; std::ostream& operator<<(std::ostream& os, const LookupDataResult& b) { os << "{ LookupDataResult [brokerUrl_ = " << b.brokerUrl_ << "] [brokerUrlSsl_ = " << b.brokerUrlSsl_ << "] [partitions = " << b.partitions << "] [authoritative = " << b.authoritative - << "] [redirect = " << b.redirect << "]"; + << "] [redirect = " << b.redirect << "] proxyThroughServiceUrl = " << b.proxyThroughServiceUrl_ + << "] }"; return os; } } // namespace pulsar diff --git a/pulsar-client-cpp/lib/LookupService.h b/pulsar-client-cpp/lib/LookupService.h index 4e8dbe6..36ba800 100644 --- a/pulsar-client-cpp/lib/LookupService.h +++ b/pulsar-client-cpp/lib/LookupService.h @@ -41,7 +41,11 @@ class LookupService { * Gets Partition metadata */ virtual Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const DestinationNamePtr& dn) = 0; + + virtual ~LookupService() {} }; + typedef boost::shared_ptr<LookupService> LookupServicePtr; + } // namespace pulsar #endif // PULSAR_CPP_LOOKUPSERVICE_H diff --git a/pulsar-client-cpp/lib/Url.cc b/pulsar-client-cpp/lib/Url.cc index af80ea0..0fb22ba 100644 --- a/pulsar-client-cpp/lib/Url.cc +++ b/pulsar-client-cpp/lib/Url.cc @@ -88,6 +88,12 @@ const std::string& Url::file() const { return file_; } const std::string& Url::parameter() const { return parameter_; } +std::string Url::hostPort() const { + std::stringstream ss; + ss << host_ << ':' << port_; + return ss.str(); +} + std::ostream& operator<<(std::ostream& os, const Url& obj) { os << "Url [Host = " << obj.host() << ", Protocol = " << obj.protocol() << ", Port = " << obj.port() << "]"; diff --git a/pulsar-client-cpp/lib/Url.h b/pulsar-client-cpp/lib/Url.h index aaf03dc..2b413cf 100644 --- a/pulsar-client-cpp/lib/Url.h +++ b/pulsar-client-cpp/lib/Url.h @@ -41,6 +41,8 @@ class Url { const std::string& parameter() const; friend std::ostream& operator<<(std::ostream& os, const Url& obj); + std::string hostPort() const; + private: std::string protocol_; std::string host_; diff --git a/pulsar-client-cpp/lib/lz4/lz4.h b/pulsar-client-cpp/lib/lz4/lz4.h index bff8f97..c68232a 100644 --- a/pulsar-client-cpp/lib/lz4/lz4.h +++ b/pulsar-client-cpp/lib/lz4/lz4.h @@ -197,9 +197,11 @@ int LZ4_decompress_safe_partial(const char* source, char* dest, int compressedSi * note : only allocated directly the structure if you are statically linking LZ4 * If you are using liblz4 as a DLL, please use below construction methods instead. */ +// clang-format off typedef struct { long long table[LZ4_STREAMSIZE_U64]; } LZ4_stream_t; +// clang-format on /* * LZ4_resetStream @@ -254,9 +256,12 @@ int LZ4_saveDict(LZ4_stream_t* streamPtr, char* safeBuffer, int dictSize); #define LZ4_STREAMDECODESIZE_U64 4 #define LZ4_STREAMDECODESIZE (LZ4_STREAMDECODESIZE_U64 * sizeof(unsigned long long)) +// clang-format off typedef struct { unsigned long long table[LZ4_STREAMDECODESIZE_U64]; } LZ4_streamDecode_t; +// clang-format on + /* * LZ4_streamDecode_t * information structure to track an LZ4 stream. diff --git a/pulsar-client-cpp/python/test_producer.py b/pulsar-client-cpp/python/test_producer.py index 24a2c58..8e3c655 100755 --- a/pulsar-client-cpp/python/test_producer.py +++ b/pulsar-client-cpp/python/test_producer.py @@ -30,9 +30,9 @@ producer = client.create_producer( batching_max_publish_delay_ms=10 ) -while True: +for i in range(10): try: - producer.send_async('hello', None) + producer.send('hello', None) except Exception as e: print("Failed to send message: %s", e) -- To stop receiving notification emails like this one, please contact mme...@apache.org.