1 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3 use crate::buf::GrpcSlice;
4 use crate::call::MessageReader;
5 use crate::error::Result;
6
7 pub type DeserializeFn<T> = fn(MessageReader) -> Result<T>;
8 pub type SerializeFn<T> = fn(&T, &mut GrpcSlice) -> Result<()>;
9
10 /// According to https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md, grpc uses
11 /// a four bytes to describe the length of a message, so it should not exceed u32::MAX.
12 pub const MAX_MESSAGE_SIZE: usize = std::u32::MAX as usize;
13
14 /// Defines how to serialize and deserialize between the specialized type and byte slice.
15 pub struct Marshaller<T> {
16 // Use function pointer here to simplify the signature.
17 // Compiler will probably inline the function so performance
18 // impact can be omitted.
19 //
20 // Using trait will require a trait object or generic, which will
21 // either have performance impact or make signature complicated.
22 //
23 // const function is not stable yet (rust-lang/rust#24111), hence
24 // make all fields public.
25 /// The serialize function.
26 pub ser: SerializeFn<T>,
27
28 /// The deserialize function.
29 pub de: DeserializeFn<T>,
30 }
31
32 #[cfg(any(feature = "protobuf-codec", feature = "protobufv3-codec"))]
33 pub mod pb_codec {
34 #[cfg(feature = "protobuf-codec")]
35 use protobuf::{CodedOutputStream, Message};
36
37 #[cfg(feature = "protobufv3-codec")]
38 use protobufv3::{CodedOutputStream, Message};
39
40 use super::{from_buf_read, MessageReader, MAX_MESSAGE_SIZE};
41 use crate::buf::GrpcSlice;
42 use crate::error::{Error, Result};
43
44 #[inline]
ser<T: Message>(t: &T, buf: &mut GrpcSlice) -> Result<()>45 pub fn ser<T: Message>(t: &T, buf: &mut GrpcSlice) -> Result<()> {
46 let cap = t.compute_size() as usize;
47 // FIXME: This is not a practical fix until stepancheg/rust-protobuf#530 is fixed.
48 if cap <= MAX_MESSAGE_SIZE {
49 unsafe {
50 let bytes = buf.realloc(cap);
51 let raw_bytes = &mut *(bytes as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
52 let mut s = CodedOutputStream::bytes(raw_bytes);
53 t.write_to_with_cached_sizes(&mut s).map_err(Into::into)
54 }
55 } else {
56 Err(Error::Codec(
57 format!("message is too large: {cap} > {MAX_MESSAGE_SIZE}").into(),
58 ))
59 }
60 }
61
62 #[inline]
de<T: Message>(mut reader: MessageReader) -> Result<T>63 pub fn de<T: Message>(mut reader: MessageReader) -> Result<T> {
64 let mut s = from_buf_read(&mut reader);
65 let mut m = T::new();
66 m.merge_from(&mut s)?;
67 Ok(m)
68 }
69 }
70
71 #[cfg(feature = "protobuf-codec")]
from_buf_read(reader: &mut MessageReader) -> protobuf::CodedInputStream72 fn from_buf_read(reader: &mut MessageReader) -> protobuf::CodedInputStream {
73 protobuf::CodedInputStream::from_buffered_reader(reader)
74 }
75
76 #[cfg(feature = "protobufv3-codec")]
from_buf_read(reader: &mut MessageReader) -> protobufv3::CodedInputStream77 fn from_buf_read(reader: &mut MessageReader) -> protobufv3::CodedInputStream {
78 protobufv3::CodedInputStream::from_buf_read(reader)
79 }
80
81 #[cfg(feature = "prost-codec")]
82 pub mod pr_codec {
83 use prost::Message;
84
85 use super::{MessageReader, MAX_MESSAGE_SIZE};
86 use crate::buf::GrpcSlice;
87 use crate::error::{Error, Result};
88
89 #[inline]
ser<M: Message>(msg: &M, buf: &mut GrpcSlice) -> Result<()>90 pub fn ser<M: Message>(msg: &M, buf: &mut GrpcSlice) -> Result<()> {
91 let size = msg.encoded_len();
92 if size <= MAX_MESSAGE_SIZE {
93 unsafe {
94 let bytes = buf.realloc(size);
95 let mut b = &mut *(bytes as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
96 msg.encode(&mut b)?;
97 debug_assert!(b.is_empty());
98 }
99 Ok(())
100 } else {
101 Err(Error::Codec(
102 format!("message is too large: {size} > {MAX_MESSAGE_SIZE}").into(),
103 ))
104 }
105 }
106
107 #[inline]
de<M: Message + Default>(mut reader: MessageReader) -> Result<M>108 pub fn de<M: Message + Default>(mut reader: MessageReader) -> Result<M> {
109 use bytes::buf::Buf;
110 reader.advance(0);
111 M::decode(reader).map_err(Into::into)
112 }
113 }
114