/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include #include #include #include #include #include #include #include #include #include #include #include #if FOLLY_HAS_COROUTINES namespace thrift { namespace py3 { template class ClientBufferedStreamWrapper { public: ClientBufferedStreamWrapper() = default; explicit ClientBufferedStreamWrapper( apache::thrift::ClientBufferedStream& s) : gen_{std::move(s).toAsyncGenerator()} {} folly::coro::Task> getNext() { auto res = co_await gen_.getNext(); if (res.has_value()) { co_return std::move(res.value()); } co_return folly::none; } private: folly::python::AsyncGeneratorWrapper gen_; }; template apache::thrift::ResponseAndServerStream createResponseAndServerStream( Response response, apache::thrift::ServerStream stream) { return {std::move(response), std::move(stream)}; } void cancelPythonIterator(PyObject*); inline folly::Function pythonFuncToCppFunc(PyObject* func) { Py_INCREF(func); return [func = std::move(func)] { if (func != Py_None) { PyObject_CallObject(func, nullptr); } Py_DECREF(func); }; } template apache::thrift::ServerStream createAsyncIteratorFromPyIterator( PyObject* iter, folly::Executor* executor, folly::Function>)> genNext) { Py_INCREF(iter); auto guard = folly::makeGuard([iter, executor = folly::getKeepAliveToken(executor)] { // Ensure the Python async generator is destroyed on a Python thread executor->add([iter] { Py_DECREF(iter); }); }); return folly::coro::co_invoke( [iter, executor = std::move(executor), guard = std::move(guard), genNext = std::move( genNext)]() mutable -> folly::coro::AsyncGenerator { Py_INCREF(iter); auto innerGuard = folly::makeGuard([iter] { Py_DECREF(iter); }); folly::CancellationCallback cb{ co_await folly::coro::co_current_cancellation_token, [iter, executor, guard = std::move(innerGuard)]() mutable { folly::via(executor, [iter, guard = std::move(guard)] { cancelPythonIterator(iter); }); }}; while (true) { auto [promise, future] = folly::makePromiseContract>( executor); folly::via( executor, [&genNext, iter, promise = std::move(promise)]() mutable { genNext(iter, std::move(promise)); }); auto val = co_await std::move(future); if (!val) { break; } co_yield std::move(val.value()); } }); } } // namespace py3 } // namespace thrift #else /* !FOLLY_HAS_COROUTINES */ #error Thrift stream type support needs C++ coroutines, which are not currently available. \ Use a modern compiler and pass appropriate options to enable C++ coroutine support, \ or consider passing the Thrift compiler the mstch_py3:no_stream option in order to \ ignore stream type fields when generating code. #endif