{{! Copyright (c) Meta Platforms, Inc. and affiliates. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. }} {{! If the service declares interactions, render the lib/server partial once per interaction inside a pub mod named by service:snake. }} {{#service:interactions?}}{{! }}pub mod {{service:snake}} {{>lib/block}} use super::*; {{#service:interactions}} {{>lib/server}} {{/service:interactions}} } {{/service:interactions?}} {{! The lib/server_trait partial is rendered only when this service is not itself an interaction. }} {{^service:interaction?}} {{>lib/server_trait}} {{/service:interaction?}} {{! The supa field is typed SS when the service extends another service, otherwise it is a NullServiceProcessor. }} /// Processor for {{service:rust_name}}'s methods. #[derive(Clone, Debug)] pub struct {{service:rust_name}}Processor { service: H,{{! }}{{#service:extends?}} supa: SS,{{! }}{{/service:extends?}}{{! }}{{^service:extends?}} supa: ::fbthrift::NullServiceProcessor,{{! }}{{/service:extends?}} _phantom: ::std::marker::PhantomData<(P, H, R, RS)>, } {{! The lib/server_arg partial is likewise skipped for interactions. }} {{^service:interaction?}} {{>lib/server_arg}} {{/service:interaction?}} {{! Inherent impl: constructor, into_inner, and one handle_ method per entry in service:rustFunctions. }} impl {{service:rust_name}}Processor where P: ::fbthrift::Protocol + ::std::marker::Send + ::std::marker::Sync + 'static, P::Frame: ::std::marker::Send + 'static, P::Deserializer: ::std::marker::Send, H: {{service:rust_name}}{{! }}{{#service:requestContext?}}{{/service:requestContext?}}, R: ::fbthrift::RequestContext + ::std::marker::Send + ::std::marker::Sync + 'static, RS: ::fbthrift::ReplyState + ::std::marker::Send + ::std::marker::Sync + 'static, ::ContextStack: ::fbthrift::ContextStack::Frame> + ::std::marker::Send + ::std::marker::Sync, ::fbthrift::ProtocolDecoded

: ::std::clone::Clone, ::fbthrift::ProtocolEncodedFinal

: ::std::clone::Clone + ::fbthrift::BufExt,{{! }}{{#service:extends?}} SS: ::fbthrift::ThriftService, SS::Handler: {{>lib/super}}{{! }}{{/service:extends?}} { pub fn new({{! }}service: H{{! }}{{#service:extends?}}, supa: SS{{/service:extends?}}{{! }}) -> Self { Self { service, supa{{^service:extends?}}{{! }}: ::fbthrift::NullServiceProcessor::new(){{! }}{{/service:extends?}}, _phantom: ::std::marker::PhantomData, } } pub fn into_inner(self) -> H { self.service }{{! }}{{#service:rustFunctions}}{{^function:starts_interaction?}}{{^function:sink?}} {{! Async handler emitted for functions that neither start an interaction nor are sinks. }} #[::tracing::instrument(skip_all, name = "handler", fields(method = "{{service:name}}.{{function:name}}"))] async fn handle_{{function:name}}<'a>( &'a self, p: &'a mut P::Deserializer, req: ::fbthrift::ProtocolDecoded

, req_ctxt: &R, reply_state: ::std::sync::Arc, _seqid: ::std::primitive::u32, ) -> ::anyhow::Result<()> { use ::futures::FutureExt as _; const SERVICE_NAME: &::std::ffi::CStr = c"{{service:parent_service_name}}"; const METHOD_NAME: &::std::ffi::CStr = c"{{#service:interaction?}}{{service:name}}.{{/service:interaction?}}{{function:name}}"; const SERVICE_METHOD_NAME: &::std::ffi::CStr = c"{{service:parent_service_name}}.{{#service:interaction?}}{{service:name}}.{{/service:interaction?}}{{function:name}}"; let mut ctx_stack = req_ctxt.get_context_stack(SERVICE_NAME, SERVICE_METHOD_NAME)?; ::fbthrift::ContextStack::pre_read(&mut ctx_stack)?; let _args: self::Args_{{service:rust_name}}_{{function:name}} = ::fbthrift::Deserialize::read(p)?; let bytes_read = ::fbthrift::help::buf_len(&req)?; ::fbthrift::ContextStack::on_read_data(&mut ctx_stack, ::fbthrift::SerializedMessage { protocol: P::PROTOCOL_ID, method_name: METHOD_NAME, buffer: req, })?; ::fbthrift::ContextStack::post_read(&mut ctx_stack, bytes_read)?; {{! The service call is wrapped in AssertUnwindSafe and catch_unwind so that a handler panic becomes an ApplicationException result in the match below. }} let res = ::std::panic::AssertUnwindSafe( self.service.{{function:rust_name}}({{! }}{{#service:requestContext?}} req_ctxt,{{! }}{{/service:requestContext?}}{{! }}{{#function:args}} _args.{{field:rust_name}},{{! 
}}{{/function:args}} ) ) .catch_unwind() .await; // nested results - panic catch on the outside, method on the inside let res = match res { ::std::result::Result::Ok(::std::result::Result::Ok(res)) => { ::tracing::trace!(method = "{{service:name}}.{{function:name}}", "success"); {{#function:creates_interaction?}} {{#function:return_type}}{{#type:void?}} let (interaction_handler, res) = (res, ()); {{/type:void?}}{{^type:void?}} let (interaction_handler, res) = res; {{/type:void?}}{{/function:return_type}} let interaction_processor = ::std::sync::Arc::new({{program:crate}}::server::{{service:snake}}::{{function:interaction_name}}Processor::, R, RS>::new(interaction_handler)); reply_state.set_interaction_processor(interaction_processor)?; {{/function:creates_interaction?}} ::std::result::Result::Ok(res) } ::std::result::Result::Ok(::std::result::Result::Err(exn)) => { {{!TODO: Remove this once debug for stream is implemented.}} {{^function:stream?}} ::tracing::error!(method = "{{service:name}}.{{function:name}}", exception = ?exn); {{/function:stream?}} ::std::result::Result::Err(exn) } ::std::result::Result::Err(exn) => { let aexn = ::fbthrift::ApplicationException::handler_panic("{{service:name}}.{{function:name}}", exn); ::tracing::error!(method = "{{service:name}}.{{function:name}}", panic = ?aexn); ::std::result::Result::Err({{program:crate}}::services::{{service:snake}}::{{function:upcamel}}Exn::ApplicationException(aexn)) } }; {{! Non-streaming functions: serialize the result envelope and send a single reply. }} {{^function:stream?}} let env = ::fbthrift::help::serialize_result_envelope::( "{{function:name}}", METHOD_NAME, _seqid, req_ctxt, &mut ctx_stack, res, )?; reply_state.send_reply(env); ::std::result::Result::Ok(()) {{/function:stream?}} {{! Streaming functions: split off the first response when there is one and serialize each stream item individually. }} {{#function:stream?}} use ::futures::StreamExt as _; let (response, stream) = match res { ::std::result::Result::Ok(res) => { {{#function:stream_has_first_response?}} let (response, stream) = res; let response = ::std::result::Result::Ok(response); {{/function:stream_has_first_response?}}{{! 
}}{{^function:stream_has_first_response?}} let response = ::std::result::Result::Ok(()); let stream = res; {{/function:stream_has_first_response?}} let stream = ::std::panic::AssertUnwindSafe(stream) .catch_unwind() .map(|item| { match item { ::std::result::Result::Ok(::std::result::Result::Ok(success)) => { let payload = ::fbthrift::help::serialize_stream_item::( ::std::result::Result::Ok(success), "{{function:name}}", ); ::fbthrift::SerializedStreamElement::Success(payload) } ::std::result::Result::Ok(::std::result::Result::Err({{program:crate}}::services::{{service:snake}}::{{function:upcamel}}StreamExn::ApplicationException(aexn))) => { tracing::info!(?aexn, method="{{service:name}}.{{function:name}}", "Streaming ApplicationException"); ::fbthrift::SerializedStreamElement::ApplicationException(aexn) } {{#function:stream_exceptions?}} ::std::result::Result::Ok(::std::result::Result::Err(exn)) => { tracing::debug!(?exn, method="{{service:name}}.{{function:name}}", "Streaming declared exception"); let payload = ::fbthrift::help::serialize_stream_item::( ::std::result::Result::Err(exn), "{{function:name}}", ); ::fbthrift::SerializedStreamElement::DeclaredException(payload) } {{/function:stream_exceptions?}} {{! A panic caught from the stream is reported as an ApplicationException element. }} ::std::result::Result::Err(exn) => { tracing::error!(?exn, method="{{service:name}}.{{function:name}}", "Streaming unwind"); let aexn = ::fbthrift::ApplicationException::handler_panic("{{service:name}}.{{function:name}}", exn); ::fbthrift::SerializedStreamElement::ApplicationException(aexn) } } }) {{!TODO(T125947234): should we instrument with tracing?}} .boxed(); (response, ::std::option::Option::Some(stream)) }, ::std::result::Result::Err(exn) => (::std::result::Result::Err(exn), ::std::option::Option::None), }; let response = ::fbthrift::help::serialize_result_envelope::( "{{function:name}}", METHOD_NAME, _seqid, req_ctxt, &mut ctx_stack, response, )?; let _ = reply_state.send_stream_reply(response, stream, P::PROTOCOL_ID); ::std::result::Result::Ok(()) 
{{/function:stream?}} }{{! }}{{/function:sink?}}{{/function:starts_interaction?}}{{#function:starts_interaction?}} {{! Functions that start an interaction get a synchronous handler that forwards straight to the service. }} fn handle_{{function:name}}( &self, ) -> ::anyhow::Result<::std::boxed::Box> { self.service.{{function:rust_name}}() }{{! }}{{/function:starts_interaction?}}{{! }}{{/service:rustFunctions}} } {{! ServiceProcessor impl: name-to-index lookup plus index-based dispatch to the handle_ methods of the inherent impl. }} #[::async_trait::async_trait] impl ::fbthrift::ServiceProcessor

for {{service:rust_name}}Processor where P: ::fbthrift::Protocol + ::std::marker::Send + ::std::marker::Sync + 'static, P::Deserializer: ::std::marker::Send, H: {{service:rust_name}}{{! }}{{#service:requestContext?}}{{/service:requestContext?}},{{! }}{{#service:extends?}} SS: ::fbthrift::ThriftService, SS::Handler: {{>lib/super}},{{! }}{{/service:extends?}} P::Frame: ::std::marker::Send + 'static, R: ::fbthrift::RequestContext + ::std::marker::Send + ::std::marker::Sync + 'static, ::ContextStack: ::fbthrift::ContextStack::Frame> + ::std::marker::Send + ::std::marker::Sync + 'static, RS: ::fbthrift::ReplyState + ::std::marker::Send + ::std::marker::Sync + 'static, ::fbthrift::ProtocolDecoded

: ::std::clone::Clone, ::fbthrift::ProtocolEncodedFinal

: ::std::clone::Clone + ::fbthrift::BufExt, { type RequestContext = R; type ReplyState = RS; {{! method_idx maps a wire method name to function:index; for interactions the name is prefixed with the service name and a dot. }} #[inline] fn method_idx(&self, name: &[::std::primitive::u8]) -> ::std::result::Result<::std::primitive::usize, ::fbthrift::ApplicationException> { match name {{>lib/block}}{{! }}{{#service:rustFunctions}}{{^function:starts_interaction?}}{{^function:sink?}} b"{{#service:interaction?}}{{service:name}}.{{/service:interaction?}}{{function:name}}" => ::std::result::Result::Ok({{function:index}}usize),{{! }}{{/function:sink?}}{{/function:starts_interaction?}}{{/service:rustFunctions}} _ => ::std::result::Result::Err(::fbthrift::ApplicationException::unknown_method()), } } #[allow(clippy::match_single_binding)] async fn handle_method( &self, idx: ::std::primitive::usize, _p: &mut P::Deserializer, _req: ::fbthrift::ProtocolDecoded

, _req_ctxt: &R, _reply_state: ::std::sync::Arc, _seqid: ::std::primitive::u32, ) -> ::anyhow::Result<()> { match idx {{>lib/block}}{{! }}{{#service:rustFunctions}}{{^function:starts_interaction?}}{{^function:sink?}} {{function:index}}usize => { self.handle_{{function:name}}(_p, _req, _req_ctxt, _reply_state, _seqid).await }{{! }}{{/function:sink?}}{{/function:starts_interaction?}}{{/service:rustFunctions}} {{! Any index without a generated arm panics. }} bad => panic!( "{}: unexpected method idx {}", "{{service:rust_name}}Processor", bad ), } } {{! Interaction creation uses the same lookup-then-dispatch split, keyed by function:interaction_name. }} #[allow(clippy::match_single_binding)] #[inline] fn create_interaction_idx(&self, name: &::std::primitive::str) -> ::anyhow::Result<::std::primitive::usize> { match name {{>lib/block}}{{! }}{{#service:rustFunctions}}{{#function:starts_interaction?}} "{{function:interaction_name}}" => ::std::result::Result::Ok({{function:index}}usize),{{! }}{{/function:starts_interaction?}}{{/service:rustFunctions}} _ => ::anyhow::bail!("Unknown interaction"), } } #[allow(clippy::match_single_binding)] fn handle_create_interaction( &self, idx: ::std::primitive::usize, ) -> ::anyhow::Result< ::std::sync::Arc + ::std::marker::Send + 'static> > { match idx {{>lib/block}}{{! }}{{#service:rustFunctions}}{{#function:starts_interaction?}} {{function:index}}usize => { let handler = self.handle_{{function:name}}()?; let server = ::std::sync::Arc::new({{program:crate}}::server::{{service:snake}}::{{function:interaction_name}}Processor::, R, RS>::new(handler)); ::std::result::Result::Ok(server) }{{! 
}}{{/function:starts_interaction?}}{{/service:rustFunctions}} bad => panic!( "{}: unexpected method idx {}", "{{service:rust_name}}Processor", bad ), } } {{! Only interactions forward termination to the handler; for regular services the generated body is empty. }} async fn handle_on_termination(&self) { {{#service:interaction?}} self.service.on_termination().await {{/service:interaction?}} } } {{! ThriftService impl: the call entry point reads the message header, resolves the method index and routes the request. }} #[::async_trait::async_trait] impl ::fbthrift::ThriftService for {{service:rust_name}}Processor where P: ::fbthrift::Protocol + ::std::marker::Send + ::std::marker::Sync + 'static, P::Deserializer: ::std::marker::Send, P::Frame: ::std::marker::Send + 'static, H: {{service:rust_name}}{{! }}{{#service:requestContext?}}{{/service:requestContext?}},{{! }}{{#service:extends?}} SS: ::fbthrift::ThriftService, SS::Handler: {{>lib/super}}, P::Frame: ::std::marker::Send + 'static,{{! }}{{/service:extends?}} R: ::fbthrift::RequestContext + ::std::marker::Send + ::std::marker::Sync + 'static, ::ContextStack: ::fbthrift::ContextStack::Frame> + ::std::marker::Send + ::std::marker::Sync + 'static, RS: ::fbthrift::ReplyState + ::std::marker::Send + ::std::marker::Sync + 'static, ::fbthrift::ProtocolDecoded

: ::std::clone::Clone, ::fbthrift::ProtocolEncodedFinal

: ::std::clone::Clone + ::fbthrift::BufExt, { {{#service:interaction?}} // Interactions have () as their handler associated type // to make `create_interaction` have a common return type. type Handler = (); {{/service:interaction?}}{{^service:interaction?}} type Handler = H; {{/service:interaction?}} type RequestContext = R; type ReplyState = RS; #[tracing::instrument(level="trace", skip_all, fields(service = "{{service:name}}"))] async fn call( &self, req: ::fbthrift::ProtocolDecoded

, req_ctxt: &R, reply_state: ::std::sync::Arc, ) -> ::anyhow::Result<()> { use ::fbthrift::{ProtocolReader as _, ServiceProcessor as _}; let mut p = P::deserializer(req.clone()); let (idx, mty, seqid) = p.read_message_begin(|name| self.method_idx(name))?; if mty != ::fbthrift::MessageType::Call { return ::std::result::Result::Err(::std::convert::From::from(::fbthrift::ApplicationException::new( ::fbthrift::ApplicationExceptionErrorCode::InvalidMessageType, format!("message type {:?} not handled", mty) ))); } {{! A method name that method_idx does not recognize is delegated to supa. }} let idx = match idx { ::std::result::Result::Ok(idx) => idx, ::std::result::Result::Err(_) => { return self.supa.call(req, req_ctxt, reply_state).await; } }; self.handle_method(idx, &mut p, req, req_ctxt, reply_state, seqid).await?; p.read_message_end()?; ::std::result::Result::Ok(()) } fn create_interaction( &self, name: &::std::primitive::str, ) -> ::anyhow::Result< ::std::sync::Arc + ::std::marker::Send + 'static> > { use ::fbthrift::{ServiceProcessor as _}; let idx = self.create_interaction_idx(name); let idx = match idx { ::anyhow::Result::Ok(idx) => idx, ::anyhow::Result::Err(_) => { return self.supa.create_interaction(name); } }; self.handle_create_interaction(idx) } fn get_method_names(&self) -> &'static [&'static ::std::primitive::str] { &[ {{>lib/method_names}} ] } async fn on_termination(&self) { use ::fbthrift::{ServiceProcessor as _}; self.handle_on_termination().await } } {{! The make_ server factory below is emitted only for services that are not interactions. }} {{^service:interaction?}} /// Construct a new instance of a {{service:rust_name}} service. /// /// This is called when a new instance of a Thrift service Processor /// is needed for a particular Thrift protocol. #[::tracing::instrument(level="debug", skip_all, fields(proto = ?proto))] pub fn make_{{service:rust_name}}_server( proto: ::fbthrift::ProtocolID, handler: H,{{! }}{{#service:extends?}} supa: SMAKE,{{! 
}}{{/service:extends?}} ) -> ::std::result::Result<::std::boxed::Box + ::std::marker::Send + 'static>, ::fbthrift::ApplicationException> where F: ::fbthrift::Framing + ::std::marker::Send + ::std::marker::Sync + 'static, H: {{service:rust_name}}{{! }}{{#service:requestContext?}}{{/service:requestContext?}},{{! }}{{#service:extends?}} SMAKE: ::std::ops::FnOnce(::fbthrift::ProtocolID) -> ::std::result::Result, SS: ::fbthrift::ThriftService, SS::Handler: {{>lib/super}},{{! }}{{/service:extends?}} R: ::fbthrift::RequestContext + ::std::marker::Send + ::std::marker::Sync + 'static, ::ContextStack: ::fbthrift::ContextStack + ::std::marker::Send + ::std::marker::Sync + 'static, RS: ::fbthrift::ReplyState + ::std::marker::Send + ::std::marker::Sync + 'static, ::fbthrift::FramingDecoded: ::std::clone::Clone, ::fbthrift::FramingEncodedFinal: ::std::clone::Clone + ::fbthrift::BufExt, { match proto { ::fbthrift::ProtocolID::BinaryProtocol => { ::std::result::Result::Ok(::std::boxed::Box::new({{service:rust_name}}Processor::<::fbthrift::BinaryProtocol, H, R, RS{{! }}{{#service:extends?}}, SS{{/service:extends?}}{{! }}>::new(handler{{! }}{{#service:extends?}}, supa(proto)?{{/service:extends?}}{{! }}))) } ::fbthrift::ProtocolID::CompactProtocol => { ::std::result::Result::Ok(::std::boxed::Box::new({{service:rust_name}}Processor::<::fbthrift::CompactProtocol, H, R, RS{{! }}{{#service:extends?}}, SS{{/service:extends?}}{{! }}>::new(handler{{! }}{{#service:extends?}}, supa(proto)?{{/service:extends?}}{{! }}))) } {{! NOTE(review): function:name is referenced in the log below outside any service:rustFunctions section - confirm what it renders to here. }} bad => { ::tracing::error!(method = "{{service:name}}.{{function:name}}", invalid_protocol = ?bad); ::std::result::Result::Err(::fbthrift::ApplicationException::invalid_protocol(bad)) } } } {{/service:interaction?}} {{!newline}}