1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2 
3 use crate::buf::GrpcSlice;
4 use crate::call::MessageReader;
5 use crate::error::Result;
6 
/// Function pointer that consumes a [`MessageReader`] and produces a `T`,
/// or an error if the message cannot be decoded.
pub type DeserializeFn<T> = fn(MessageReader) -> Result<T>;
/// Function pointer that encodes a `&T` into the supplied [`GrpcSlice`],
/// returning an error if the message cannot be encoded.
pub type SerializeFn<T> = fn(&T, &mut GrpcSlice) -> Result<()>;
9 
/// Upper bound, in bytes, on the encoded size of a single message.
///
/// According to <https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md>, gRPC uses
/// four bytes to describe the length of a message, so it should not exceed `u32::MAX`.
pub const MAX_MESSAGE_SIZE: usize = u32::MAX as usize;
13 
/// Defines how to serialize and deserialize between the specialized type and byte slice.
///
/// A `Marshaller<T>` is a pair of plain function pointers: [`SerializeFn`] for
/// encoding a `T` and [`DeserializeFn`] for decoding one.
pub struct Marshaller<T> {
    // Use function pointers here to simplify the signature.
    // The compiler will probably inline the functions, so the performance
    // impact can be ignored.
    //
    // Using a trait would require a trait object or a generic parameter, which
    // would either have a performance impact or complicate the signature.
    //
    // const function is not stable yet (rust-lang/rust#24111), hence
    // make all fields public.
    /// The serialize function.
    pub ser: SerializeFn<T>,

    /// The deserialize function.
    pub de: DeserializeFn<T>,
}
31 
/// Codec functions for message types implementing the `Message` trait of
/// `protobuf` / `protobufv3`.
///
/// Only compiled when the `protobuf-codec` or `protobufv3-codec` feature is enabled.
#[cfg(any(feature = "protobuf-codec", feature = "protobufv3-codec"))]
pub mod pb_codec {
    #[cfg(feature = "protobuf-codec")]
    use protobuf::{CodedOutputStream, Message};

    #[cfg(feature = "protobufv3-codec")]
    use protobufv3::{CodedOutputStream, Message};

    use super::{from_buf_read, MessageReader, MAX_MESSAGE_SIZE};
    use crate::buf::GrpcSlice;
    use crate::error::{Error, Result};

    /// Serializes `t` into `buf`.
    ///
    /// The size reported by `compute_size` is used both for the limit check and
    /// as the capacity passed to `buf.realloc`; the message is then written
    /// directly into the reallocated region.
    ///
    /// # Errors
    ///
    /// Returns `Error::Codec` (leaving `buf` untouched) when the computed size
    /// exceeds `MAX_MESSAGE_SIZE`, and forwards any error reported while the
    /// message is being written.
    #[inline]
    pub fn ser<T: Message>(t: &T, buf: &mut GrpcSlice) -> Result<()> {
        let cap = t.compute_size() as usize;
        // FIXME: This is not a practical fix until stepancheg/rust-protobuf#530 is fixed.
        if cap > MAX_MESSAGE_SIZE {
            let reason = format!("message is too large: {cap} > {MAX_MESSAGE_SIZE}");
            return Err(Error::Codec(reason.into()));
        }

        // SAFETY: NOTE(review): this presumes `GrpcSlice::realloc` hands back a
        // writable region of `cap` bytes and that the write below initializes
        // all of it -- confirm against `realloc`'s documented contract.
        let mut stream = unsafe {
            let uninit = buf.realloc(cap);
            // View the `MaybeUninit<u8>` region as plain bytes so the output
            // stream can write into it.
            let raw_bytes = &mut *(uninit as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
            CodedOutputStream::bytes(raw_bytes)
        };
        t.write_to_with_cached_sizes(&mut stream)
            .map_err(Into::into)
    }

    /// Deserializes a `T` from `reader`.
    ///
    /// A fresh message is created with `T::new()` and the content read through
    /// `from_buf_read` is merged into it.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `merge_from`.
    #[inline]
    pub fn de<T: Message>(mut reader: MessageReader) -> Result<T> {
        let mut message = T::new();
        let mut input = from_buf_read(&mut reader);
        message.merge_from(&mut input)?;
        Ok(message)
    }
}
70 
/// Builds a `protobuf::CodedInputStream` that reads from `reader`.
///
/// This is the `protobuf-codec` flavour of the helper that `pb_codec::de`
/// imports from this module.
#[cfg(feature = "protobuf-codec")]
fn from_buf_read(reader: &mut MessageReader) -> protobuf::CodedInputStream {
    use protobuf::CodedInputStream;

    CodedInputStream::from_buffered_reader(reader)
}
75 
/// Builds a `protobufv3::CodedInputStream` that reads from `reader`.
///
/// This is the `protobufv3-codec` flavour of the helper that `pb_codec::de`
/// imports from this module.
#[cfg(feature = "protobufv3-codec")]
fn from_buf_read(reader: &mut MessageReader) -> protobufv3::CodedInputStream {
    use protobufv3::CodedInputStream;

    CodedInputStream::from_buf_read(reader)
}
80 
/// Codec functions for message types implementing `prost::Message`.
///
/// Only compiled when the `prost-codec` feature is enabled.
#[cfg(feature = "prost-codec")]
pub mod pr_codec {
    use prost::Message;

    use super::{MessageReader, MAX_MESSAGE_SIZE};
    use crate::buf::GrpcSlice;
    use crate::error::{Error, Result};

    /// Serializes `msg` into `buf`.
    ///
    /// The value of `encoded_len` is used both for the limit check and as the
    /// capacity passed to `buf.realloc`; the message is then encoded directly
    /// into the reallocated region.
    ///
    /// # Errors
    ///
    /// Returns `Error::Codec` (leaving `buf` untouched) when the encoded length
    /// exceeds `MAX_MESSAGE_SIZE`, and propagates any error from `encode`.
    #[inline]
    pub fn ser<M: Message>(msg: &M, buf: &mut GrpcSlice) -> Result<()> {
        let size = msg.encoded_len();
        if size > MAX_MESSAGE_SIZE {
            let reason = format!("message is too large: {size} > {MAX_MESSAGE_SIZE}");
            return Err(Error::Codec(reason.into()));
        }

        // SAFETY: NOTE(review): this presumes `GrpcSlice::realloc` hands back a
        // writable region of `size` bytes and that `encode` below initializes
        // all of it -- confirm against `realloc`'s documented contract.
        let mut remaining: &mut [u8] = unsafe {
            let uninit = buf.realloc(size);
            // View the `MaybeUninit<u8>` region as plain bytes so it can be
            // used as the encoding target.
            &mut *(uninit as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
        };
        msg.encode(&mut remaining)?;
        // The buffer was sized from `encoded_len`, so nothing should be left over.
        debug_assert!(remaining.is_empty());
        Ok(())
    }

    /// Deserializes an `M` by decoding directly from `reader`.
    ///
    /// # Errors
    ///
    /// Converts and returns any error produced by `M::decode`.
    #[inline]
    pub fn de<M: Message + Default>(mut reader: MessageReader) -> Result<M> {
        use bytes::buf::Buf;

        // NOTE(review): a zero-length advance looks like a no-op; presumably it
        // puts the reader into a state `decode` relies on -- confirm against
        // `MessageReader`'s `Buf` implementation before removing.
        reader.advance(0);

        let decoded = M::decode(reader);
        decoded.map_err(Into::into)
    }
}
114