/*
 * Copyright (c) Meta Platforms, Inc. and affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

// NOTE(review): in the text as received, the targets of the five #include
// directives below, the template parameter/argument lists on every
// definition, and the template argument of std::unique_ptr are all missing
// (angle-bracketed content appears to have been stripped). They are left
// exactly as received here; restore them from the original header before
// building.
#include
#include
#include
#include
#include

namespace apache {
namespace thrift {
namespace rocket {

// Destructor. If a frame header has been parsed but the frame has not been
// handed off yet (frameLengthAndFieldSize_ is non-zero), give back to the
// owner the amount that drainReadBufQueue() passed to
// owner_.incMemoryUsage() for that frame.
template
FrameLengthParserStrategy::~FrameLengthParserStrategy() {
  if (frameLengthAndFieldSize_) {
    owner_.decMemoryUsage(frameLengthAndFieldSize_);
  }
}

// Supplies the caller with a writable region at the end of readBufQueue_.
//
// bufReturn: out-parameter; receives the start of the writable region.
// lenReturn: out-parameter; receives the number of bytes writable there.
//
// When the queue's current tailroom cannot hold even a frame-length field
// (Serializer::kBytesForFrameOrMetadataLength bytes), new space is obtained
// via preallocate(minBufferSize_, maxBufferSize_); otherwise the existing
// tail is handed out as-is.
template
void FrameLengthParserStrategy::getReadBuffer(
    void** bufReturn, size_t* lenReturn) {
  auto tail = readBufQueue_.tailroom();
  if (tail < Serializer::kBytesForFrameOrMetadataLength) {
    // preallocate() returns a (pointer, length) pair.
    const auto ret = readBufQueue_.preallocate(minBufferSize_, maxBufferSize_);
    *bufReturn = ret.first;
    *lenReturn = ret.second;
  } else {
    *bufReturn = readBufQueue_.writableTail();
    *lenReturn = tail;
  }
}

// Called after `len` bytes have been written into the region previously
// returned by getReadBuffer(). Records the new bytes (incrSize is defined
// elsewhere; presumably it adds `len` to size_), commits them to
// readBufQueue_ with postallocate(), then tries to extract complete frames.
template
void FrameLengthParserStrategy::readDataAvailable(size_t len) {
  incrSize(len);
  readBufQueue_.postallocate(len);
  drainReadBufQueue();
}

// Called when the caller delivers an already-filled buffer chain instead of
// writing into our buffer. Takes ownership of `buf`, accounts for the total
// data length of the chain, appends it to readBufQueue_, then tries to
// extract complete frames.
template
void FrameLengthParserStrategy::readBufferAvailable(
    std::unique_ptr buf) {
  incrSize(buf->computeChainDataLength());
  // The two `true` flags are forwarded to append(); see its declaration for
  // their meaning (presumably pack / allow-tail-reuse on folly::IOBufQueue —
  // confirm).
  readBufQueue_.append(std::move(buf), true, true);
  drainReadBufQueue();
}

// Extracts and dispatches as many complete frames as are currently buffered.
//
// Each iteration:
//   1. If no frame header is pending (frameLength_ == 0), parse one with
//      computeFrameLength() and ask the owner to account for the whole frame
//      (length field + payload) via incMemoryUsage(). If the owner refuses,
//      stop draining.
//   2. If fewer than frameLengthAndFieldSize_ bytes are buffered, return and
//      wait for more data; the parsed lengths stay set for the next call.
//   3. Otherwise drop the length field, split the payload off the queue and
//      pass it to owner_.handleFrame().
//
// `resize` is tested as a boolean below; presumably it is this function's
// own template parameter (parameter list not visible in this text), selecting
// whether tryResize() is invoked after a new header is parsed.
template
template
void FrameLengthParserStrategy::drainReadBufQueue() {
  // Need at least a full length field before anything can be parsed.
  while (size_ >= Serializer::kBytesForFrameOrMetadataLength) {
    if (!frameLength_) {
      computeFrameLength();

      if (UNLIKELY(!owner_.incMemoryUsage(frameLengthAndFieldSize_))) {
        // Nothing was accounted for, so clear the total to keep the
        // destructor / resetFrameLength() from calling decMemoryUsage() with
        // this amount.
        // NOTE(review): frameLength_ is left at the value just computed on
        // this path. A later call would therefore skip the header parse and
        // the incMemoryUsage() call — confirm the owner stops feeding data
        // after refusing.
        frameLengthAndFieldSize_ = 0;
        return;
      }

      if (resize) {
        tryResize();
      }
    }

    // The complete frame (length field + payload) is not buffered yet.
    if (size_ < frameLengthAndFieldSize_) {
      return;
    }

    // skip frame length field
    readBufQueue_.trimStart(Serializer::kBytesForFrameOrMetadataLength);

    // split out frame
    auto frame = readBufQueue_.split(frameLength_);

    // Registered before the hand-off so the bookkeeping is undone when this
    // iteration's scope ends (SCOPE_EXIT is presumably folly's scope guard,
    // which would also run if handleFrame() throws — confirm).
    SCOPE_EXIT {
      // reset the frame length fields
      resetFrameLength();
    };

    // hand frame off
    owner_.handleFrame(std::move(frame));
  }
}

// Parses the frame-length field at the front of readBufQueue_ without
// consuming it from the queue: points cursor_ at the queue's front buffer,
// reads the length with readFrameOrMetadataSize(), and caches both the
// payload length (frameLength_) and the payload length plus the size of the
// length field itself (frameLengthAndFieldSize_).
template
void FrameLengthParserStrategy::computeFrameLength() {
  cursor_.reset(readBufQueue_.front());
  frameLength_ = readFrameOrMetadataSize(cursor_);
  frameLengthAndFieldSize_ =
      frameLength_ + Serializer::kBytesForFrameOrMetadataLength;
}

// Undoes the per-frame bookkeeping once a frame has been handed off: returns
// the accounted amount to the owner, subtracts the frame (length field +
// payload) from size_, and clears both cached lengths so the next loop
// iteration parses a fresh header.
template
void FrameLengthParserStrategy::resetFrameLength() {
  owner_.decMemoryUsage(frameLengthAndFieldSize_);
  size_ -= frameLengthAndFieldSize_;
  frameLength_ = 0;
  frameLengthAndFieldSize_ = 0;
}

// If the queue's current tailroom is smaller than the pending frame's
// payload, asks readBufQueue_ to preallocate more space. The larger of
// frameLengthAndFieldSize_ and maxBufferSize_ is passed as both the second
// and third argument of preallocate(); the returned buffer is not used here
// (presumably the call is made only so that later reads have room — confirm
// against IOBufQueue::preallocate).
template
void FrameLengthParserStrategy::tryResize() {
  if (readBufQueue_.tailroom() < frameLength_) {
    auto max = std::max(frameLengthAndFieldSize_, maxBufferSize_);
    readBufQueue_.preallocate(minBufferSize_, max, max);
  }
}

} // namespace rocket
} // namespace thrift
} // namespace apache