// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use crate::buf::GrpcSlice; use crate::call::MessageReader; use crate::error::Result; pub type DeserializeFn = fn(MessageReader) -> Result; pub type SerializeFn = fn(&T, &mut GrpcSlice) -> Result<()>; /// According to https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md, grpc uses /// a four bytes to describe the length of a message, so it should not exceed u32::MAX. pub const MAX_MESSAGE_SIZE: usize = std::u32::MAX as usize; /// Defines how to serialize and deserialize between the specialized type and byte slice. pub struct Marshaller { // Use function pointer here to simplify the signature. // Compiler will probably inline the function so performance // impact can be omitted. // // Using trait will require a trait object or generic, which will // either have performance impact or make signature complicated. // // const function is not stable yet (rust-lang/rust#24111), hence // make all fields public. /// The serialize function. pub ser: SerializeFn, /// The deserialize function. pub de: DeserializeFn, } #[cfg(any(feature = "protobuf-codec", feature = "protobufv3-codec"))] pub mod pb_codec { #[cfg(feature = "protobuf-codec")] use protobuf::{CodedOutputStream, Message}; #[cfg(feature = "protobufv3-codec")] use protobufv3::{CodedOutputStream, Message}; use super::{from_buf_read, MessageReader, MAX_MESSAGE_SIZE}; use crate::buf::GrpcSlice; use crate::error::{Error, Result}; #[inline] pub fn ser(t: &T, buf: &mut GrpcSlice) -> Result<()> { let cap = t.compute_size() as usize; // FIXME: This is not a practical fix until stepancheg/rust-protobuf#530 is fixed. if cap <= MAX_MESSAGE_SIZE { unsafe { let bytes = buf.realloc(cap); let raw_bytes = &mut *(bytes as *mut [std::mem::MaybeUninit] as *mut [u8]); let mut s = CodedOutputStream::bytes(raw_bytes); t.write_to_with_cached_sizes(&mut s).map_err(Into::into) } } else { Err(Error::Codec( format!("message is too large: {cap} > {MAX_MESSAGE_SIZE}").into(), )) } } #[inline] pub fn de(mut reader: MessageReader) -> Result { let mut s = from_buf_read(&mut reader); let mut m = T::new(); m.merge_from(&mut s)?; Ok(m) } } #[cfg(feature = "protobuf-codec")] fn from_buf_read(reader: &mut MessageReader) -> protobuf::CodedInputStream { protobuf::CodedInputStream::from_buffered_reader(reader) } #[cfg(feature = "protobufv3-codec")] fn from_buf_read(reader: &mut MessageReader) -> protobufv3::CodedInputStream { protobufv3::CodedInputStream::from_buf_read(reader) } #[cfg(feature = "prost-codec")] pub mod pr_codec { use prost::Message; use super::{MessageReader, MAX_MESSAGE_SIZE}; use crate::buf::GrpcSlice; use crate::error::{Error, Result}; #[inline] pub fn ser(msg: &M, buf: &mut GrpcSlice) -> Result<()> { let size = msg.encoded_len(); if size <= MAX_MESSAGE_SIZE { unsafe { let bytes = buf.realloc(size); let mut b = &mut *(bytes as *mut [std::mem::MaybeUninit] as *mut [u8]); msg.encode(&mut b)?; debug_assert!(b.is_empty()); } Ok(()) } else { Err(Error::Codec( format!("message is too large: {size} > {MAX_MESSAGE_SIZE}").into(), )) } } #[inline] pub fn de(mut reader: MessageReader) -> Result { use bytes::buf::Buf; reader.advance(0); M::decode(reader).map_err(Into::into) } }