/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include #include #include #include namespace apache::thrift::detail::test { SinkConsumer TestSinkService::range(int32_t from, int32_t to) { return SinkConsumer{ [from, to](folly::coro::AsyncGenerator gen) -> folly::coro::Task { int32_t i = from; while (auto item = co_await gen.next()) { EXPECT_EQ(i++, *item); } EXPECT_EQ(i, to + 1); co_return true; }, 10 /* buffer size */ }; } SinkConsumer TestSinkService::rangeThrow(int32_t from, int32_t) { return SinkConsumer{ [from](folly::coro::AsyncGenerator gen) -> folly::coro::Task { bool throwed = false; try { int32_t i = from; while (auto item = co_await gen.next()) { EXPECT_EQ(i++, *item); } } catch (const std::exception& ex) { throwed = true; EXPECT_EQ("std::runtime_error: test", std::string(ex.what())); } EXPECT_TRUE(throwed); co_return true; }, 10 /* buffer size */ }; } SinkConsumer TestSinkService::rangeFinalResponseThrow( int32_t from, int32_t) { return SinkConsumer{ [from](folly::coro::AsyncGenerator gen) -> folly::coro::Task { int32_t i = from; int counter = 5; while (auto item = co_await gen.next()) { if (counter-- > 0) { break; } EXPECT_EQ(i++, *item); } throw std::runtime_error("test"); }, 10 /* buffer size */ }; } SinkConsumer TestSinkService::rangeEarlyResponse( int32_t from, int32_t, int32_t early) { return SinkConsumer{ [from, early](folly::coro::AsyncGenerator gen) -> folly::coro::Task { int32_t i = from; while (auto item = co_await gen.next()) { EXPECT_EQ(i++, *item); if (i == early) { co_return early; } } // shouldn't reach here co_return -1; }, 10 /* buffer size */ }; } SinkConsumer TestSinkService::unSubscribedSink() { activeSinks_++; return SinkConsumer{ [g = folly::makeGuard([this]() { activeSinks_--; })]( folly::coro::AsyncGenerator gen) mutable -> folly::coro::Task { EXPECT_THROW(co_await gen.next(), TApplicationException); co_return true; }, 10 /* buffer size */ }; } folly::SemiFuture> TestSinkService::semifuture_unSubscribedSinkSlowReturn() { return folly::futures::sleep(std::chrono::seconds(1)).deferValue([=](auto&&) { activeSinks_++; return SinkConsumer{ [g = folly::makeGuard([this]() { activeSinks_--; })]( folly::coro::AsyncGenerator gen) mutable -> folly::coro::Task { co_await gen.next(); co_return true; }, 10 /* buffer size */ }; }); } bool TestSinkService::isSinkUnSubscribed() { return activeSinks_ == 0; } ResponseAndSinkConsumer TestSinkService::initialThrow() { MyException ex; *ex.reason_ref() = "reason"; throw ex; } SinkConsumer TestSinkService::rangeChunkTimeout() { return SinkConsumer{ [](folly::coro::AsyncGenerator gen) -> folly::coro::Task { EXPECT_THROW( co_await [&]() -> folly::coro::Task { int32_t i = 0; while (auto item = co_await gen.next()) { EXPECT_EQ(i++, *item); } }(), TApplicationException); co_return true; }, 10 /* buffer size */ } .setChunkTimeout(std::chrono::milliseconds(200)); } SinkConsumer TestSinkService::sinkThrow() { return SinkConsumer{ [](folly::coro::AsyncGenerator gen) -> folly::coro::Task { bool throwed = false; try { while (auto item = co_await gen.next()) { } } catch (const SinkException& ex) { throwed = true; EXPECT_EQ("test", *ex.reason_ref()); } catch (const std::exception& ex) { LOG(ERROR) << "catched unexpected exception " << ex.what(); } EXPECT_TRUE(throwed); co_return true; }, 10 /* buffer size */ }; } SinkConsumer TestSinkService::sinkFinalThrow() { return SinkConsumer{ [](folly::coro::AsyncGenerator) -> folly::coro::Task { FinalException ex; *ex.reason_ref() = "test"; throw ex; }, 10 /* buffer size */ }; } void TestSinkService::purge() {} SinkConsumer TestSinkService::rangeCancelAt( int32_t from, int32_t to, int32_t cancelAt) { return SinkConsumer{ [from, to, cancelAt](folly::coro::AsyncGenerator gen) -> folly::coro::Task { // create custom CancellationSource folly::CancellationSource cancelSource; folly::CancellationToken parentCt = co_await folly::coro::co_current_cancellation_token; folly::CancellationCallback cb{ std::move(parentCt), [&] { cancelSource.requestCancellation(); }}; // cancellation will be requested asynchronously auto cancelTask = [&cancelSource]() -> folly::coro::Task { co_await folly::coro::sleep(std::chrono::milliseconds{200}); cancelSource.requestCancellation(); }; int32_t i = from; try { while (auto item = co_await folly::coro::co_withCancellation( cancelSource.getToken(), gen.next())) { EXPECT_EQ(i++, *item); if (i == cancelAt) { cancelTask() .scheduleOn(co_await folly::coro::co_current_executor) .start(); } } EXPECT_EQ(i, to + 1); co_return true; } catch (const folly::OperationCancelled&) { EXPECT_LE(i, to); co_return false; } }, 10 /* buffer size */ }; } SinkConsumer TestSinkService::rangeSlowFinalResponse( int32_t from, int32_t to) { return SinkConsumer{ [from, to](folly::coro::AsyncGenerator gen) -> folly::coro::Task { int32_t i = from; while (auto item = co_await gen.next()) { EXPECT_EQ(i++, *item); } EXPECT_EQ(i, to + 1); co_await folly::coro::sleep(std::chrono::seconds{5}); co_return true; }, 10 /* buffer size */ }; } } // namespace apache::thrift::detail::test