/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include #include #include #include namespace apache { namespace thrift { using folly::EventBaseManager; using proxygen::HTTPMessage; using std::string; using std::unordered_map; ChannelTestFixture::ChannelTestFixture() { EventBaseManager::get()->setEventBase(eventBase_.get(), true); responseHandler_ = std::make_unique(eventBase_.get()); } void ChannelTestFixture::sendAndReceiveStream( ThriftProcessor* processor, const unordered_map& inputHeaders, const string& inputPayload, string::size_type chunkSize, unordered_map*& outputHeaders, IOBuf*& outputPayload, bool omitEnvelope) { auto channel = std::make_shared( responseHandler_->getTransaction(), processor, worker_); string payload; if (omitEnvelope) { payload = inputPayload; } else { auto envelopeBuf = std::make_unique(); CompactProtocolWriter writer; writer.setOutput(envelopeBuf.get()); writer.writeMessageBegin("dummy", MessageType::T_CALL, 0); string envelope = envelopeBuf->move()->moveToFbString().toStdString(); payload = envelope + inputPayload; } eventBase_->runInEventBaseThread([&]() { auto msg = std::make_unique(); auto& headers = msg->getHeaders(); for (auto it = inputHeaders.begin(); it != inputHeaders.end(); ++it) { headers.rawSet(it->first, it->second); } channel->onH2StreamBegin(std::move(msg)); const char* data = payload.data(); string::size_type len = payload.length(); string::size_type incr = (chunkSize == 0) ? len : chunkSize; for (string::size_type i = 0; i < len; i += incr) { auto iobuf = IOBuf::copyBuffer(data + i, std::min(incr, len - i)); channel->onH2BodyFrame(std::move(iobuf)); } channel->onH2StreamEnd(); }); eventBase_->loop(); // The loop exits when FakeResponseHandler::sendEOM() is called. outputHeaders = responseHandler_->getHeaders(); outputPayload = responseHandler_->getBodyBuf(); } string ChannelTestFixture::toString(IOBuf* buf) { // Clone so we do not destroy the IOBuf - just in case. return buf->clone()->moveToFbString().toStdString(); } } // namespace thrift } // namespace apache