1 mod buf_read_iter; 2 mod buf_read_or_reader; 3 mod input_buf; 4 mod input_source; 5 6 use std::io; 7 use std::io::BufRead; 8 use std::io::Read; 9 use std::mem; 10 use std::mem::MaybeUninit; 11 12 #[cfg(feature = "bytes")] 13 use ::bytes::Bytes; 14 15 #[cfg(feature = "bytes")] 16 use crate::chars::Chars; 17 use crate::coded_input_stream::buf_read_iter::BufReadIter; 18 use crate::enums::Enum; 19 use crate::error::ProtobufError; 20 use crate::error::WireError; 21 use crate::misc::maybe_ununit_array_assume_init; 22 use crate::reflect::types::ProtobufTypeBool; 23 use crate::reflect::types::ProtobufTypeDouble; 24 use crate::reflect::types::ProtobufTypeFixed; 25 use crate::reflect::types::ProtobufTypeFixed32; 26 use crate::reflect::types::ProtobufTypeFixed64; 27 use crate::reflect::types::ProtobufTypeFloat; 28 use crate::reflect::types::ProtobufTypeInt32; 29 use crate::reflect::types::ProtobufTypeInt64; 30 use crate::reflect::types::ProtobufTypeSfixed32; 31 use crate::reflect::types::ProtobufTypeSfixed64; 32 use crate::reflect::types::ProtobufTypeSint32; 33 use crate::reflect::types::ProtobufTypeSint64; 34 use crate::reflect::types::ProtobufTypeTrait; 35 use crate::reflect::types::ProtobufTypeUint32; 36 use crate::reflect::types::ProtobufTypeUint64; 37 use crate::reflect::MessageDescriptor; 38 use crate::unknown::UnknownValue; 39 use crate::varint::decode::decode_varint32; 40 use crate::varint::decode::decode_varint64; 41 use crate::varint::MAX_VARINT_ENCODED_LEN; 42 use crate::wire_format; 43 use crate::wire_format::WireType; 44 use crate::zigzag::decode_zig_zag_32; 45 use crate::zigzag::decode_zig_zag_64; 46 use crate::EnumOrUnknown; 47 use crate::Message; 48 use crate::MessageDyn; 49 50 // Default recursion level limit. 100 is the default value of C++'s implementation. 
51 const DEFAULT_RECURSION_LIMIT: u32 = 100; 52 53 // Max allocated vec when reading length-delimited from unknown input stream 54 pub(crate) const READ_RAW_BYTES_MAX_ALLOC: usize = 10_000_000; 55 56 /// Buffered read with handy utilities. 57 #[derive(Debug)] 58 pub struct CodedInputStream<'a> { 59 source: BufReadIter<'a>, 60 recursion_level: u32, 61 recursion_limit: u32, 62 } 63 64 impl<'a> CodedInputStream<'a> { 65 /// Wrap a `Read`. 66 /// 67 /// Note resulting `CodedInputStream` is buffered. 68 /// 69 /// If `Read` is buffered, the resulting stream will be double buffered, 70 /// consider using [`from_buf_read`](Self::from_buf_read) instead. new(read: &'a mut dyn Read) -> CodedInputStream<'a>71 pub fn new(read: &'a mut dyn Read) -> CodedInputStream<'a> { 72 CodedInputStream::from_buf_read_iter(BufReadIter::from_read(read)) 73 } 74 75 /// Create from `BufRead`. 76 /// 77 /// `CodedInputStream` will utilize `BufRead` buffer. from_buf_read(buf_read: &'a mut dyn BufRead) -> CodedInputStream<'a>78 pub fn from_buf_read(buf_read: &'a mut dyn BufRead) -> CodedInputStream<'a> { 79 CodedInputStream::from_buf_read_iter(BufReadIter::from_buf_read(buf_read)) 80 } 81 82 /// Read from byte slice from_bytes(bytes: &'a [u8]) -> CodedInputStream<'a>83 pub fn from_bytes(bytes: &'a [u8]) -> CodedInputStream<'a> { 84 CodedInputStream::from_buf_read_iter(BufReadIter::from_byte_slice(bytes)) 85 } 86 87 /// Read from `Bytes`. 88 /// 89 /// `CodedInputStream` operations like 90 /// [`read_tokio_bytes`](crate::CodedInputStream::read_tokio_bytes) 91 /// will return a shared copy of this bytes object. 
92 #[cfg(feature = "bytes")] from_tokio_bytes(bytes: &'a Bytes) -> CodedInputStream<'a>93 pub fn from_tokio_bytes(bytes: &'a Bytes) -> CodedInputStream<'a> { 94 CodedInputStream::from_buf_read_iter(BufReadIter::from_bytes(bytes)) 95 } 96 from_buf_read_iter(source: BufReadIter<'a>) -> CodedInputStream<'a>97 fn from_buf_read_iter(source: BufReadIter<'a>) -> CodedInputStream<'a> { 98 CodedInputStream { 99 source, 100 recursion_level: 0, 101 recursion_limit: DEFAULT_RECURSION_LIMIT, 102 } 103 } 104 105 /// Set the recursion limit. set_recursion_limit(&mut self, limit: u32)106 pub fn set_recursion_limit(&mut self, limit: u32) { 107 self.recursion_limit = limit; 108 } 109 110 #[inline] incr_recursion(&mut self) -> crate::Result<()>111 pub(crate) fn incr_recursion(&mut self) -> crate::Result<()> { 112 if self.recursion_level >= self.recursion_limit { 113 return Err(ProtobufError::WireError(WireError::OverRecursionLimit).into()); 114 } 115 self.recursion_level += 1; 116 Ok(()) 117 } 118 119 #[inline] decr_recursion(&mut self)120 pub(crate) fn decr_recursion(&mut self) { 121 self.recursion_level -= 1; 122 } 123 124 /// How many bytes processed pos(&self) -> u64125 pub fn pos(&self) -> u64 { 126 self.source.pos() 127 } 128 129 /// How many bytes until current limit bytes_until_limit(&self) -> u64130 pub fn bytes_until_limit(&self) -> u64 { 131 self.source.bytes_until_limit() 132 } 133 134 /// Read bytes into given `buf`. 135 #[inline] read_exact(&mut self, buf: &mut [MaybeUninit<u8>]) -> crate::Result<()>136 pub fn read_exact(&mut self, buf: &mut [MaybeUninit<u8>]) -> crate::Result<()> { 137 self.source.read_exact(buf) 138 } 139 140 /// Read exact number of bytes as `Bytes` object. 141 /// 142 /// This operation returns a shared view if `CodedInputStream` is 143 /// constructed with `Bytes` parameter. 
144 #[cfg(feature = "bytes")] read_raw_tokio_bytes(&mut self, count: usize) -> crate::Result<Bytes>145 fn read_raw_tokio_bytes(&mut self, count: usize) -> crate::Result<Bytes> { 146 self.source.read_exact_bytes(count) 147 } 148 149 /// Read one byte 150 #[inline(always)] read_raw_byte(&mut self) -> crate::Result<u8>151 pub fn read_raw_byte(&mut self) -> crate::Result<u8> { 152 self.source.read_byte() 153 } 154 155 /// Push new limit, return previous limit. push_limit(&mut self, limit: u64) -> crate::Result<u64>156 pub fn push_limit(&mut self, limit: u64) -> crate::Result<u64> { 157 self.source.push_limit(limit) 158 } 159 160 /// Restore previous limit. pop_limit(&mut self, old_limit: u64)161 pub fn pop_limit(&mut self, old_limit: u64) { 162 self.source.pop_limit(old_limit); 163 } 164 165 /// Are we at EOF? 166 #[inline(always)] eof(&mut self) -> crate::Result<bool>167 pub fn eof(&mut self) -> crate::Result<bool> { 168 self.source.eof() 169 } 170 171 /// Check we are at EOF. 172 /// 173 /// Return error if we are not at EOF. 
check_eof(&mut self) -> crate::Result<()>174 pub fn check_eof(&mut self) -> crate::Result<()> { 175 let eof = self.eof()?; 176 if !eof { 177 return Err(ProtobufError::WireError(WireError::UnexpectedEof).into()); 178 } 179 Ok(()) 180 } 181 read_raw_varint64_slow(&mut self) -> crate::Result<u64>182 fn read_raw_varint64_slow(&mut self) -> crate::Result<u64> { 183 let mut r: u64 = 0; 184 let mut i = 0; 185 loop { 186 if i == MAX_VARINT_ENCODED_LEN { 187 return Err(ProtobufError::WireError(WireError::IncorrectVarint).into()); 188 } 189 let b = self.read_raw_byte()?; 190 if i == 9 && (b & 0x7f) > 1 { 191 return Err(ProtobufError::WireError(WireError::IncorrectVarint).into()); 192 } 193 r = r | (((b & 0x7f) as u64) << (i * 7)); 194 i += 1; 195 if b < 0x80 { 196 return Ok(r); 197 } 198 } 199 } 200 read_raw_varint32_slow(&mut self) -> crate::Result<u32>201 fn read_raw_varint32_slow(&mut self) -> crate::Result<u32> { 202 let v = self.read_raw_varint64_slow()?; 203 if v > u32::MAX as u64 { 204 return Err(ProtobufError::WireError(WireError::U32Overflow(v)).into()); 205 } 206 Ok(v as u32) 207 } 208 209 /// Read varint 210 #[inline] read_raw_varint64(&mut self) -> crate::Result<u64>211 pub fn read_raw_varint64(&mut self) -> crate::Result<u64> { 212 let rem = self.source.remaining_in_buf(); 213 214 match decode_varint64(rem)? { 215 Some((r, c)) => { 216 self.source.consume(c); 217 Ok(r) 218 } 219 None => self.read_raw_varint64_slow(), 220 } 221 } 222 223 /// Read varint 224 #[inline] read_raw_varint32(&mut self) -> crate::Result<u32>225 pub fn read_raw_varint32(&mut self) -> crate::Result<u32> { 226 let rem = self.source.remaining_in_buf(); 227 228 match decode_varint32(rem)? 
{ 229 Some((r, c)) => { 230 self.source.consume(c); 231 Ok(r) 232 } 233 None => self.read_raw_varint32_slow(), 234 } 235 } 236 237 #[inline] read_raw_varint32_or_eof(&mut self) -> crate::Result<Option<u32>>238 fn read_raw_varint32_or_eof(&mut self) -> crate::Result<Option<u32>> { 239 let rem = self.source.remaining_in_buf(); 240 let v = decode_varint32(rem)?; 241 match v { 242 Some((r, c)) => { 243 self.source.consume(c); 244 Ok(Some(r)) 245 } 246 None => { 247 if self.eof()? { 248 Ok(None) 249 } else { 250 let v = self.read_raw_varint32_slow()?; 251 Ok(Some(v)) 252 } 253 } 254 } 255 } 256 257 /// Read little-endian 32-bit integer read_raw_little_endian32(&mut self) -> crate::Result<u32>258 pub fn read_raw_little_endian32(&mut self) -> crate::Result<u32> { 259 let mut bytes = [MaybeUninit::uninit(); 4]; 260 self.read_exact(&mut bytes)?; 261 // SAFETY: `read_exact` guarantees that the buffer is filled. 262 let bytes = unsafe { maybe_ununit_array_assume_init(bytes) }; 263 Ok(u32::from_le_bytes(bytes)) 264 } 265 266 /// Read little-endian 64-bit integer read_raw_little_endian64(&mut self) -> crate::Result<u64>267 pub fn read_raw_little_endian64(&mut self) -> crate::Result<u64> { 268 let mut bytes = [MaybeUninit::uninit(); 8]; 269 self.read_exact(&mut bytes)?; 270 // SAFETY: `read_exact` guarantees that the buffer is filled. 271 let bytes = unsafe { maybe_ununit_array_assume_init(bytes) }; 272 Ok(u64::from_le_bytes(bytes)) 273 } 274 275 /// Read tag number as `u32` or None if EOF is reached. 
276 #[inline] read_raw_tag_or_eof(&mut self) -> crate::Result<Option<u32>>277 pub fn read_raw_tag_or_eof(&mut self) -> crate::Result<Option<u32>> { 278 self.read_raw_varint32_or_eof() 279 } 280 281 /// Read tag 282 #[inline] read_tag(&mut self) -> crate::Result<wire_format::Tag>283 pub(crate) fn read_tag(&mut self) -> crate::Result<wire_format::Tag> { 284 let v = self.read_raw_varint32()?; 285 wire_format::Tag::new(v) 286 } 287 288 /// Read tag, return it is pair (field number, wire type) 289 #[inline] read_tag_unpack(&mut self) -> crate::Result<(u32, WireType)>290 pub(crate) fn read_tag_unpack(&mut self) -> crate::Result<(u32, WireType)> { 291 self.read_tag().map(|t| t.unpack()) 292 } 293 294 /// Read `double` read_double(&mut self) -> crate::Result<f64>295 pub fn read_double(&mut self) -> crate::Result<f64> { 296 let bits = self.read_raw_little_endian64()?; 297 Ok(f64::from_bits(bits)) 298 } 299 300 /// Read `float` read_float(&mut self) -> crate::Result<f32>301 pub fn read_float(&mut self) -> crate::Result<f32> { 302 let bits = self.read_raw_little_endian32()?; 303 Ok(f32::from_bits(bits)) 304 } 305 306 /// Read `int64` read_int64(&mut self) -> crate::Result<i64>307 pub fn read_int64(&mut self) -> crate::Result<i64> { 308 self.read_raw_varint64().map(|v| v as i64) 309 } 310 311 /// Read `int32` read_int32(&mut self) -> crate::Result<i32>312 pub fn read_int32(&mut self) -> crate::Result<i32> { 313 let v = self.read_int64()?; 314 i32::try_from(v).map_err(|_| WireError::I32Overflow(v).into()) 315 } 316 317 /// Read `uint64` read_uint64(&mut self) -> crate::Result<u64>318 pub fn read_uint64(&mut self) -> crate::Result<u64> { 319 self.read_raw_varint64() 320 } 321 322 /// Read `uint32` read_uint32(&mut self) -> crate::Result<u32>323 pub fn read_uint32(&mut self) -> crate::Result<u32> { 324 self.read_raw_varint32() 325 } 326 327 /// Read `sint64` read_sint64(&mut self) -> crate::Result<i64>328 pub fn read_sint64(&mut self) -> crate::Result<i64> { 329 
self.read_uint64().map(decode_zig_zag_64) 330 } 331 332 /// Read `sint32` read_sint32(&mut self) -> crate::Result<i32>333 pub fn read_sint32(&mut self) -> crate::Result<i32> { 334 self.read_uint32().map(decode_zig_zag_32) 335 } 336 337 /// Read `fixed64` read_fixed64(&mut self) -> crate::Result<u64>338 pub fn read_fixed64(&mut self) -> crate::Result<u64> { 339 self.read_raw_little_endian64() 340 } 341 342 /// Read `fixed32` read_fixed32(&mut self) -> crate::Result<u32>343 pub fn read_fixed32(&mut self) -> crate::Result<u32> { 344 self.read_raw_little_endian32() 345 } 346 347 /// Read `sfixed64` read_sfixed64(&mut self) -> crate::Result<i64>348 pub fn read_sfixed64(&mut self) -> crate::Result<i64> { 349 self.read_raw_little_endian64().map(|v| v as i64) 350 } 351 352 /// Read `sfixed32` read_sfixed32(&mut self) -> crate::Result<i32>353 pub fn read_sfixed32(&mut self) -> crate::Result<i32> { 354 self.read_raw_little_endian32().map(|v| v as i32) 355 } 356 357 /// Read `bool` read_bool(&mut self) -> crate::Result<bool>358 pub fn read_bool(&mut self) -> crate::Result<bool> { 359 self.read_raw_varint64().map(|v| v != 0) 360 } 361 read_enum_value(&mut self) -> crate::Result<i32>362 pub(crate) fn read_enum_value(&mut self) -> crate::Result<i32> { 363 self.read_int32() 364 } 365 366 /// Read `enum` as `ProtobufEnum` read_enum<E: Enum>(&mut self) -> crate::Result<E>367 pub fn read_enum<E: Enum>(&mut self) -> crate::Result<E> { 368 let i = self.read_enum_value()?; 369 match Enum::from_i32(i) { 370 Some(e) => Ok(e), 371 None => Err(ProtobufError::WireError(WireError::InvalidEnumValue(E::NAME, i)).into()), 372 } 373 } 374 375 /// Read `enum` as `ProtobufEnumOrUnknown` read_enum_or_unknown<E: Enum>(&mut self) -> crate::Result<EnumOrUnknown<E>>376 pub fn read_enum_or_unknown<E: Enum>(&mut self) -> crate::Result<EnumOrUnknown<E>> { 377 Ok(EnumOrUnknown::from_i32(self.read_int32()?)) 378 } 379 read_repeated_packed_fixed_into<T: ProtobufTypeFixed>( &mut self, target: &mut 
Vec<T::ProtobufValue>, ) -> crate::Result<()>380 fn read_repeated_packed_fixed_into<T: ProtobufTypeFixed>( 381 &mut self, 382 target: &mut Vec<T::ProtobufValue>, 383 ) -> crate::Result<()> { 384 let len_bytes = self.read_raw_varint64()?; 385 386 let reserve = if len_bytes <= READ_RAW_BYTES_MAX_ALLOC as u64 { 387 (len_bytes as usize) / (T::ENCODED_SIZE as usize) 388 } else { 389 // prevent OOM on malformed input 390 // probably should truncate 391 READ_RAW_BYTES_MAX_ALLOC / (T::ENCODED_SIZE as usize) 392 }; 393 394 target.reserve(reserve); 395 396 let old_limit = self.push_limit(len_bytes)?; 397 while !self.eof()? { 398 target.push(T::read(self)?); 399 } 400 self.pop_limit(old_limit); 401 Ok(()) 402 } 403 read_repeated_packed_into<T: ProtobufTypeTrait>( &mut self, target: &mut Vec<T::ProtobufValue>, ) -> crate::Result<()>404 fn read_repeated_packed_into<T: ProtobufTypeTrait>( 405 &mut self, 406 target: &mut Vec<T::ProtobufValue>, 407 ) -> crate::Result<()> { 408 let len_bytes = self.read_raw_varint64()?; 409 410 // value is at least 1 bytes, so this is lower bound of element count 411 let reserve = if len_bytes <= READ_RAW_BYTES_MAX_ALLOC as u64 { 412 len_bytes as usize 413 } else { 414 // prevent OOM on malformed input 415 READ_RAW_BYTES_MAX_ALLOC 416 }; 417 418 target.reserve(reserve); 419 420 let old_limit = self.push_limit(len_bytes)?; 421 while !self.eof()? 
{ 422 target.push(T::read(self)?); 423 } 424 self.pop_limit(old_limit); 425 Ok(()) 426 } 427 428 /// Read repeated packed `double` read_repeated_packed_double_into(&mut self, target: &mut Vec<f64>) -> crate::Result<()>429 pub fn read_repeated_packed_double_into(&mut self, target: &mut Vec<f64>) -> crate::Result<()> { 430 self.read_repeated_packed_fixed_into::<ProtobufTypeDouble>(target) 431 } 432 433 /// Read repeated packed `float` read_repeated_packed_float_into(&mut self, target: &mut Vec<f32>) -> crate::Result<()>434 pub fn read_repeated_packed_float_into(&mut self, target: &mut Vec<f32>) -> crate::Result<()> { 435 self.read_repeated_packed_fixed_into::<ProtobufTypeFloat>(target) 436 } 437 438 /// Read repeated packed `int64` read_repeated_packed_int64_into(&mut self, target: &mut Vec<i64>) -> crate::Result<()>439 pub fn read_repeated_packed_int64_into(&mut self, target: &mut Vec<i64>) -> crate::Result<()> { 440 self.read_repeated_packed_into::<ProtobufTypeInt64>(target) 441 } 442 443 /// Read repeated packed `int32` read_repeated_packed_int32_into(&mut self, target: &mut Vec<i32>) -> crate::Result<()>444 pub fn read_repeated_packed_int32_into(&mut self, target: &mut Vec<i32>) -> crate::Result<()> { 445 self.read_repeated_packed_into::<ProtobufTypeInt32>(target) 446 } 447 448 /// Read repeated packed `uint64` read_repeated_packed_uint64_into(&mut self, target: &mut Vec<u64>) -> crate::Result<()>449 pub fn read_repeated_packed_uint64_into(&mut self, target: &mut Vec<u64>) -> crate::Result<()> { 450 self.read_repeated_packed_into::<ProtobufTypeUint64>(target) 451 } 452 453 /// Read repeated packed `uint32` read_repeated_packed_uint32_into(&mut self, target: &mut Vec<u32>) -> crate::Result<()>454 pub fn read_repeated_packed_uint32_into(&mut self, target: &mut Vec<u32>) -> crate::Result<()> { 455 self.read_repeated_packed_into::<ProtobufTypeUint32>(target) 456 } 457 458 /// Read repeated packed `sint64` read_repeated_packed_sint64_into(&mut self, target: &mut 
Vec<i64>) -> crate::Result<()>459 pub fn read_repeated_packed_sint64_into(&mut self, target: &mut Vec<i64>) -> crate::Result<()> { 460 self.read_repeated_packed_into::<ProtobufTypeSint64>(target) 461 } 462 463 /// Read repeated packed `sint32` read_repeated_packed_sint32_into(&mut self, target: &mut Vec<i32>) -> crate::Result<()>464 pub fn read_repeated_packed_sint32_into(&mut self, target: &mut Vec<i32>) -> crate::Result<()> { 465 self.read_repeated_packed_into::<ProtobufTypeSint32>(target) 466 } 467 468 /// Read repeated packed `fixed64` read_repeated_packed_fixed64_into( &mut self, target: &mut Vec<u64>, ) -> crate::Result<()>469 pub fn read_repeated_packed_fixed64_into( 470 &mut self, 471 target: &mut Vec<u64>, 472 ) -> crate::Result<()> { 473 self.read_repeated_packed_fixed_into::<ProtobufTypeFixed64>(target) 474 } 475 476 /// Read repeated packed `fixed32` read_repeated_packed_fixed32_into( &mut self, target: &mut Vec<u32>, ) -> crate::Result<()>477 pub fn read_repeated_packed_fixed32_into( 478 &mut self, 479 target: &mut Vec<u32>, 480 ) -> crate::Result<()> { 481 self.read_repeated_packed_fixed_into::<ProtobufTypeFixed32>(target) 482 } 483 484 /// Read repeated packed `sfixed64` read_repeated_packed_sfixed64_into( &mut self, target: &mut Vec<i64>, ) -> crate::Result<()>485 pub fn read_repeated_packed_sfixed64_into( 486 &mut self, 487 target: &mut Vec<i64>, 488 ) -> crate::Result<()> { 489 self.read_repeated_packed_fixed_into::<ProtobufTypeSfixed64>(target) 490 } 491 492 /// Read repeated packed `sfixed32` read_repeated_packed_sfixed32_into( &mut self, target: &mut Vec<i32>, ) -> crate::Result<()>493 pub fn read_repeated_packed_sfixed32_into( 494 &mut self, 495 target: &mut Vec<i32>, 496 ) -> crate::Result<()> { 497 self.read_repeated_packed_fixed_into::<ProtobufTypeSfixed32>(target) 498 } 499 500 /// Read repeated packed `bool` read_repeated_packed_bool_into(&mut self, target: &mut Vec<bool>) -> crate::Result<()>501 pub fn read_repeated_packed_bool_into(&mut 
self, target: &mut Vec<bool>) -> crate::Result<()> { 502 self.read_repeated_packed_into::<ProtobufTypeBool>(target) 503 } 504 505 /// Read repeated packed enum values into the vector. read_repeated_packed_enum_values_into( &mut self, target: &mut Vec<i32>, ) -> crate::Result<()>506 pub(crate) fn read_repeated_packed_enum_values_into( 507 &mut self, 508 target: &mut Vec<i32>, 509 ) -> crate::Result<()> { 510 self.read_repeated_packed_into::<ProtobufTypeInt32>(target) 511 } 512 skip_group(&mut self) -> crate::Result<()>513 fn skip_group(&mut self) -> crate::Result<()> { 514 while !self.eof()? { 515 let wire_type = self.read_tag_unpack()?.1; 516 if wire_type == WireType::EndGroup { 517 break; 518 } 519 self.skip_field(wire_type)?; 520 } 521 Ok(()) 522 } 523 524 /// Read `UnknownValue` read_unknown(&mut self, wire_type: WireType) -> crate::Result<UnknownValue>525 pub fn read_unknown(&mut self, wire_type: WireType) -> crate::Result<UnknownValue> { 526 match wire_type { 527 WireType::Varint => self.read_raw_varint64().map(|v| UnknownValue::Varint(v)), 528 WireType::Fixed64 => self.read_fixed64().map(|v| UnknownValue::Fixed64(v)), 529 WireType::Fixed32 => self.read_fixed32().map(|v| UnknownValue::Fixed32(v)), 530 WireType::LengthDelimited => { 531 let len = self.read_raw_varint32()?; 532 self.read_raw_bytes(len) 533 .map(|v| UnknownValue::LengthDelimited(v)) 534 } 535 WireType::StartGroup => { 536 self.skip_group()?; 537 // We do not support groups, so just return something. 538 Ok(UnknownValue::LengthDelimited(Vec::new())) 539 } 540 WireType::EndGroup => { 541 Err(ProtobufError::WireError(WireError::UnexpectedWireType(wire_type)).into()) 542 } 543 } 544 } 545 546 /// Skip field. 
skip_field(&mut self, wire_type: WireType) -> crate::Result<()>547 pub fn skip_field(&mut self, wire_type: WireType) -> crate::Result<()> { 548 match wire_type { 549 WireType::Varint => self.read_raw_varint64().map(|_| ()), 550 WireType::Fixed64 => self.read_fixed64().map(|_| ()), 551 WireType::Fixed32 => self.read_fixed32().map(|_| ()), 552 WireType::LengthDelimited => { 553 let len = self.read_raw_varint32()?; 554 self.skip_raw_bytes(len) 555 } 556 WireType::StartGroup => self.skip_group(), 557 WireType::EndGroup => { 558 Err(ProtobufError::WireError(WireError::UnexpectedWireType(wire_type)).into()) 559 } 560 } 561 } 562 563 /// Read raw bytes into the supplied vector. The vector will be resized as needed and 564 /// overwritten. read_raw_bytes_into(&mut self, count: u32, target: &mut Vec<u8>) -> crate::Result<()>565 pub fn read_raw_bytes_into(&mut self, count: u32, target: &mut Vec<u8>) -> crate::Result<()> { 566 self.source.read_exact_to_vec(count as usize, target) 567 } 568 569 /// Read exact number of bytes read_raw_bytes(&mut self, count: u32) -> crate::Result<Vec<u8>>570 pub fn read_raw_bytes(&mut self, count: u32) -> crate::Result<Vec<u8>> { 571 let mut r = Vec::new(); 572 self.read_raw_bytes_into(count, &mut r)?; 573 Ok(r) 574 } 575 576 /// Skip exact number of bytes skip_raw_bytes(&mut self, count: u32) -> crate::Result<()>577 pub fn skip_raw_bytes(&mut self, count: u32) -> crate::Result<()> { 578 self.source.skip_bytes(count) 579 } 580 581 /// Read `bytes` field, length delimited read_bytes(&mut self) -> crate::Result<Vec<u8>>582 pub fn read_bytes(&mut self) -> crate::Result<Vec<u8>> { 583 let mut r = Vec::new(); 584 self.read_bytes_into(&mut r)?; 585 Ok(r) 586 } 587 588 /// Read `bytes` field, length delimited 589 #[cfg(feature = "bytes")] read_tokio_bytes(&mut self) -> crate::Result<Bytes>590 pub fn read_tokio_bytes(&mut self) -> crate::Result<Bytes> { 591 let len = self.read_raw_varint32()?; 592 self.read_raw_tokio_bytes(len as usize) 593 } 594 595 
/// Read `string` field, length delimited
#[cfg(feature = "bytes")]
pub fn read_tokio_chars(&mut self) -> crate::Result<Chars> {
    let bytes = self.read_tokio_bytes()?;
    let chars = Chars::from_bytes(bytes).map_err(ProtobufError::Utf8)?;
    Ok(chars)
}

/// Read a length-delimited `bytes` field into `target`.
pub fn read_bytes_into(&mut self, target: &mut Vec<u8>) -> crate::Result<()> {
    let len = self.read_raw_varint32()?;
    self.read_raw_bytes_into(len, target)
}

/// Read a length-delimited `string` field into a new `String`.
pub fn read_string(&mut self) -> crate::Result<String> {
    let mut text = String::new();
    self.read_string_into(&mut text)?;
    Ok(text)
}

/// Read a length-delimited `string` field into `target`, reusing its buffer.
///
/// On any error (including `WireError::Utf8Error` for invalid UTF-8)
/// `target` is left empty.
pub fn read_string_into(&mut self, target: &mut String) -> crate::Result<()> {
    target.clear();
    // Take target's buffer so the raw bytes can be read into it directly.
    let mut buffer = mem::take(target).into_bytes();
    self.read_bytes_into(&mut buffer)?;

    match String::from_utf8(buffer) {
        Ok(text) => {
            *target = text;
            Ok(())
        }
        Err(_) => Err(ProtobufError::WireError(WireError::Utf8Error).into()),
    }
}

/// Read a length-delimited message and merge it into `message`; does not
/// check if the message is initialized.
///
/// Counts as one nesting level against the recursion limit and fails with
/// `WireError::OverRecursionLimit` when the limit is already reached.
pub fn merge_message<M: Message>(&mut self, message: &mut M) -> crate::Result<()> {
    self.incr_recursion()?;
    // From here on the guard restores the level on every exit path.
    let mut guard = DecrRecursion(self);

    let len = guard.0.read_raw_varint64()?;
    let old_limit = guard.0.push_limit(len)?;
    message.merge_from(&mut *guard.0)?;
    guard.0.pop_limit(old_limit);
    Ok(())
}

/// Like `merge_message`, but for dynamic messages.
///
/// Takes part in the same recursion-limit accounting as `merge_message`,
/// so nesting through this method is also bounded and fails with
/// `WireError::OverRecursionLimit` rather than recursing without limit.
pub fn merge_message_dyn(&mut self, message: &mut dyn MessageDyn) -> crate::Result<()> {
    self.incr_recursion()?;
    // From here on the guard restores the level on every exit path.
    let mut guard = DecrRecursion(self);

    let len = guard.0.read_raw_varint64()?;
    let old_limit = guard.0.push_limit(len)?;
    message.merge_from_dyn(&mut *guard.0)?;
    guard.0.pop_limit(old_limit);
    Ok(())
}

/// Read a length-delimited message and verify that it is initialized.
pub fn read_message<M: Message>(&mut self) -> crate::Result<M> {
    let mut message = M::new();
    self.merge_message(&mut message)?;
    message.check_initialized()?;
    Ok(message)
}

/// Read a length-delimited message of the type described by `descriptor`
/// and verify that it is initialized.
pub fn read_message_dyn(
    &mut self,
    descriptor: &MessageDescriptor,
) -> crate::Result<Box<dyn MessageDyn>> {
    let mut message = descriptor.new_instance();
    self.merge_message_dyn(&mut *message)?;
    message.check_initialized_dyn()?;
    Ok(message)
}
}

/// Guard that leaves one recursion level of the wrapped stream when dropped,
/// so the level is restored on every exit path, including `?` early returns.
struct DecrRecursion<'s, 'a>(&'s mut CodedInputStream<'a>);

impl<'s, 'a> Drop for DecrRecursion<'s, 'a> {
    fn drop(&mut self) {
        self.0.decr_recursion();
    }
}

impl<'a> Read for CodedInputStream<'a> {
    /// Delegates to the underlying source, converting its error into `io::Error`.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.source.read(buf).map_err(Into::into)
    }
}

impl<'a> BufRead for CodedInputStream<'a> {
    /// Delegates to the underlying source, converting its error into `io::Error`.
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        self.source.fill_buf().map_err(Into::into)
    }

    /// Delegates to the underlying source.
    fn consume(&mut self, amt: usize) {
        self.source.consume(amt)
    }
}

#[cfg(test)]
mod test {

    use std::fmt::Debug;
    use std::io;
    use std::io::BufRead;
    use std::io::Read;

    use super::CodedInputStream;
    use super::READ_RAW_BYTES_MAX_ALLOC;
    use crate::error::ProtobufError;
    use crate::hex::decode_hex;
    use crate::wire_format::Tag;
    use crate::wire_format::WireType;
    use crate::CodedOutputStream;

    /// Run `callback` against the decoded `hex` bytes twice: once through a
    /// `BufRead`-backed stream and once through a slice-backed stream.
    fn test_read_partial<F>(hex: &str, mut callback: F)
    where
        F: FnMut(&mut CodedInputStream),
    {
        let data = decode_hex(hex);
        // Test with buffered reader.
        {
            let mut reader = io::Cursor::new(&data);
            let mut is = CodedInputStream::from_buf_read(&mut reader as &mut dyn BufRead);
            assert_eq!(0, is.pos());
            callback(&mut is);
        }
        // Test from bytes.
        {
            let mut is = CodedInputStream::from_bytes(&data);
            assert_eq!(0, is.pos());
            callback(&mut is);
        }
    }

    /// Like `test_read_partial`, but additionally requires that `callback`
    /// consumed the whole input.
    fn test_read<F>(hex: &str, mut callback: F)
    where
        F: FnMut(&mut CodedInputStream),
    {
        let len = decode_hex(hex).len();
        test_read_partial(hex, |reader| {
            callback(reader);
            assert!(reader.eof().expect("eof"));
            assert_eq!(len as u64, reader.pos());
        });
    }

    /// Like `test_read`, but also checks the value produced by `callback`.
    fn test_read_v<F, V>(hex: &str, v: V, mut callback: F)
    where
        F: FnMut(&mut CodedInputStream) -> crate::Result<V>,
        V: PartialEq + Debug,
    {
        test_read(hex, |reader| {
            assert_eq!(v, callback(reader).unwrap());
        });
    }

    /// Assert that `result` is an error carrying a `ProtobufError::WireError`.
    fn assert_wire_error<T: Debug>(result: crate::Result<T>) {
        let error = result.unwrap_err().0;
        assert!(matches!(*error, ProtobufError::WireError(..)));
    }

    #[test]
    fn test_input_stream_read_raw_byte() {
        test_read("17", |is| {
            assert_eq!(23, is.read_raw_byte().unwrap());
        });
    }

    #[test]
    fn test_input_stream_read_raw_varint() {
        test_read_v("07", 7, |reader| reader.read_raw_varint32());
        test_read_v("07", 7, |reader| reader.read_raw_varint64());

        test_read_v("96 01", 150, |reader| reader.read_raw_varint32());
        test_read_v("96 01", 150, |reader| reader.read_raw_varint64());

        test_read_v(
            "ff ff ff ff ff ff ff ff ff 01",
            0xffffffffffffffff,
            |reader| reader.read_raw_varint64(),
        );

        test_read_v("ff ff ff ff 0f", 0xffffffff, |reader| {
            reader.read_raw_varint32()
        });
        test_read_v("ff ff ff ff 0f", 0xffffffff, |reader| {
            reader.read_raw_varint64()
        });
    }

    #[test]
    fn test_input_stream_read_raw_varint_out_of_range() {
        test_read_partial("ff ff ff ff ff ff ff ff ff 02", |is| {
            assert!(is.read_raw_varint64().is_err());
        });
        test_read_partial("ff ff ff ff ff ff ff ff ff 02", |is| {
            assert!(is.read_raw_varint32().is_err());
        });
    }

    #[test]
    fn test_input_stream_read_raw_varint_too_long() {
        // varint cannot have length > 10
        test_read_partial("ff ff ff ff ff ff ff ff ff ff 01", |reader| {
            assert_wire_error(reader.read_raw_varint64());
        });
        test_read_partial("ff ff ff ff ff ff ff ff ff ff 01", |reader| {
            assert_wire_error(reader.read_raw_varint32());
        });
    }

    #[test]
    fn test_input_stream_read_raw_varint_unexpected_eof() {
        test_read_partial("96 97", |reader| {
            assert_wire_error(reader.read_raw_varint32());
        });
    }

    #[test]
    fn test_input_stream_read_raw_varint_pos() {
        test_read_partial("95 01 98", |reader| {
            assert_eq!(149, reader.read_raw_varint32().unwrap());
            assert_eq!(2, reader.pos());
        });
    }

    #[test]
    fn test_input_stream_read_int32() {
        test_read_v("02", 2, |reader| reader.read_int32());
    }

    #[test]
    fn test_input_stream_read_float() {
        test_read_v("95 73 13 61", 17e19, |is| is.read_float());
    }

    #[test]
    fn test_input_stream_read_double() {
        test_read_v("40 d5 ab 68 b3 07 3d 46", 23e29, |is| is.read_double());
    }

    #[test]
    fn test_input_stream_skip_raw_bytes() {
        test_read("", |reader| {
            reader.skip_raw_bytes(0).unwrap();
        });
        test_read("aa bb", |reader| {
            reader.skip_raw_bytes(2).unwrap();
        });
        test_read("aa bb cc dd ee ff", |reader| {
            reader.skip_raw_bytes(6).unwrap();
        });
    }

    #[test]
    fn test_input_stream_read_raw_bytes() {
        test_read("", |reader| {
            assert_eq!(
                Vec::from(&b""[..]),
                reader.read_raw_bytes(0).expect("read_raw_bytes")
            );
        })
    }

    #[test]
    fn test_input_stream_limits() {
        test_read("aa bb cc", |is| {
            let old_limit = is.push_limit(1).unwrap();
            assert_eq!(1, is.bytes_until_limit());
            let r1 = is.read_raw_bytes(1).unwrap();
            assert_eq!(&[0xaa as u8], &r1[..]);
            is.pop_limit(old_limit);
            let r2 = is.read_raw_bytes(2).unwrap();
            assert_eq!(&[0xbb as u8, 0xcc], &r2[..]);
        });
    }

    #[test]
    fn test_input_stream_io_read() {
        test_read("aa bb cc", |is| {
            let mut buf = [0; 3];
            assert_eq!(Read::read(is, &mut buf).expect("io::Read"), 3);
            assert_eq!(buf, [0xaa, 0xbb, 0xcc]);
        });
    }

    #[test]
    fn test_input_stream_io_bufread() {
        test_read("aa bb cc", |is| {
            assert_eq!(
                BufRead::fill_buf(is).expect("io::BufRead::fill_buf"),
                &[0xaa, 0xbb, 0xcc]
            );
            BufRead::consume(is, 3);
        });
    }

    #[test]
    #[cfg_attr(miri, ignore)] // Miri is too slow for this test.
    fn test_input_stream_read_raw_bytes_into_huge() {
        let v: Vec<u8> = (0..READ_RAW_BYTES_MAX_ALLOC + 1000)
            .map(|i| (i % 10) as u8)
            .collect();

        let mut slice: &[u8] = v.as_slice();

        let mut is = CodedInputStream::new(&mut slice);

        let mut buf = Vec::new();

        // First read is larger than the allocation cap.
        is.read_raw_bytes_into(READ_RAW_BYTES_MAX_ALLOC as u32 + 10, &mut buf)
            .expect("read");

        assert_eq!(READ_RAW_BYTES_MAX_ALLOC + 10, buf.len());

        buf.clear();

        // Second read drains the remainder of the input.
        is.read_raw_bytes_into(1000 - 10, &mut buf).expect("read");

        assert_eq!(1000 - 10, buf.len());

        assert!(is.eof().expect("eof"));
    }

    // Copy of this test: https://tinyurl.com/34hfavtz
    #[test]
    fn test_skip_group() {
        // Create an output stream with a group in:
        // Field 1: string "field 1"
        // Field 2: group containing:
        //   Field 1: fixed int32 value 100
        //   Field 2: string "ignore me"
        //   Field 3: nested group containing
        //      Field 1: fixed int64 value 1000
        // Field 3: string "field 3"

        let mut vec = Vec::new();
        let mut os = CodedOutputStream::new(&mut vec);
        os.write_tag(1, WireType::LengthDelimited).unwrap();
        os.write_string_no_tag("field 1").unwrap();

        // The outer group...
        os.write_tag(2, WireType::StartGroup).unwrap();
        os.write_tag(1, WireType::Fixed32).unwrap();
        os.write_fixed32_no_tag(100).unwrap();
        os.write_tag(3, WireType::LengthDelimited).unwrap();
        os.write_string_no_tag("ignore me").unwrap();
        // The nested group...
        os.write_tag(3, WireType::StartGroup).unwrap();
        os.write_tag(1, WireType::Fixed64).unwrap();
        os.write_fixed64_no_tag(1000).unwrap();
        // Note: Not sure the field number is relevant for end group...
        os.write_tag(3, WireType::EndGroup).unwrap();

        // End the outer group
        os.write_tag(2, WireType::EndGroup).unwrap();

        os.write_tag(3, WireType::LengthDelimited).unwrap();
        os.write_string_no_tag("field 3").unwrap();
        os.flush().unwrap();
        drop(os);

        let mut input = CodedInputStream::from_bytes(&vec);
        // Now act like a generated client
        assert_eq!(
            Tag::make(1, WireType::LengthDelimited),
            input.read_tag().unwrap()
        );
        assert_eq!("field 1", &input.read_string().unwrap());
        assert_eq!(
            Tag::make(2, WireType::StartGroup),
            input.read_tag().unwrap()
        );
        input.skip_field(WireType::StartGroup).unwrap();
        assert_eq!(
            Tag::make(3, WireType::LengthDelimited),
            input.read_tag().unwrap()
        );
        assert_eq!("field 3", input.read_string().unwrap());
    }
}