/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #define __STDC_FORMAT_MACROS #include #include #include #include #include #include #include #include #include #include #include using namespace apache::thrift::test; using apache::thrift::loadgen::ScoreBoard; namespace apache { namespace thrift { typedef std::shared_ptr LoadTestClientPtr; const int kTimeout = 60000; const int MAX_LOOPS = 0; class OpData { public: OpData() : opType_(0), a(0), b(0), code(0) {} uint32_t opType_; int64_t a; int64_t b; int32_t code; }; class LoadCallback; class LoopTerminator { public: explicit LoopTerminator(folly::EventBase* base) : ops_(0), base_(base) {} void incr() { ops_++; } void decr() { ops_--; if (ops_ == 0) { base_->terminateLoopSoon(); } } private: uint32_t ops_; folly::EventBase* base_; }; /* This class drives the async load for a single connection */ class AsyncRunner2 { public: friend class LoadCallback; AsyncRunner2( const std::shared_ptr& config, LoopTerminator& terminator, const std::shared_ptr& scoreboard, const LoadTestClientPtr& client, int n_ops, int n_async, AsyncIntervalTimer* atimer) : terminator_(terminator), config_(config), scoreboard_(scoreboard), client_(client), ctr_(n_ops), n_outstanding_(0), n_async_(n_async), asyncTimer_(atimer) {} ~AsyncRunner2() {} void startRun() { for (int i = 0; i < n_async_ * 2; i++) { terminator_.incr(); performAsyncOperation(); } } LoopTerminator& terminator_; private: void genericCob( LoadTestAsyncClient* client, ClientReceiveState&& rstate, OpData* opData); void performAsyncOperation(); const std::shared_ptr& config_; const std::shared_ptr& scoreboard_; std::shared_ptr client_; int ctr_; int n_outstanding_; int n_async_; AsyncIntervalTimer* asyncTimer_; }; class LoadCallback : public RequestCallback { public: LoadCallback(AsyncRunner2* r, LoadTestAsyncClient* client) : r_(r), client_(client), oneway_(false) {} void setOneway() { oneway_ = true; } void requestSent() override { if (oneway_) { r_->genericCob(client_, ClientReceiveState(), &data_); } } void replyReceived(ClientReceiveState&& rstate) override { r_->genericCob(client_, std::move(rstate), &data_); } void requestError(ClientReceiveState&&) override { r_->terminator_.decr(); r_->scoreboard_->opFailed(data_.opType_); } OpData data_; private: AsyncRunner2* r_; LoadTestAsyncClient* client_; bool oneway_; }; LoadTestClientPtr AsyncClientWorker2::createConnection() { const std::shared_ptr& config = getConfig(); folly::AsyncSocket::UniquePtr socket; if (config->useSSL()) { auto sslSocket = folly::AsyncSSLSocket::newSocket(sslContext_, &eb_); if (session_) { sslSocket->setSSLSession(session_); } sslSocket->connect(nullptr, *config->getAddress(), kTimeout); // Loop until connection is established and TLS handshake completes. // Unlike a regular AsyncSocket which is usable even before TCP handshke // completes, an SSL socket reports !good() until TLS handshake completes. eb_.loop(); auto session = sslSocket->getSSLSession(); if (config->useTickets() && session) { session_ = session; } socket = std::move(sslSocket); } else { socket = folly::AsyncSocket::newSocket(&eb_, *config->getAddress(), kTimeout); } HeaderClientChannel::Options options; if (!config->useHeaderProtocol()) { options.setClientType(THRIFT_FRAMED_DEPRECATED); } auto channel = HeaderClientChannel::newChannel(std::move(socket), std::move(options)); channel->setTimeout(kTimeout); // For testing equality, make sure to use binary if (config->zlib()) { apache::thrift::CompressionConfig compressionConfig; compressionConfig.codecConfig_ref().ensure().set_zlibConfig(); channel->setDesiredCompressionConfig(compressionConfig); } return std::make_shared(std::move(channel)); } void AsyncClientWorker2::run() { int loopCount = 0; int maxLoops = MAX_LOOPS; std::list clients; std::list::iterator it; LoopTerminator loopTerminator(&eb_); do { // Create a new connection int n_clients = getConfig()->getAsyncClients(); // Determine how many operations to perform on this connection uint32_t nops = getConfig()->pickOpsPerConnection(); // How many outstanding ops at a time int n_async = getConfig()->getAsyncOpsPerClient(); asyncTimer_.setRatePerSec( getConfig()->getDesiredQPS(), getConfig()->getNumWorkerThreads()); for (int i = 0; i < n_clients; i++) { std::shared_ptr client; try { client = createConnection(); AsyncRunner2* r = new AsyncRunner2( getConfig(), loopTerminator, getScoreBoard(), client, nops, n_async, &asyncTimer_); clients.push_back(r); } catch (const std::exception& ex) { ErrorAction action = handleConnError(ex); if (action == EA_CONTINUE || action == EA_NEXT_CONNECTION) { // continue the next connection loop continue; } else if (action == EA_DROP_THREAD) { T_ERROR("worker %d exiting after connection error", getID()); stopWorker(); return; } else if (action == EA_ABORT) { T_ERROR("worker %d causing abort after connection error", getID()); abort(); } else { T_ERROR( "worker %d received unknown conn error action %d; aborting", getID(), action); abort(); } } } asyncTimer_.start(); for (auto r : clients) { r->startRun(); } eb_.loopForever(); for (auto r : clients) { delete r; } clients.clear(); } while (maxLoops == 0 || ++loopCount < MAX_LOOPS); stopWorker(); } void AsyncClientWorker2::setupSSLContext() { if (getConfig()->cert().empty() ^ getConfig()->key().empty()) { throw std::runtime_error("Must supply both key and cert or none"); } // set client certs if specified. if (!getConfig()->cert().empty()) { sslContext_->loadCertificate(getConfig()->cert().c_str(), "PEM"); } if (!getConfig()->key().empty()) { sslContext_->loadPrivateKey(getConfig()->key().c_str(), "PEM"); } } void AsyncRunner2::performAsyncOperation() { LoadCallback* cb = new LoadCallback(this, client_.get()); std::unique_ptr recvCob(cb); if (!asyncTimer_->sleep()) { T_ERROR("can't keep up with requested QPS rate"); } uint32_t opType = config_->pickOpType(); scoreboard_->opStarted(opType); OpData* d = &cb->data_; d->opType_ = opType; n_outstanding_++; ctr_--; switch (static_cast( opType)) { case apache::thrift::test::ClientLoadConfig::OP_NOOP: return client_->noop(std::move(recvCob)); case apache::thrift::test::ClientLoadConfig::OP_ONEWAY_NOOP: cb->setOneway(); return client_->onewayNoop(std::move(recvCob)); case apache::thrift::test::ClientLoadConfig::OP_ASYNC_NOOP: return client_->asyncNoop(std::move(recvCob)); case apache::thrift::test::ClientLoadConfig::OP_SLEEP: return client_->sleep(std::move(recvCob), config_->pickSleepUsec()); case apache::thrift::test::ClientLoadConfig::OP_ONEWAY_SLEEP: cb->setOneway(); return client_->onewaySleep(std::move(recvCob), config_->pickSleepUsec()); case apache::thrift::test::ClientLoadConfig::OP_BURN: return client_->burn(std::move(recvCob), config_->pickBurnUsec()); case apache::thrift::test::ClientLoadConfig::OP_ONEWAY_BURN: cb->setOneway(); return client_->onewayBurn(std::move(recvCob), config_->pickBurnUsec()); case apache::thrift::test::ClientLoadConfig::OP_BAD_SLEEP: return client_->badSleep(std::move(recvCob), config_->pickSleepUsec()); case apache::thrift::test::ClientLoadConfig::OP_BAD_BURN: return client_->badBurn(std::move(recvCob), config_->pickBurnUsec()); case apache::thrift::test::ClientLoadConfig::OP_THROW_ERROR: d->code = loadgen::RNG::getU32(); return client_->throwError(std::move(recvCob), d->code); case apache::thrift::test::ClientLoadConfig::OP_THROW_UNEXPECTED: return client_->throwUnexpected( std::move(recvCob), loadgen::RNG::getU32()); case apache::thrift::test::ClientLoadConfig::OP_ONEWAY_THROW: cb->setOneway(); return client_->onewayThrow(std::move(recvCob), loadgen::RNG::getU32()); case apache::thrift::test::ClientLoadConfig::OP_SEND: { std::string str(config_->pickSendSize(), 'a'); return client_->send(std::move(recvCob), str); } case apache::thrift::test::ClientLoadConfig::OP_ONEWAY_SEND: { std::string str(config_->pickSendSize(), 'a'); cb->setOneway(); return client_->onewaySend(std::move(recvCob), str); } case apache::thrift::test::ClientLoadConfig::OP_RECV: return client_->recv(std::move(recvCob), config_->pickRecvSize()); case apache::thrift::test::ClientLoadConfig::OP_SENDRECV: { std::string str(config_->pickSendSize(), 'a'); return client_->sendrecv( std::move(recvCob), str, config_->pickRecvSize()); } case apache::thrift::test::ClientLoadConfig::OP_ECHO: { std::string str(config_->pickSendSize(), 'a'); return client_->echo(std::move(recvCob), str); } case apache::thrift::test::ClientLoadConfig::OP_ADD: { boost::uniform_int distribution; d->a = distribution(loadgen::RNG::getRNG()); d->b = distribution(loadgen::RNG::getRNG()); return client_->add(std::move(recvCob), d->a, d->b); } case apache::thrift::test::ClientLoadConfig::OP_LARGE_CONTAINER: { std::vector items; config_->makeBigContainer(items); return client_->largeContainer(std::move(recvCob), items); } case apache::thrift::test::ClientLoadConfig::OP_ITER_ALL_FIELDS: { std::vector items; config_->makeBigContainer(items); return client_->iterAllFields(std::move(recvCob), items); } case apache::thrift::test::ClientLoadConfig::NUM_OPS: // fall through break; // no default case, so gcc will warn us if a new op is added // and this switch statement is not updated } T_ERROR( "AsyncClientWorker2::performOperation() unknown operation %" PRIu32, opType); assert(false); } void AsyncRunner2::genericCob( LoadTestAsyncClient* client, ClientReceiveState&& rstate, OpData* opData) { int64_t int64_result; std::string string_result; std::vector container_result; n_outstanding_--; try { switch (static_cast( opData->opType_)) { case apache::thrift::test::ClientLoadConfig::OP_NOOP: client->recv_noop(rstate); break; case apache::thrift::test::ClientLoadConfig::OP_ONEWAY_NOOP: break; case apache::thrift::test::ClientLoadConfig::OP_ASYNC_NOOP: client->recv_asyncNoop(rstate); break; case apache::thrift::test::ClientLoadConfig::OP_SLEEP: client->recv_sleep(rstate); break; case apache::thrift::test::ClientLoadConfig::OP_ONEWAY_SLEEP: break; case apache::thrift::test::ClientLoadConfig::OP_BURN: client->recv_burn(rstate); break; case apache::thrift::test::ClientLoadConfig::OP_ONEWAY_BURN: break; case apache::thrift::test::ClientLoadConfig::OP_BAD_SLEEP: client->recv_badSleep(rstate); break; case apache::thrift::test::ClientLoadConfig::OP_BAD_BURN: client->recv_badBurn(rstate); break; case apache::thrift::test::ClientLoadConfig::OP_THROW_ERROR: try { client->recv_throwError(rstate); T_ERROR("throwError() didn't throw any exception"); } catch (const LoadError& error) { DCHECK_EQ(*error.code_ref(), opData->code); } break; case apache::thrift::test::ClientLoadConfig::OP_THROW_UNEXPECTED: try { client->recv_throwUnexpected(rstate); } catch (const TApplicationException&) { // expected; do nothing } break; case apache::thrift::test::ClientLoadConfig::OP_ONEWAY_THROW: break; case apache::thrift::test::ClientLoadConfig::OP_SEND: client->recv_send(rstate); break; case apache::thrift::test::ClientLoadConfig::OP_ONEWAY_SEND: break; case apache::thrift::test::ClientLoadConfig::OP_RECV: client->recv_recv(string_result, rstate); break; case apache::thrift::test::ClientLoadConfig::OP_SENDRECV: client->recv_sendrecv(string_result, rstate); break; case apache::thrift::test::ClientLoadConfig::OP_ECHO: client->recv_echo(string_result, rstate); break; case apache::thrift::test::ClientLoadConfig::OP_ADD: int64_result = client->recv_add(rstate); if (int64_result != opData->a + opData->b) { T_ERROR( "add(%" PRId64 ", %" PRId64 " gave wrong result %" PRId64 "(expected %" PRId64 ")", opData->a, opData->b, int64_result, opData->a + opData->b); } break; case apache::thrift::test::ClientLoadConfig::OP_LARGE_CONTAINER: { client->recv_largeContainer(rstate); break; } case apache::thrift::test::ClientLoadConfig::OP_ITER_ALL_FIELDS: { client->recv_iterAllFields(container_result, rstate); break; } case apache::thrift::test::ClientLoadConfig::NUM_OPS: break; // no default case, so gcc will warn us if a new op is added // and this switch statement is not updated } } catch (const std::exception& ex) { terminator_.decr(); T_ERROR("Unexpected exception: %s", ex.what()); scoreboard_->opFailed(opData->opType_); // don't launch anymore return; } scoreboard_->opSucceeded(opData->opType_); if (ctr_ > 0) { // launch more ops, attempt to keep between 1x and 2x n_async_ ops // outstanding if (n_outstanding_ <= n_async_) { for (int i = 0; i < n_async_; i++) { terminator_.incr(); performAsyncOperation(); } } } terminator_.decr(); } } // namespace thrift } // namespace apache