/* * Copyright (c) Meta Platforms, Inc. and affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ use std::ffi::CStr; use std::marker::PhantomData; use std::sync::Arc; use anyhow::bail; use anyhow::Error; use anyhow::Result; use async_trait::async_trait; use futures::stream::BoxStream; use crate::application_exception::ApplicationException; use crate::application_exception::ApplicationExceptionErrorCode; use crate::context_stack::ContextStack; use crate::exceptions::ExceptionInfo; use crate::exceptions::ResultInfo; use crate::framing::Framing; use crate::framing::FramingDecoded; use crate::framing::FramingEncodedFinal; use crate::protocol::Protocol; use crate::protocol::ProtocolDecoded; use crate::protocol::ProtocolReader; use crate::protocol::ProtocolWriter; use crate::request_context::RequestContext; use crate::serialize::Serialize; use crate::thrift_protocol::ProtocolID; use crate::ttype::TType; pub enum SerializedStreamElement { /// A normal stream response, without any error. Contains the serialized response. Success(Payload), /// Contains the serialized declared exception. DeclaredException(Payload), /// Contains the application exception. ApplicationException(ApplicationException), /// The serialization failed. Contains the error. SerializationError(Error), } pub trait ReplyState where F: Framing, { type RequestContext; fn send_reply(&self, reply: FramingEncodedFinal); fn send_stream_reply( &self, response: FramingEncodedFinal, stream: Option>>>, protocol_id: ProtocolID, ) -> Result<()>; fn set_interaction_processor( &self, _processor: Arc< dyn ThriftService< F, Handler = (), RequestContext = Self::RequestContext, ReplyState = Self, > + ::std::marker::Send + 'static, >, ) -> Result<()> { bail!("Thrift server does not support interactions"); } } #[async_trait] pub trait ThriftService: Send + Sync + 'static where F: Framing + Send + 'static, { type Handler; type RequestContext; type ReplyState; async fn call( &self, req: FramingDecoded, req_ctxt: &Self::RequestContext, reply_state: Arc, ) -> Result<(), Error>; fn create_interaction( &self, _name: &str, ) -> ::anyhow::Result< Arc< dyn ThriftService< F, Handler = (), RequestContext = Self::RequestContext, ReplyState = Self::ReplyState, > + ::std::marker::Send + 'static, >, > { bail!("Thrift server does not support interactions"); } /// Returns function names this thrift service is able to handle, similar /// to the keys of C++'s createMethodMetadata(). /// /// Return value includes inherited functions from parent thrift services, /// and interactions' functions. fn get_method_names(&self) -> &'static [&'static str]; /// Applies to interactions only /// /// Termination callback is invoked immediately as soon as the client's /// termination signal is received. This differs to the interaction service /// being dropped, which only happens when all outstanding requests and /// streams have been completed. This is not invoked if the connection /// closes without the signal being received. async fn on_termination(&self); } #[async_trait] impl ThriftService for Box where T: ThriftService + Send + Sync + ?Sized, F: Framing + Send + 'static, T::RequestContext: Send + Sync + 'static, T::ReplyState: Send + Sync + 'static, { type Handler = T::Handler; type RequestContext = T::RequestContext; type ReplyState = T::ReplyState; async fn call( &self, req: FramingDecoded, req_ctxt: &Self::RequestContext, reply_state: Arc, ) -> Result<(), Error> { (**self).call(req, req_ctxt, reply_state).await } fn create_interaction( &self, name: &str, ) -> ::anyhow::Result< Arc< dyn ThriftService< F, Handler = (), RequestContext = Self::RequestContext, ReplyState = Self::ReplyState, > + ::std::marker::Send + 'static, >, > { (**self).create_interaction(name) } fn get_method_names(&self) -> &'static [&'static str] { (**self).get_method_names() } async fn on_termination(&self) { (**self).on_termination().await } } #[async_trait] impl ThriftService for Arc where T: ThriftService + Send + Sync + ?Sized, F: Framing + Send + 'static, T::RequestContext: Send + Sync + 'static, T::ReplyState: Send + Sync + 'static, { type Handler = T::Handler; type RequestContext = T::RequestContext; type ReplyState = T::ReplyState; async fn call( &self, req: FramingDecoded, req_ctxt: &Self::RequestContext, reply_state: Arc, ) -> Result<(), Error> { (**self).call(req, req_ctxt, reply_state).await } fn create_interaction( &self, name: &str, ) -> ::anyhow::Result< Arc< dyn ThriftService< F, Handler = (), RequestContext = Self::RequestContext, ReplyState = Self::ReplyState, > + ::std::marker::Send + 'static, >, > { (**self).create_interaction(name) } fn get_method_names(&self) -> &'static [&'static str] { (**self).get_method_names() } async fn on_termination(&self) { (**self).on_termination().await } } /// Trait implemented by a generated type to implement a service. #[async_trait] pub trait ServiceProcessor

where P: Protocol, { type RequestContext; type ReplyState; /// Given a method name, return a reference to the processor for that index. fn method_idx(&self, name: &[u8]) -> Result; /// Given a method index and the remains of the message input, get a future /// for the result of the method. This will only be called if the corresponding /// `method_idx()` returns an (index, ServiceProcessor) tuple. /// `frame` is a reference to the frame containing the request. /// `request` is a deserializer instance set up to decode the request. async fn handle_method( &self, idx: usize, //frame: &P::Frame, d: &mut P::Deserializer, req: ProtocolDecoded

, req_ctxt: &Self::RequestContext, reply_state: Arc, seqid: u32, ) -> Result<(), Error>; /// Given a method name, return a reference to the interaction creation fn for that index fn create_interaction_idx(&self, _name: &str) -> ::anyhow::Result<::std::primitive::usize> { bail!("Processor does not support interactions"); } /// Given a creation method index, it produces a fresh interaction processor fn handle_create_interaction( &self, _idx: ::std::primitive::usize, ) -> ::anyhow::Result< Arc< dyn ThriftService< P::Frame, Handler = (), RequestContext = Self::RequestContext, ReplyState = Self::ReplyState, > + ::std::marker::Send + 'static, >, > { bail!("Processor does not support interactions"); } /// See [ThriftService::on_termination] docs async fn handle_on_termination(&self); } /// Null processor which implements no methods - it acts as the super for any service /// which has no super-service. #[derive(Debug, Clone)] pub struct NullServiceProcessor { _phantom: PhantomData<(P, R, RS)>, } impl NullServiceProcessor { pub fn new() -> Self { Self { _phantom: PhantomData, } } } impl Default for NullServiceProcessor { fn default() -> Self { Self::new() } } #[async_trait] impl ServiceProcessor

for NullServiceProcessor where P: Protocol + Sync, P::Deserializer: Send, R: Sync, RS: Sync + Send, { type RequestContext = R; type ReplyState = RS; #[inline] fn method_idx(&self, name: &[u8]) -> Result { Err(ApplicationException::new( ApplicationExceptionErrorCode::UnknownMethod, format!("Unknown method {}", String::from_utf8_lossy(name)), )) } async fn handle_method( &self, _idx: usize, //_frame: &P::Frame, _d: &mut P::Deserializer, _req: ProtocolDecoded

, _req_ctxt: &R, _reply_state: Arc, _seqid: u32, ) -> Result<(), Error> { // Should never be called since method_idx() always returns an error unimplemented!("NullServiceProcessor implements no methods") } fn create_interaction_idx(&self, name: &str) -> ::anyhow::Result<::std::primitive::usize> { bail!("Unknown interaction {}", name); } fn handle_create_interaction( &self, _idx: ::std::primitive::usize, ) -> ::anyhow::Result< Arc< dyn ThriftService< P::Frame, Handler = (), RequestContext = Self::RequestContext, ReplyState = Self::ReplyState, > + ::std::marker::Send + 'static, >, > { unimplemented!("NullServiceProcessor implements no interactions") } async fn handle_on_termination(&self) {} } #[async_trait] impl ThriftService for NullServiceProcessor where P: Protocol + Send + Sync + 'static, P::Frame: Send + 'static, R: RequestContext + Send + Sync + 'static, R::ContextStack: ContextStack, RS: ReplyState + Send + Sync + 'static, { type Handler = (); type RequestContext = R; type ReplyState = RS; async fn call( &self, req: ProtocolDecoded

, rctxt: &R, reply_state: Arc, ) -> Result<(), Error> { let mut p = P::deserializer(req); const SERVICE_NAME: &str = "NullService"; let ((name, ae), _, seqid) = p.read_message_begin(|name| { let name = String::from_utf8_lossy(name).into_owned(); let ae = ApplicationException::unimplemented_method(SERVICE_NAME, &name); (name, ae) })?; p.skip(TType::Struct)?; p.read_message_end()?; rctxt.set_user_exception_header(ae.exn_name(), &ae.exn_value())?; let res = serialize!(P, |p| { p.write_message_begin(&name, ae.result_type().message_type(), seqid); ae.write(p); p.write_message_end(); }); reply_state.send_reply(res); Ok(()) } fn create_interaction( &self, name: &str, ) -> ::anyhow::Result< Arc< dyn ThriftService< P::Frame, Handler = Self::Handler, RequestContext = Self::RequestContext, ReplyState = Self::ReplyState, > + ::std::marker::Send + 'static, >, > { bail!("Unimplemented interaction {}", name); } fn get_method_names(&self) -> &'static [&'static str] { &[] } async fn on_termination(&self) {} }