/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include #include #include #include #include #include #include namespace yarpl { namespace flowable { class ThriftStreamShim; } // namespace flowable } // namespace yarpl namespace apache { namespace thrift { namespace detail::test { class TestStreamProducerCallbackService; } template class ServerStreamMultiPublisher; template class ServerStream { public: using RichPayloadToSend = apache::thrift::detail::RichPayloadToSend; using UnorderedHeader = apache::thrift::detail::UnorderedHeader; using OrderedHeader = apache::thrift::detail::OrderedHeader; using MessageVariant = apache::thrift::detail::MessageVariant; #if FOLLY_HAS_COROUTINES /* implicit */ ServerStream(folly::coro::AsyncGenerator&& gen) : fn_(apache::thrift::detail::ServerGeneratorStream:: fromAsyncGenerator(std::move(gen))) {} /* implicit */ ServerStream( folly::coro::AsyncGenerator&& gen) : fn_(apache::thrift::detail::ServerGeneratorStream:: fromAsyncGenerator(std::move(gen))) {} using promise_type = folly::coro::detail::AsyncGeneratorPromise; #endif // Completion callback is optional // It may destroy the ServerStreamPublisher object inline // It must not call complete() on the publisher object inline static std::pair, ServerStreamPublisher> createPublisher( folly::Function onStreamCompleteOrCancel) { return createPublisherImpl(std::move(onStreamCompleteOrCancel)); } static std::pair, ServerStreamPublisher> createPublisherWithHeader(folly::Function onStreamCompleteOrCancel) { return createPublisherImpl(std::move(onStreamCompleteOrCancel)); } static std::pair, ServerStreamPublisher> createPublisher() { return createPublisher([] {}); } static ServerStream createEmpty() { auto pair = createPublisher(); std::move(pair.second).complete(); return std::move(pair.first); } [[deprecated( "Use ScopedServerInterfaceThread instead of invoking handler methods " "directly. This approach changes the threading model and can hide race " "conditions in production code.")]] // ClientBufferedStream toClientStreamUnsafeDoNotUse( folly::EventBase* evb = folly::getUnsafeMutableGlobalEventBase(), int32_t bufferSize = 100) &&; apache::thrift::detail::ServerStreamFactory operator()( folly::Executor::KeepAlive<> serverExecutor, apache::thrift::detail::StreamElementEncoder* encode) { return fn_(std::move(serverExecutor), encode); } private: explicit ServerStream(apache::thrift::detail::ServerStreamFn fn) : fn_(std::move(fn)) {} template static std::pair, ServerStreamPublisher> createPublisherImpl(folly::Function onStreamCompleteOrCancel) { auto pair = apache::thrift::detail::ServerPublisherStream::create( std::move(onStreamCompleteOrCancel)); return std:: make_pair, ServerStreamPublisher>( ServerStream(std::move(pair.first)), std::move(pair.second)); } apache::thrift::detail::ServerStreamFn fn_; friend class yarpl::flowable::ThriftStreamShim; friend class ServerStreamMultiPublisher; friend class ServerStreamMultiPublisher; friend class detail::test::TestStreamProducerCallbackService; }; template struct ResponseAndServerStream { using ResponseType = Response; using StreamElementType = StreamElement; Response response; ServerStream stream; }; struct ResponseAndServerStreamFactory { apache::thrift::SerializedResponse response; apache::thrift::detail::ServerStreamFactory stream; }; } // namespace thrift } // namespace apache #include