Initial commit

pull/200/head
Chip Senkbeil 1 year ago
parent 6ba3ded188
commit 808327eaff
No known key found for this signature in database
GPG Key ID: 35EF1F8EC72A4131

1
Cargo.lock generated

@ -935,6 +935,7 @@ dependencies = [
"p256",
"paste",
"rand",
"rmp",
"rmp-serde",
"serde",
"serde_bytes",

@ -25,10 +25,12 @@ log = "0.4.18"
paste = "1.0.12"
p256 = { version = "0.13.2", features = ["ecdh", "pem"] }
rand = { version = "0.8.5", features = ["getrandom"] }
rmp = "0.8.11"
rmp-serde = "1.1.1"
sha2 = "0.10.6"
serde = { version = "1.0.163", features = ["derive"] }
serde_bytes = "0.11.9"
serde_json = "1.0.96"
strum = { version = "0.24.1", features = ["derive"] }
tokio = { version = "1.28.2", features = ["full"] }

@ -20,4 +20,5 @@ pub use listener::*;
pub use map::*;
pub use packet::*;
pub use port::*;
pub use serde_json::Value;
pub use transport::*;

@ -1,628 +1,254 @@
/// Represents a generic id type
pub type Id = String;
/// Represents a packet header for a request or response
pub type Header = std::collections::HashMap<String, crate::common::Value>;
/// Generates a new [`Header`] of key/value pairs based on literals.
///
/// ```
/// use distant_net::header;
///
/// let _header = header!("key" -> "value", "key2" -> 123);
/// ```
#[macro_export]
macro_rules! header {
($($key:literal -> $value:expr),* $(,)?) => {{
let mut _header = ::std::collections::HashMap::new();
$(
_header.insert($key.to_string(), $crate::common::Value::from($value));
)*
_header
}};
}
mod request;
mod response;
pub use request::*;
pub use response::*;
#[derive(Clone, Debug, PartialEq, Eq)]
enum MsgPackStrParseError {
InvalidFormat,
Utf8Error(std::str::Utf8Error),
}
use std::io::Cursor;
/// Writes the given str to the end of `buf` as the str's msgpack representation.
///
/// # Panics
/// Reads the header bytes from msgpack input, including the marker and len bytes.
///
/// Panics if `s.len() >= 2 ^ 32` as the maximum str length for a msgpack str is `(2 ^ 32) - 1`.
fn write_str_msg_pack(s: &str, buf: &mut Vec<u8>) {
assert!(
s.len() < 2usize.pow(32),
"str cannot be longer than (2^32)-1 bytes"
);
if s.len() < 32 {
buf.push(s.len() as u8 | 0b10100000);
} else if s.len() < 2usize.pow(8) {
buf.push(0xd9);
buf.push(s.len() as u8);
} else if s.len() < 2usize.pow(16) {
buf.push(0xda);
for b in (s.len() as u16).to_be_bytes() {
buf.push(b);
}
} else {
buf.push(0xdb);
for b in (s.len() as u32).to_be_bytes() {
buf.push(b);
/// * If succeeds, returns (header, remaining).
/// * If fails, returns existing bytes.
fn read_header_bytes(input: &[u8]) -> Result<(&[u8], &[u8]), &[u8]> {
let mut cursor = Cursor::new(input);
// Determine size of header map in terms of total objects
let len = match rmp::decode::read_map_len(&mut cursor) {
Ok(x) => x,
Err(_) => return Err(input),
};
// For each object, we have a corresponding key in front of it has a string,
// so we need to iterate, advancing by a string key and then the object
for _i in 0..len {
// Read just the length of the key to avoid copying the key itself
let key_len = match rmp::decode::read_str_len(&mut cursor) {
Ok(x) => x as u64,
Err(_) => return Err(input),
};
// Advance forward past the key
cursor.set_position(cursor.position() + key_len);
// Point locally to just past the str key so we can determine next byte len to skip
let input = &input[cursor.position() as usize..];
// Read the type of object and advance accordingly
match find_msgpack_byte_len(input) {
Some(len) => cursor.set_position(cursor.position() + len),
None => return Err(input),
}
}
buf.extend_from_slice(s.as_bytes());
let pos = cursor.position() as usize;
Ok((&input[..pos], &input[pos..]))
}
/// Parse msgpack str, returning remaining bytes and str on success, or error on failure.
fn parse_msg_pack_str(input: &[u8]) -> Result<(&[u8], &str), MsgPackStrParseError> {
let ilen = input.len();
if ilen == 0 {
return Err(MsgPackStrParseError::InvalidFormat);
/// Determines the length of the next object based on its marker. From the marker, some objects
/// need to be traversed (e.g. map) in order to fully understand the total byte length.
///
/// This will include the marker bytes in the total byte len such that collecting all of the
/// bytes up to len will yield a valid msgpack object in byte form.
///
/// If the first byte does not signify a valid marker, this method returns None.
fn find_msgpack_byte_len(input: &[u8]) -> Option<u64> {
if input.is_empty() {
return None;
}
// * fixstr using 0xa0 - 0xbf to mark the start of the str where < 32 bytes
// * str 8 (0xd9) if up to (2^8)-1 bytes, using next byte for len
// * str 16 (0xda) if up to (2^16)-1 bytes, using next two bytes for len
// * str 32 (0xdb) if up to (2^32)-1 bytes, using next four bytes for len
let (input, len): (&[u8], usize) = if input[0] >= 0xa0 && input[0] <= 0xbf {
(&input[1..], (input[0] & 0b00011111).into())
} else if input[0] == 0xd9 && ilen > 2 {
(&input[2..], input[1].into())
} else if input[0] == 0xda && ilen > 3 {
(&input[3..], u16::from_be_bytes([input[1], input[2]]).into())
} else if input[0] == 0xdb && ilen > 5 {
(
&input[5..],
u32::from_be_bytes([input[1], input[2], input[3], input[4]])
.try_into()
.unwrap(),
)
} else {
return Err(MsgPackStrParseError::InvalidFormat);
};
let s = match std::str::from_utf8(&input[..len]) {
Ok(s) => s,
Err(x) => return Err(MsgPackStrParseError::Utf8Error(x)),
};
Ok((&input[len..], s))
}
#[cfg(test)]
mod tests {
use super::*;
mod write_str_msg_pack {
use super::*;
#[test]
fn should_support_fixstr() {
// 0-byte str
let mut buf = Vec::new();
write_str_msg_pack("", &mut buf);
assert_eq!(buf, &[0xa0]);
// 1-byte str
let mut buf = Vec::new();
write_str_msg_pack("a", &mut buf);
assert_eq!(buf, &[0xa1, b'a']);
// 2-byte str
let mut buf = Vec::new();
write_str_msg_pack("ab", &mut buf);
assert_eq!(buf, &[0xa2, b'a', b'b']);
// 3-byte str
let mut buf = Vec::new();
write_str_msg_pack("abc", &mut buf);
assert_eq!(buf, &[0xa3, b'a', b'b', b'c']);
// 4-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcd", &mut buf);
assert_eq!(buf, &[0xa4, b'a', b'b', b'c', b'd']);
// 5-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcde", &mut buf);
assert_eq!(buf, &[0xa5, b'a', b'b', b'c', b'd', b'e']);
// 6-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdef", &mut buf);
assert_eq!(buf, &[0xa6, b'a', b'b', b'c', b'd', b'e', b'f']);
// 7-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdefg", &mut buf);
assert_eq!(buf, &[0xa7, b'a', b'b', b'c', b'd', b'e', b'f', b'g']);
// 8-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdefgh", &mut buf);
assert_eq!(buf, &[0xa8, b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h']);
// 9-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdefghi", &mut buf);
assert_eq!(
buf,
&[0xa9, b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i']
);
// 10-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdefghij", &mut buf);
assert_eq!(
buf,
&[0xaa, b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j']
);
// 11-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdefghijk", &mut buf);
assert_eq!(
buf,
&[0xab, b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j', b'k']
);
// 12-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdefghijkl", &mut buf);
assert_eq!(
buf,
&[0xac, b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j', b'k', b'l']
);
// 13-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdefghijklm", &mut buf);
assert_eq!(
buf,
&[
0xad, b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j', b'k', b'l',
b'm'
]
);
// 14-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdefghijklmn", &mut buf);
assert_eq!(
buf,
&[
0xae, b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j', b'k', b'l',
b'm', b'n'
]
);
// 15-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdefghijklmno", &mut buf);
assert_eq!(
buf,
&[
0xaf, b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j', b'k', b'l',
b'm', b'n', b'o'
]
);
// 16-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdefghijklmnop", &mut buf);
assert_eq!(
buf,
&[
0xb0, b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j', b'k', b'l',
b'm', b'n', b'o', b'p'
]
);
// 17-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdefghijklmnopq", &mut buf);
assert_eq!(
buf,
&[
0xb1, b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j', b'k', b'l',
b'm', b'n', b'o', b'p', b'q'
]
);
// 18-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdefghijklmnopqr", &mut buf);
assert_eq!(
buf,
&[
0xb2, b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j', b'k', b'l',
b'm', b'n', b'o', b'p', b'q', b'r'
]
);
// 19-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdefghijklmnopqrs", &mut buf);
assert_eq!(
buf,
&[
0xb3, b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j', b'k', b'l',
b'm', b'n', b'o', b'p', b'q', b'r', b's'
]
);
// 20-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdefghijklmnopqrst", &mut buf);
assert_eq!(
buf,
&[
0xb4, b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j', b'k', b'l',
b'm', b'n', b'o', b'p', b'q', b'r', b's', b't'
]
);
// 21-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdefghijklmnopqrstu", &mut buf);
assert_eq!(
buf,
&[
0xb5, b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j', b'k', b'l',
b'm', b'n', b'o', b'p', b'q', b'r', b's', b't', b'u'
]
);
// 22-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdefghijklmnopqrstuv", &mut buf);
assert_eq!(
buf,
&[
0xb6, b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j', b'k', b'l',
b'm', b'n', b'o', b'p', b'q', b'r', b's', b't', b'u', b'v'
]
);
// 23-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdefghijklmnopqrstuvw", &mut buf);
assert_eq!(
buf,
&[
0xb7, b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j', b'k', b'l',
b'm', b'n', b'o', b'p', b'q', b'r', b's', b't', b'u', b'v', b'w'
]
);
// 24-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdefghijklmnopqrstuvwx", &mut buf);
assert_eq!(
buf,
&[
0xb8, b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j', b'k', b'l',
b'm', b'n', b'o', b'p', b'q', b'r', b's', b't', b'u', b'v', b'w', b'x'
]
);
// 25-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdefghijklmnopqrstuvwxy", &mut buf);
assert_eq!(
buf,
&[
0xb9, b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j', b'k', b'l',
b'm', b'n', b'o', b'p', b'q', b'r', b's', b't', b'u', b'v', b'w', b'x', b'y'
]
);
// 26-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdefghijklmnopqrstuvwxyz", &mut buf);
assert_eq!(
buf,
&[
0xba, b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j', b'k', b'l',
b'm', b'n', b'o', b'p', b'q', b'r', b's', b't', b'u', b'v', b'w', b'x', b'y',
b'z'
]
);
// 27-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdefghijklmnopqrstuvwxyz0", &mut buf);
assert_eq!(
buf,
&[
0xbb, b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j', b'k', b'l',
b'm', b'n', b'o', b'p', b'q', b'r', b's', b't', b'u', b'v', b'w', b'x', b'y',
b'z', b'0'
]
);
// 28-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdefghijklmnopqrstuvwxyz01", &mut buf);
assert_eq!(
buf,
&[
0xbc, b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j', b'k', b'l',
b'm', b'n', b'o', b'p', b'q', b'r', b's', b't', b'u', b'v', b'w', b'x', b'y',
b'z', b'0', b'1'
]
);
// 29-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdefghijklmnopqrstuvwxyz012", &mut buf);
assert_eq!(
buf,
&[
0xbd, b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j', b'k', b'l',
b'm', b'n', b'o', b'p', b'q', b'r', b's', b't', b'u', b'v', b'w', b'x', b'y',
b'z', b'0', b'1', b'2'
]
);
// 30-byte str
let mut buf = Vec::new();
write_str_msg_pack("abcdefghijklmnopqrstuvwxyz0123", &mut buf);
assert_eq!(
buf,
&[
0xbe, b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j', b'k', b'l',
b'm', b'n', b'o', b'p', b'q', b'r', b's', b't', b'u', b'v', b'w', b'x', b'y',
b'z', b'0', b'1', b'2', b'3'
]
);
// 31-byte str is maximum len of fixstr
let mut buf = Vec::new();
write_str_msg_pack("abcdefghijklmnopqrstuvwxyz01234", &mut buf);
assert_eq!(
buf,
&[
0xbf, b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j', b'k', b'l',
b'm', b'n', b'o', b'p', b'q', b'r', b's', b't', b'u', b'v', b'w', b'x', b'y',
b'z', b'0', b'1', b'2', b'3', b'4'
]
);
}
#[test]
fn should_support_str_8() {
let input = "a".repeat(32);
let mut buf = Vec::new();
write_str_msg_pack(&input, &mut buf);
assert_eq!(buf[0], 0xd9);
assert_eq!(buf[1], input.len() as u8);
assert_eq!(&buf[2..], input.as_bytes());
let input = "a".repeat(2usize.pow(8) - 1);
let mut buf = Vec::new();
write_str_msg_pack(&input, &mut buf);
assert_eq!(buf[0], 0xd9);
assert_eq!(buf[1], input.len() as u8);
assert_eq!(&buf[2..], input.as_bytes());
}
#[test]
fn should_support_str_16() {
let input = "a".repeat(2usize.pow(8));
let mut buf = Vec::new();
write_str_msg_pack(&input, &mut buf);
assert_eq!(buf[0], 0xda);
assert_eq!(&buf[1..3], &(input.len() as u16).to_be_bytes());
assert_eq!(&buf[3..], input.as_bytes());
let input = "a".repeat(2usize.pow(16) - 1);
let mut buf = Vec::new();
write_str_msg_pack(&input, &mut buf);
assert_eq!(buf[0], 0xda);
assert_eq!(&buf[1..3], &(input.len() as u16).to_be_bytes());
assert_eq!(&buf[3..], input.as_bytes());
}
#[test]
fn should_support_str_32() {
let input = "a".repeat(2usize.pow(16));
let mut buf = Vec::new();
write_str_msg_pack(&input, &mut buf);
assert_eq!(buf[0], 0xdb);
assert_eq!(&buf[1..5], &(input.len() as u32).to_be_bytes());
assert_eq!(&buf[5..], input.as_bytes());
}
macro_rules! read_len {
(u8: $input:expr $(, start = $start:expr)?) => {{
let input = $input;
$(
if input.len() < $start {
return None;
}
let input = &input[$start..];
)?
if input.is_empty() {
return None;
} else {
input[0] as u64
}
}};
(u16: $input:expr $(, start = $start:expr)?) => {{
let input = $input;
$(
if input.len() < $start {
return None;
}
let input = &input[$start..];
)?
if input.len() < 2 {
return None;
} else {
u16::from_be_bytes([input[0], input[1]]) as u64
}
}};
(u32: $input:expr $(, start = $start:expr)?) => {{
let input = $input;
$(
if input.len() < $start {
return None;
}
let input = &input[$start..];
)?
if input.len() < 4 {
return None;
} else {
u32::from_be_bytes([input[0], input[1], input[2], input[3]]) as u64
}
}};
($cnt:expr => $input:expr $(, start = $start:expr)?) => {{
let input = $input;
$(
if input.len() < $start {
return None;
}
let input = &input[$start..];
)?
let cnt = $cnt;
let mut len = 0;
for _i in 0..cnt {
if input.len() < len {
return None;
}
let input = &input[len..];
match find_msgpack_byte_len(input) {
Some(x) => len += x as usize,
None => return None,
}
}
len as u64
}};
}
mod parse_msg_pack_str {
use super::*;
#[test]
fn should_be_able_to_parse_fixstr() {
// Empty str
let (input, s) = parse_msg_pack_str(&[0xa0]).unwrap();
assert!(input.is_empty());
assert_eq!(s, "");
// Single character
let (input, s) = parse_msg_pack_str(&[0xa1, b'a']).unwrap();
assert!(input.is_empty());
assert_eq!(s, "a");
// 31 byte str
let (input, s) = parse_msg_pack_str(&[
0xbf, b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a',
b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a',
b'a', b'a', b'a', b'a',
])
.unwrap();
assert!(input.is_empty());
assert_eq!(s, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
// Verify that we only consume up to fixstr length
assert_eq!(parse_msg_pack_str(&[0xa0, b'a']).unwrap().0, b"a");
assert_eq!(
parse_msg_pack_str(&[
0xbf, b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a',
b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a',
b'a', b'a', b'a', b'a', b'a', b'a', b'b'
])
.unwrap()
.0,
b"b"
);
}
#[test]
fn should_be_able_to_parse_str_8() {
// 32 byte str
let (input, s) = parse_msg_pack_str(&[
0xd9, 32, b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a',
b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a', b'a',
b'a', b'a', b'a', b'a', b'a', b'a',
])
.unwrap();
assert!(input.is_empty());
assert_eq!(s, "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
// 2^8 - 1 (255) byte str
let test_str = "a".repeat(2usize.pow(8) - 1);
let mut input = vec![0xd9, 255];
input.extend_from_slice(test_str.as_bytes());
let (input, s) = parse_msg_pack_str(&input).unwrap();
assert!(input.is_empty());
assert_eq!(s, test_str);
// Verify that we only consume up to 2^8 - 1 length
let mut input = vec![0xd9, 255];
input.extend_from_slice(test_str.as_bytes());
input.extend_from_slice(b"hello");
let (input, s) = parse_msg_pack_str(&input).unwrap();
assert_eq!(input, b"hello");
assert_eq!(s, test_str);
Some(match rmp::Marker::from_u8(input[0]) {
// Booleans and nil (aka null) are a combination of marker and value (single byte)
rmp::Marker::Null => 1,
rmp::Marker::True => 1,
rmp::Marker::False => 1,
// Integers are stored in 1, 2, 3, 5, or 9 bytes
rmp::Marker::FixPos(_) => 1,
rmp::Marker::FixNeg(_) => 1,
rmp::Marker::U8 => 2,
rmp::Marker::U16 => 3,
rmp::Marker::U32 => 5,
rmp::Marker::U64 => 9,
rmp::Marker::I8 => 2,
rmp::Marker::I16 => 3,
rmp::Marker::I32 => 5,
rmp::Marker::I64 => 9,
// Floats are stored in 5 or 9 bytes
rmp::Marker::F32 => 5,
rmp::Marker::F64 => 9,
// Str are stored in 1, 2, 3, or 5 bytes + the data buffer
rmp::Marker::FixStr(len) => 1 + len as u64,
rmp::Marker::Str8 => 2 + read_len!(u8: input, start = 1),
rmp::Marker::Str16 => 3 + read_len!(u16: input, start = 1),
rmp::Marker::Str32 => 5 + read_len!(u32: input, start = 1),
// Bin are stored in 2, 3, or 5 bytes + the data buffer
rmp::Marker::Bin8 => 2 + read_len!(u8: input, start = 1),
rmp::Marker::Bin16 => 3 + read_len!(u16: input, start = 1),
rmp::Marker::Bin32 => 5 + read_len!(u32: input, start = 1),
// Arrays are stored in 1, 3, or 5 bytes + N objects (where each object has its own len)
rmp::Marker::FixArray(cnt) => 1 + read_len!(cnt => input, start = 1),
rmp::Marker::Array16 => {
let cnt = read_len!(u16: input, start = 1);
3 + read_len!(cnt => input, start = 3)
}
#[test]
fn should_be_able_to_parse_str_16() {
// 2^8 byte str (256)
let test_str = "a".repeat(2usize.pow(8));
let mut input = vec![0xda, 1, 0];
input.extend_from_slice(test_str.as_bytes());
let (input, s) = parse_msg_pack_str(&input).unwrap();
assert!(input.is_empty());
assert_eq!(s, test_str);
// 2^16 - 1 (65535) byte str
let test_str = "a".repeat(2usize.pow(16) - 1);
let mut input = vec![0xda, 255, 255];
input.extend_from_slice(test_str.as_bytes());
let (input, s) = parse_msg_pack_str(&input).unwrap();
assert!(input.is_empty());
assert_eq!(s, test_str);
// Verify that we only consume up to 2^16 - 1 length
let mut input = vec![0xda, 255, 255];
input.extend_from_slice(test_str.as_bytes());
input.extend_from_slice(b"hello");
let (input, s) = parse_msg_pack_str(&input).unwrap();
assert_eq!(input, b"hello");
assert_eq!(s, test_str);
rmp::Marker::Array32 => {
let cnt = read_len!(u32: input, start = 1);
5 + read_len!(cnt => input, start = 5)
}
#[test]
fn should_be_able_to_parse_str_32() {
// 2^16 byte str
let test_str = "a".repeat(2usize.pow(16));
let mut input = vec![0xdb, 0, 1, 0, 0];
input.extend_from_slice(test_str.as_bytes());
let (input, s) = parse_msg_pack_str(&input).unwrap();
assert!(input.is_empty());
assert_eq!(s, test_str);
// NOTE: We are not going to run the below tests, not because they aren't valid but
// because this generates a 4GB str which takes 20+ seconds to run
// 2^32 - 1 byte str (4294967295 bytes)
/* let test_str = "a".repeat(2usize.pow(32) - 1);
let mut input = vec![0xdb, 255, 255, 255, 255];
input.extend_from_slice(test_str.as_bytes());
let (input, s) = parse_msg_pack_str(&input).unwrap();
assert!(input.is_empty());
assert_eq!(s, test_str); */
// Verify that we only consume up to 2^32 - 1 length
/* let mut input = vec![0xdb, 255, 255, 255, 255];
input.extend_from_slice(test_str.as_bytes());
input.extend_from_slice(b"hello");
let (input, s) = parse_msg_pack_str(&input).unwrap();
assert_eq!(input, b"hello");
assert_eq!(s, test_str); */
// Maps are stored in 1, 3, or 5 bytes + 2*N objects (where each object has its own len)
rmp::Marker::FixMap(cnt) => 1 + read_len!(2 * cnt => input, start = 1),
rmp::Marker::Map16 => {
let cnt = read_len!(u16: input, start = 1);
3 + read_len!(2 * cnt => input, start = 3)
}
#[test]
fn should_fail_parsing_str_with_invalid_length() {
// Make sure that parse doesn't fail looking for bytes after str 8 len
assert_eq!(
parse_msg_pack_str(&[0xd9]),
Err(MsgPackStrParseError::InvalidFormat)
);
assert_eq!(
parse_msg_pack_str(&[0xd9, 0]),
Err(MsgPackStrParseError::InvalidFormat)
);
// Make sure that parse doesn't fail looking for bytes after str 16 len
assert_eq!(
parse_msg_pack_str(&[0xda]),
Err(MsgPackStrParseError::InvalidFormat)
);
assert_eq!(
parse_msg_pack_str(&[0xda, 0]),
Err(MsgPackStrParseError::InvalidFormat)
);
assert_eq!(
parse_msg_pack_str(&[0xda, 0, 0]),
Err(MsgPackStrParseError::InvalidFormat)
);
// Make sure that parse doesn't fail looking for bytes after str 32 len
assert_eq!(
parse_msg_pack_str(&[0xdb]),
Err(MsgPackStrParseError::InvalidFormat)
);
assert_eq!(
parse_msg_pack_str(&[0xdb, 0]),
Err(MsgPackStrParseError::InvalidFormat)
);
assert_eq!(
parse_msg_pack_str(&[0xdb, 0, 0]),
Err(MsgPackStrParseError::InvalidFormat)
);
assert_eq!(
parse_msg_pack_str(&[0xdb, 0, 0, 0]),
Err(MsgPackStrParseError::InvalidFormat)
);
assert_eq!(
parse_msg_pack_str(&[0xdb, 0, 0, 0, 0]),
Err(MsgPackStrParseError::InvalidFormat)
);
rmp::Marker::Map32 => {
let cnt = read_len!(u32: input, start = 1);
5 + read_len!(2 * cnt => input, start = 5)
}
#[test]
fn should_fail_parsing_other_types() {
assert_eq!(
parse_msg_pack_str(&[0xc3]), // Boolean (true)
Err(MsgPackStrParseError::InvalidFormat)
);
}
// Ext are stored in an integer (8-bit, 16-bit, 32-bit), type (8-bit), and byte array
rmp::Marker::FixExt1 => 3,
rmp::Marker::FixExt2 => 4,
rmp::Marker::FixExt4 => 6,
rmp::Marker::FixExt8 => 10,
rmp::Marker::FixExt16 => 18,
rmp::Marker::Ext8 => 3 + read_len!(u8: input, start = 1),
rmp::Marker::Ext16 => 4 + read_len!(u16: input, start = 1),
rmp::Marker::Ext32 => 6 + read_len!(u32: input, start = 1),
// NOTE: This is marked in the msgpack spec as never being used, so we return none
// as this is signfies something has gone wrong!
rmp::Marker::Reserved => return None,
})
}
#[test]
fn should_fail_if_empty_input() {
assert_eq!(
parse_msg_pack_str(&[]),
Err(MsgPackStrParseError::InvalidFormat)
);
}
/// Reads the str bytes from msgpack input, including the marker and len bytes.
///
/// * If succeeds, returns (str, remaining).
/// * If fails, returns existing bytes.
fn read_str_bytes(input: &[u8]) -> Result<(&str, &[u8]), &[u8]> {
match rmp::decode::read_str_from_slice(input) {
Ok(x) => Ok(x),
Err(_) => Err(input),
}
}
#[test]
fn should_fail_if_str_is_not_utf8() {
assert!(matches!(
parse_msg_pack_str(&[0xa4, 0, 159, 146, 150]),
Err(MsgPackStrParseError::Utf8Error(_))
));
}
/// Reads a str key from msgpack input and checks if it matches `key`. If so, the input is
/// advanced, otherwise the original input is returned.
///
/// * If key read successfully and matches, returns (unit, remaining).
/// * Otherwise, returns existing bytes.
fn read_key_eq<'a>(input: &'a [u8], key: &str) -> Result<((), &'a [u8]), &'a [u8]> {
match read_str_bytes(input) {
Ok((s, input)) if s == key => Ok(((), input)),
_ => Err(input),
}
}

@ -5,12 +5,16 @@ use derive_more::{Display, Error};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use super::{parse_msg_pack_str, write_str_msg_pack, Id};
use super::{read_header_bytes, read_key_eq, read_str_bytes, Header, Id};
use crate::common::utils;
/// Represents a request to send
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Request<T> {
/// Optional header data to include with request
#[serde(default, flatten, skip_serializing_if = "Header::is_empty")]
pub header: Header,
/// Unique id associated with the request
pub id: Id,
@ -19,9 +23,10 @@ pub struct Request<T> {
}
impl<T> Request<T> {
/// Creates a new request with a random, unique id
/// Creates a new request with a random, unique id and no header data
pub fn new(payload: T) -> Self {
Self {
header: header!(),
id: rand::random::<u64>().to_string(),
payload,
}
@ -45,6 +50,11 @@ where
/// Attempts to convert a typed request to an untyped request
pub fn to_untyped_request(&self) -> io::Result<UntypedRequest> {
Ok(UntypedRequest {
header: Cow::Owned(if !self.header.is_empty() {
utils::serialize_to_vec(&self.header)?
} else {
Vec::new()
}),
id: Cow::Borrowed(&self.id),
payload: Cow::Owned(self.to_payload_vec()?),
})
@ -73,13 +83,34 @@ pub enum UntypedRequestParseError {
/// When the bytes do not represent a request
WrongType,
/// When a header should be present, but the key is wrong
InvalidHeaderKey,
/// When a header should be present, but the header bytes are wrong
InvalidHeader,
/// When the key for the id is wrong
InvalidIdKey,
/// When the id is not a valid UTF-8 string
InvalidId,
/// When the key for the payload is wrong
InvalidPayloadKey,
}
#[inline]
fn header_is_empty(header: &[u8]) -> bool {
header.is_empty()
}
/// Represents a request to send whose payload is bytes instead of a specific type
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct UntypedRequest<'a> {
/// Header data associated with the request as bytes
#[serde(default, skip_serializing_if = "header_is_empty")]
pub header: Cow<'a, [u8]>,
/// Unique id associated with the request
pub id: Cow<'a, str>,
@ -91,6 +122,11 @@ impl<'a> UntypedRequest<'a> {
/// Attempts to convert an untyped request to a typed request
pub fn to_typed_request<T: DeserializeOwned>(&self) -> io::Result<Request<T>> {
Ok(Request {
header: if header_is_empty(&self.header) {
header!()
} else {
utils::deserialize_from_slice(&self.header)?
},
id: self.id.to_string(),
payload: utils::deserialize_from_slice(&self.payload)?,
})
@ -99,6 +135,10 @@ impl<'a> UntypedRequest<'a> {
/// Convert into a borrowed version
pub fn as_borrowed(&self) -> UntypedRequest<'_> {
UntypedRequest {
header: match &self.header {
Cow::Borrowed(x) => Cow::Borrowed(x),
Cow::Owned(x) => Cow::Borrowed(x.as_slice()),
},
id: match &self.id {
Cow::Borrowed(x) => Cow::Borrowed(x),
Cow::Owned(x) => Cow::Borrowed(x.as_str()),
@ -113,6 +153,10 @@ impl<'a> UntypedRequest<'a> {
/// Convert into an owned version
pub fn into_owned(self) -> UntypedRequest<'static> {
UntypedRequest {
header: match self.header {
Cow::Borrowed(x) => Cow::Owned(x.to_vec()),
Cow::Owned(x) => Cow::Owned(x),
},
id: match self.id {
Cow::Borrowed(x) => Cow::Owned(x.to_string()),
Cow::Owned(x) => Cow::Owned(x),
@ -124,6 +168,11 @@ impl<'a> UntypedRequest<'a> {
}
}
/// Updates the header of the request to the given `header`.
pub fn set_header(&mut self, header: impl IntoIterator<Item = u8>) {
self.header = Cow::Owned(header.into_iter().collect());
}
/// Updates the id of the request to the given `id`.
pub fn set_id(&mut self, id: impl Into<String>) {
self.id = Cow::Owned(id.into());
@ -131,61 +180,80 @@ impl<'a> UntypedRequest<'a> {
/// Allocates a new collection of bytes representing the request.
pub fn to_bytes(&self) -> Vec<u8> {
let mut bytes = vec![0x82];
let mut bytes = vec![];
let has_header = !header_is_empty(&self.header);
if has_header {
rmp::encode::write_map_len(&mut bytes, 3).unwrap();
} else {
rmp::encode::write_map_len(&mut bytes, 2).unwrap();
}
if has_header {
rmp::encode::write_str(&mut bytes, "header").unwrap();
bytes.extend_from_slice(&self.header);
}
write_str_msg_pack("id", &mut bytes);
write_str_msg_pack(&self.id, &mut bytes);
rmp::encode::write_str(&mut bytes, "id").unwrap();
rmp::encode::write_str(&mut bytes, &self.id).unwrap();
write_str_msg_pack("payload", &mut bytes);
rmp::encode::write_str(&mut bytes, "payload").unwrap();
bytes.extend_from_slice(&self.payload);
bytes
}
/// Parses a collection of bytes, returning a partial request if it can be potentially
/// represented as a [`Request`] depending on the payload, or the original bytes if it does not
/// represent a [`Request`]
/// represented as a [`Request`] depending on the payload.
///
/// NOTE: This supports parsing an invalid request where the payload would not properly
/// deserialize, but the bytes themselves represent a complete request of some kind.
pub fn from_slice(input: &'a [u8]) -> Result<Self, UntypedRequestParseError> {
if input.len() < 2 {
if input.is_empty() {
return Err(UntypedRequestParseError::WrongType);
}
// MsgPack marks a fixmap using 0x80 - 0x8f to indicate the size (up to 15 elements).
//
// In the case of the request, there are only two elements: id and payload. So the first
// byte should ALWAYS be 0x82 (130).
if input[0] != 0x82 {
return Err(UntypedRequestParseError::WrongType);
}
let has_header = match rmp::Marker::from_u8(input[0]) {
rmp::Marker::FixMap(2) => false,
rmp::Marker::FixMap(3) => true,
_ => return Err(UntypedRequestParseError::WrongType),
};
// Skip the first byte representing the fixmap
// Advance position by marker
let input = &input[1..];
// Validate that first field is id
let (input, id_key) =
parse_msg_pack_str(input).map_err(|_| UntypedRequestParseError::WrongType)?;
if id_key != "id" {
return Err(UntypedRequestParseError::WrongType);
}
// Parse the header if we have one
let (header, input) = if has_header {
let (_, input) = read_key_eq(input, "header")
.map_err(|_| UntypedRequestParseError::InvalidHeaderKey)?;
let (header, input) =
read_header_bytes(input).map_err(|_| UntypedRequestParseError::InvalidHeader)?;
(header, input)
} else {
([0u8; 0].as_slice(), input)
};
// Validate that next field is id
let (_, input) =
read_key_eq(input, "id").map_err(|_| UntypedRequestParseError::InvalidIdKey)?;
// Get the id itself
let (input, id) =
parse_msg_pack_str(input).map_err(|_| UntypedRequestParseError::InvalidId)?;
let (id, input) = read_str_bytes(input).map_err(|_| UntypedRequestParseError::InvalidId)?;
// Validate that second field is payload
let (input, payload_key) =
parse_msg_pack_str(input).map_err(|_| UntypedRequestParseError::WrongType)?;
if payload_key != "payload" {
return Err(UntypedRequestParseError::WrongType);
}
// Validate that final field is payload
let (_, input) = read_key_eq(input, "payload")
.map_err(|_| UntypedRequestParseError::InvalidPayloadKey)?;
let header = Cow::Borrowed(header);
let id = Cow::Borrowed(id);
let payload = Cow::Borrowed(input);
Ok(Self { id, payload })
Ok(Self {
header,
id,
payload,
})
}
}
@ -204,12 +272,13 @@ mod tests {
// fixstr of 7 bytes with str "payload"
const PAYLOAD_FIELD_BYTES: &[u8] = &[0xa7, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64];
/// fixstr of 4 bytes with str "test"
// fixstr of 4 bytes with str "test"
const TEST_STR_BYTES: &[u8] = &[0xa4, 0x74, 0x65, 0x73, 0x74];
#[test]
fn untyped_request_should_support_converting_to_bytes() {
let bytes = Request {
header: header!(),
id: "some id".to_string(),
payload: true,
}
@ -220,9 +289,44 @@ mod tests {
assert_eq!(untyped_request.to_bytes(), bytes);
}
#[test]
fn untyped_request_should_support_converting_to_bytes_with_header() {
let bytes = Request {
header: header!("key" -> 123),
id: "some id".to_string(),
payload: true,
}
.to_vec()
.unwrap();
let untyped_request = UntypedRequest::from_slice(&bytes).unwrap();
assert_eq!(untyped_request.to_bytes(), bytes);
}
#[test]
fn untyped_request_should_support_parsing_from_request_bytes_with_header() {
let bytes = Request {
header: header!("key" -> 123),
id: "some id".to_string(),
payload: true,
}
.to_vec()
.unwrap();
assert_eq!(
UntypedRequest::from_slice(&bytes),
Ok(UntypedRequest {
header: Cow::Owned(utils::serialize_to_vec(&header!("key" -> 123)).unwrap()),
id: Cow::Borrowed("some id"),
payload: Cow::Owned(vec![TRUE_BYTE]),
})
);
}
#[test]
fn untyped_request_should_support_parsing_from_request_bytes_with_valid_payload() {
let bytes = Request {
header: header!(),
id: "some id".to_string(),
payload: true,
}
@ -232,6 +336,7 @@ mod tests {
assert_eq!(
UntypedRequest::from_slice(&bytes),
Ok(UntypedRequest {
header: Cow::Owned(vec![]),
id: Cow::Borrowed("some id"),
payload: Cow::Owned(vec![TRUE_BYTE]),
})
@ -242,6 +347,7 @@ mod tests {
fn untyped_request_should_support_parsing_from_request_bytes_with_invalid_payload() {
// Request with id < 32 bytes
let mut bytes = Request {
header: header!(),
id: "".to_string(),
payload: true,
}
@ -255,6 +361,7 @@ mod tests {
assert_eq!(
UntypedRequest::from_slice(&bytes),
Ok(UntypedRequest {
header: Cow::Owned(vec![]),
id: Cow::Owned("".to_string()),
payload: Cow::Owned(vec![TRUE_BYTE, NEVER_USED_BYTE]),
})
@ -284,7 +391,7 @@ mod tests {
// Missing fields (corrupt data)
assert_eq!(
UntypedRequest::from_slice(&[0x82]),
Err(UntypedRequestParseError::WrongType)
Err(UntypedRequestParseError::InvalidIdKey)
);
// Missing id field (has valid data itself)
@ -300,7 +407,7 @@ mod tests {
.concat()
.as_slice()
),
Err(UntypedRequestParseError::WrongType)
Err(UntypedRequestParseError::InvalidIdKey)
);
// Non-str id field value
@ -348,7 +455,7 @@ mod tests {
.concat()
.as_slice()
),
Err(UntypedRequestParseError::WrongType)
Err(UntypedRequestParseError::InvalidPayloadKey)
);
}
}

@ -5,12 +5,16 @@ use derive_more::{Display, Error};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use super::{parse_msg_pack_str, write_str_msg_pack, Id};
use super::{read_header_bytes, read_key_eq, read_str_bytes, Header, Id};
use crate::common::utils;
/// Represents a response received related to some response
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Response<T> {
/// Optional header data to include with response
#[serde(default, flatten, skip_serializing_if = "Header::is_empty")]
pub header: Header,
/// Unique id associated with the response
pub id: Id,
@ -22,9 +26,10 @@ pub struct Response<T> {
}
impl<T> Response<T> {
/// Creates a new response with a random, unique id
/// Creates a new response with a random, unique id and no header data
pub fn new(origin_id: Id, payload: T) -> Self {
Self {
header: header!(),
id: rand::random::<u64>().to_string(),
origin_id,
payload,
@ -49,6 +54,11 @@ where
/// Attempts to convert a typed response to an untyped response
pub fn to_untyped_response(&self) -> io::Result<UntypedResponse> {
Ok(UntypedResponse {
header: Cow::Owned(if !self.header.is_empty() {
utils::serialize_to_vec(&self.header)?
} else {
Vec::new()
}),
id: Cow::Borrowed(&self.id),
origin_id: Cow::Borrowed(&self.origin_id),
payload: Cow::Owned(self.to_payload_vec()?),
@ -79,9 +89,18 @@ pub enum UntypedResponseParseError {
InvalidOriginId,
}
#[inline]
fn header_is_empty(header: &[u8]) -> bool {
header.is_empty()
}
/// Represents a response to send whose payload is bytes instead of a specific type
#[derive(Clone, Debug, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub struct UntypedResponse<'a> {
/// Header data associated with the response as bytes
#[serde(default, skip_serializing_if = "header_is_empty")]
pub header: Cow<'a, [u8]>,
/// Unique id associated with the response
pub id: Cow<'a, str>,
@ -93,9 +112,14 @@ pub struct UntypedResponse<'a> {
}
impl<'a> UntypedResponse<'a> {
/// Attempts to convert an untyped request to a typed request
/// Attempts to convert an untyped response to a typed response
pub fn to_typed_response<T: DeserializeOwned>(&self) -> io::Result<Response<T>> {
Ok(Response {
header: if header_is_empty(&self.header) {
header!()
} else {
utils::deserialize_from_slice(&self.header)?
},
id: self.id.to_string(),
origin_id: self.origin_id.to_string(),
payload: utils::deserialize_from_slice(&self.payload)?,
@ -105,6 +129,10 @@ impl<'a> UntypedResponse<'a> {
/// Convert into a borrowed version
pub fn as_borrowed(&self) -> UntypedResponse<'_> {
UntypedResponse {
header: match &self.header {
Cow::Borrowed(x) => Cow::Borrowed(x),
Cow::Owned(x) => Cow::Borrowed(x.as_slice()),
},
id: match &self.id {
Cow::Borrowed(x) => Cow::Borrowed(x),
Cow::Owned(x) => Cow::Borrowed(x.as_str()),
@ -123,6 +151,10 @@ impl<'a> UntypedResponse<'a> {
/// Convert into an owned version
pub fn into_owned(self) -> UntypedResponse<'static> {
UntypedResponse {
header: match self.header {
Cow::Borrowed(x) => Cow::Owned(x.to_vec()),
Cow::Owned(x) => Cow::Owned(x),
},
id: match self.id {
Cow::Borrowed(x) => Cow::Owned(x.to_string()),
Cow::Owned(x) => Cow::Owned(x),
@ -138,6 +170,11 @@ impl<'a> UntypedResponse<'a> {
}
}
/// Updates the header of the response to the given `header`.
pub fn set_header(&mut self, header: impl IntoIterator<Item = u8>) {
self.header = Cow::Owned(header.into_iter().collect());
}
/// Updates the id of the response to the given `id`.
pub fn set_id(&mut self, id: impl Into<String>) {
self.id = Cow::Owned(id.into());
@ -150,76 +187,90 @@ impl<'a> UntypedResponse<'a> {
/// Allocates a new collection of bytes representing the response.
pub fn to_bytes(&self) -> Vec<u8> {
let mut bytes = vec![0x83];
let mut bytes = vec![];
let has_header = !header_is_empty(&self.header);
if has_header {
rmp::encode::write_map_len(&mut bytes, 4).unwrap();
} else {
rmp::encode::write_map_len(&mut bytes, 3).unwrap();
}
write_str_msg_pack("id", &mut bytes);
write_str_msg_pack(&self.id, &mut bytes);
if has_header {
rmp::encode::write_str(&mut bytes, "header").unwrap();
bytes.extend_from_slice(&self.header);
}
rmp::encode::write_str(&mut bytes, "id").unwrap();
rmp::encode::write_str(&mut bytes, &self.id).unwrap();
write_str_msg_pack("origin_id", &mut bytes);
write_str_msg_pack(&self.origin_id, &mut bytes);
rmp::encode::write_str(&mut bytes, "origin_id").unwrap();
rmp::encode::write_str(&mut bytes, &self.origin_id).unwrap();
write_str_msg_pack("payload", &mut bytes);
rmp::encode::write_str(&mut bytes, "payload").unwrap();
bytes.extend_from_slice(&self.payload);
bytes
}
/// Parses a collection of bytes, returning an untyped response if it can be potentially
/// represented as a [`Response`] depending on the payload, or the original bytes if it does not
/// represent a [`Response`].
/// represented as a [`Response`] depending on the payload.
///
/// NOTE: This supports parsing an invalid response where the payload would not properly
/// deserialize, but the bytes themselves represent a complete response of some kind.
pub fn from_slice(input: &'a [u8]) -> Result<Self, UntypedResponseParseError> {
if input.len() < 2 {
if input.is_empty() {
return Err(UntypedResponseParseError::WrongType);
}
// MsgPack marks a fixmap using 0x80 - 0x8f to indicate the size (up to 15 elements).
//
// In the case of the request, there are only three elements: id, origin_id, and payload.
// So the first byte should ALWAYS be 0x83 (131).
if input[0] != 0x83 {
return Err(UntypedResponseParseError::WrongType);
}
let has_header = match rmp::Marker::from_u8(input[0]) {
rmp::Marker::FixMap(3) => false,
rmp::Marker::FixMap(4) => true,
_ => return Err(UntypedResponseParseError::WrongType),
};
// Skip the first byte representing the fixmap
// Advance position by marker
let input = &input[1..];
// Validate that first field is id
let (input, id_key) =
parse_msg_pack_str(input).map_err(|_| UntypedResponseParseError::WrongType)?;
if id_key != "id" {
return Err(UntypedResponseParseError::WrongType);
}
// Parse the header if we have one
let (header, input) = if has_header {
let (_, input) =
read_key_eq(input, "header").map_err(|_| UntypedResponseParseError::WrongType)?;
let (header, input) =
read_header_bytes(input).map_err(|_| UntypedResponseParseError::WrongType)?;
(header, input)
} else {
([0u8; 0].as_slice(), input)
};
// Validate that next field is id
let (_, input) =
read_key_eq(input, "id").map_err(|_| UntypedResponseParseError::WrongType)?;
// Get the id itself
let (input, id) =
parse_msg_pack_str(input).map_err(|_| UntypedResponseParseError::InvalidId)?;
let (id, input) =
read_str_bytes(input).map_err(|_| UntypedResponseParseError::InvalidId)?;
// Validate that second field is origin_id
let (input, origin_id_key) =
parse_msg_pack_str(input).map_err(|_| UntypedResponseParseError::WrongType)?;
if origin_id_key != "origin_id" {
return Err(UntypedResponseParseError::WrongType);
}
// Validate that next field is origin_id
let (_, input) =
read_key_eq(input, "origin_id").map_err(|_| UntypedResponseParseError::WrongType)?;
// Get the origin_id itself
let (input, origin_id) =
parse_msg_pack_str(input).map_err(|_| UntypedResponseParseError::InvalidOriginId)?;
let (origin_id, input) =
read_str_bytes(input).map_err(|_| UntypedResponseParseError::InvalidOriginId)?;
// Validate that second field is payload
let (input, payload_key) =
parse_msg_pack_str(input).map_err(|_| UntypedResponseParseError::WrongType)?;
if payload_key != "payload" {
return Err(UntypedResponseParseError::WrongType);
}
// Validate that final field is payload
let (_, input) =
read_key_eq(input, "payload").map_err(|_| UntypedResponseParseError::WrongType)?;
let header = Cow::Borrowed(header);
let id = Cow::Borrowed(id);
let origin_id = Cow::Borrowed(origin_id);
let payload = Cow::Borrowed(input);
Ok(Self {
header,
id,
origin_id,
payload,
@ -252,6 +303,7 @@ mod tests {
#[test]
fn untyped_response_should_support_converting_to_bytes() {
let bytes = Response {
header: header!(),
id: "some id".to_string(),
origin_id: "some origin id".to_string(),
payload: true,
@ -263,9 +315,47 @@ mod tests {
assert_eq!(untyped_response.to_bytes(), bytes);
}
#[test]
fn untyped_response_should_support_converting_to_bytes_with_header() {
let bytes = Response {
header: header!("key" -> 123),
id: "some id".to_string(),
origin_id: "some origin id".to_string(),
payload: true,
}
.to_vec()
.unwrap();
let untyped_response = UntypedResponse::from_slice(&bytes).unwrap();
assert_eq!(untyped_response.to_bytes(), bytes);
}
#[test]
fn untyped_response_should_support_parsing_from_response_bytes_with_header() {
let bytes = Response {
header: header!("key" -> 123),
id: "some id".to_string(),
origin_id: "some origin id".to_string(),
payload: true,
}
.to_vec()
.unwrap();
assert_eq!(
UntypedResponse::from_slice(&bytes),
Ok(UntypedResponse {
header: Cow::Owned(utils::serialize_to_vec(&header!("key" -> 123)).unwrap()),
id: Cow::Borrowed("some id"),
origin_id: Cow::Borrowed("some origin id"),
payload: Cow::Owned(vec![TRUE_BYTE]),
})
);
}
#[test]
fn untyped_response_should_support_parsing_from_response_bytes_with_valid_payload() {
let bytes = Response {
header: header!(),
id: "some id".to_string(),
origin_id: "some origin id".to_string(),
payload: true,
@ -276,6 +366,7 @@ mod tests {
assert_eq!(
UntypedResponse::from_slice(&bytes),
Ok(UntypedResponse {
header: Cow::Owned(vec![]),
id: Cow::Borrowed("some id"),
origin_id: Cow::Borrowed("some origin id"),
payload: Cow::Owned(vec![TRUE_BYTE]),
@ -287,6 +378,7 @@ mod tests {
fn untyped_response_should_support_parsing_from_response_bytes_with_invalid_payload() {
// Response with id < 32 bytes
let mut bytes = Response {
header: header!(),
id: "".to_string(),
origin_id: "".to_string(),
payload: true,
@ -301,6 +393,7 @@ mod tests {
assert_eq!(
UntypedResponse::from_slice(&bytes),
Ok(UntypedResponse {
header: Cow::Owned(vec![]),
id: Cow::Owned("".to_string()),
origin_id: Cow::Owned("".to_string()),
payload: Cow::Owned(vec![TRUE_BYTE, NEVER_USED_BYTE]),

Loading…
Cancel
Save