/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef THRIFT_ASYNC_RESPONSECHANNEL_H_ #define THRIFT_ASYNC_RESPONSECHANNEL_H_ 1 #include #include #include #include #include #include #include #include #include #include #include namespace folly { class IOBuf; } extern const std::string kUnknownErrorCode; extern const std::string kOverloadedErrorCode; extern const std::string kAppOverloadedErrorCode; extern const std::string kAppClientErrorCode; extern const std::string kAppServerErrorCode; extern const std::string kTaskExpiredErrorCode; extern const std::string kProxyTransportExceptionErrorCode; extern const std::string kProxyClientProtocolExceptionErrorCode; extern const std::string kQueueOverloadedErrorCode; extern const std::string kInjectedFailureErrorCode; extern const std::string kServerQueueTimeoutErrorCode; extern const std::string kResponseTooBigErrorCode; extern const std::string kRequestTypeDoesntMatchServiceFunctionType; extern const std::string kMethodUnknownErrorCode; extern const std::string kInteractionIdUnknownErrorCode; extern const std::string kInteractionConstructorErrorErrorCode; extern const std::string kConnectionClosingErrorCode; extern const std::string kRequestParsingErrorCode; extern const std::string kChecksumMismatchErrorCode; extern const std::string kUnimplementedMethodErrorCode; extern const std::string kTenantQuotaExceededErrorCode; namespace apache { namespace thrift { class ResponseChannelRequest { public: using UniquePtr = std::unique_ptr; virtual bool isActive() const = 0; virtual bool isOneway() const = 0; virtual bool isStream() const { return false; } virtual bool isSink() const { return false; } virtual bool includeEnvelope() const = 0; apache::thrift::RpcKind rpcKind() const { if (isStream()) { return apache::thrift::RpcKind::SINGLE_REQUEST_STREAMING_RESPONSE; } if (isSink()) { return apache::thrift::RpcKind::SINK; } if (isOneway()) { return apache::thrift::RpcKind::SINGLE_REQUEST_NO_RESPONSE; } return apache::thrift::RpcKind::SINGLE_REQUEST_SINGLE_RESPONSE; } virtual bool isReplyChecksumNeeded() const { return false; } virtual void sendReply( ResponsePayload&&, MessageChannel::SendCallback* cb = nullptr, folly::Optional crc32 = folly::none) = 0; virtual void sendStreamReply( ResponsePayload&&, apache::thrift::detail::ServerStreamFactory&&, folly::Optional = folly::none) { throw std::logic_error("unimplemented"); } FOLLY_NODISCARD static bool sendStreamReply( ResponseChannelRequest::UniquePtr request, folly::EventBase* eb, ResponsePayload&& payload, StreamServerCallbackPtr callback, folly::Optional crc32 = folly::none) { // Destroying request can call onStreamCancel inline, which would be a // contract violation if we did it inline and returned true. SCOPE_EXIT { eb->runInEventBaseThread([request = std::move(request)] {}); }; return request->sendStreamReply( std::move(payload), std::move(callback), crc32); } #if FOLLY_HAS_COROUTINES virtual void sendSinkReply( ResponsePayload&&, apache::thrift::detail::SinkConsumerImpl&&, folly::Optional = folly::none) { throw std::logic_error("unimplemented"); } FOLLY_NODISCARD static bool sendSinkReply( ResponseChannelRequest::UniquePtr request, folly::EventBase* eb, ResponsePayload&& payload, SinkServerCallbackPtr callback, folly::Optional crc32 = folly::none) { SCOPE_EXIT { eb->runInEventBaseThread([request = std::move(request)] {}); }; return request->sendSinkReply( std::move(payload), std::move(callback), crc32); } #endif virtual void sendException( ResponsePayload&& response, MessageChannel::SendCallback* cb = nullptr) { // Until we start requesting payloads without the envelope we can pass any // sendException calls to sendReply sendReply(std::move(response), cb, folly::none); } virtual void sendErrorWrapped( folly::exception_wrapper ex, std::string exCode) = 0; virtual void sendQueueTimeoutResponse() {} virtual ~ResponseChannelRequest() = default; bool getShouldStartProcessing() { if (!tryStartProcessing()) { return false; } return true; } protected: // callTryStartProcessing is a helper method used in ResponseChannelRequest // wrapper subclasses to delegate tryStartProcessing() calls to the wrapped // ResponseChannelRequest. This is necessary due to the protected nature of // tryStartProcessing(). static bool callTryStartProcessing(ResponseChannelRequest* request) { return request->tryStartProcessing(); } virtual bool tryStartProcessing() = 0; FOLLY_NODISCARD virtual bool sendStreamReply( ResponsePayload&&, StreamServerCallbackPtr, folly::Optional = folly::none) { throw std::logic_error("unimplemented"); } FOLLY_NODISCARD virtual bool sendSinkReply( ResponsePayload&&, SinkServerCallbackPtr, folly::Optional = folly::none) { throw std::logic_error("unimplemented"); } bool startedProcessing_{false}; }; /** * ResponseChannel defines an asynchronous API for servers. */ class ResponseChannel : virtual public folly::DelayedDestruction { public: static const uint32_t ONEWAY_REQUEST_ID = std::numeric_limits::max(); class Callback { public: /** * reason is empty if closed due to EOF, or a pointer to an exception * if closed due to some sort of error. */ virtual void channelClosed(folly::exception_wrapper&&) = 0; virtual ~Callback() {} }; /** * The callback will be invoked on each new request. * It will remain installed until explicitly uninstalled, or until * channelClosed() is called. */ virtual void setCallback(Callback*) = 0; protected: ~ResponseChannel() override {} }; } // namespace thrift } // namespace apache #endif // #ifndef THRIFT_ASYNC_RESPONSECHANNEL_H_