/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef THRIFT_ASYNC_TUNFRAMEDASYNCCHANNEL_TCC_ #define THRIFT_ASYNC_TUNFRAMEDASYNCCHANNEL_TCC_ 1 #include #include namespace { const uint32_t kInitialBufferSize = 4096; } namespace apache { namespace thrift { namespace async { namespace detail { template TUnframedACReadState::TUnframedACReadState() : maxMessageSize_(0x7fffffff), memBuffer_(kInitialBufferSize), callbackBuffer_(nullptr), protocolTraits_() {} template TUnframedACReadState::~TUnframedACReadState() {} template void TUnframedACReadState::getReadBuffer( void** bufReturn, size_t* lenReturn) { uint32_t bytesAvailable = memBuffer_.available_write(); if (bytesAvailable > 0) { // If there is room available in the buffer, just return it. *lenReturn = bytesAvailable; *bufReturn = memBuffer_.getWritePtr(bytesAvailable); return; } uint32_t bufferSize = memBuffer_.getBufferSize(); uint32_t available_read = memBuffer_.available_read(); // we get this much without growing the buffer capacity uint32_t additionalSpace = bufferSize - available_read; if (additionalSpace == 0) { // We need more room. memBuffer_ will at least double it's capacity when // asked for even a single byte. additionalSpace = kInitialBufferSize; } // Don't allow more than maxMessageSize_. // Be careful not to over- or underflow uint32_t when checking. // // readDataAvailable() fails the read when we've already read maxMessageSize_ // bytes, so available_read should always be less than maxMessageSize_ here. // (Unless maxMessageSize_ is 0, but that's a programmer bug.) assert(available_read < maxMessageSize_); if (available_read > maxMessageSize_ - additionalSpace) { // Don't ask for more than maxMessageSize_ total (but we might get more) additionalSpace = maxMessageSize_ - available_read; } try { uint8_t* newBuffer = memBuffer_.getWritePtr(additionalSpace); *lenReturn = memBuffer_.available_write(); *bufReturn = newBuffer; } catch (std::exception& ex) { T_ERROR( "TUnframedAsyncChannel: failed to allocate larger read buffer: %s", ex.what()); *lenReturn = 0; *bufReturn = nullptr; } } template bool TUnframedACReadState::readDataAvailable(size_t len) { assert(memBuffer_.available_read() + len <= memBuffer_.getBufferSize()); memBuffer_.wroteBytes(len); uint32_t messageLength = 0; uint32_t bytesRead = memBuffer_.available_read(); uint8_t* buffer = (uint8_t*)memBuffer_.borrow(nullptr, &bytesRead); if (!protocolTraits_.getMessageLength(buffer, bytesRead, &messageLength)) { // We're not at the end of the message yet. // // If we've hit maxMessageSize_ already, fail now instead of waiting until // getReadBuffer() is called again. if (bytesRead >= maxMessageSize_) { throw transport::TTransportException( transport::TTransportException::CORRUPTED_DATA, "TUnframedAsyncChannel: max message size exceeded"); } return false; } // We've read a full message. // Swap the data into the callback's buffer. // Note that we may have actually read more than one message, // so we have to make sure to save any remaining data after the end of the // message. assert(messageLength <= bytesRead); callbackBuffer_->link(&memBuffer_, messageLength); memBuffer_.consume(messageLength); // We've put a new message in callbackBuffer_ return true; } } // namespace detail } // namespace async } // namespace thrift } // namespace apache #endif // THRIFT_ASYNC_TUNFRAMEDASYNCCHANNEL_TCC_