From 7fab25ea9f4143798d6ff65263a1f68f1fdc8f34 Mon Sep 17 00:00:00 2001 From: ArloFilley Date: Sun, 27 Aug 2023 21:14:19 +0100 Subject: [PATCH] optimisations --- Cargo.toml | 1 - src/files.rs | 2 +- src/handshake.rs | 50 ++++++--------- src/main.rs | 2 +- src/message.rs | 161 ++++++++++++++++++++++++++--------------------- src/peer.rs | 59 ++++++----------- src/torrent.rs | 7 +-- src/tracker.rs | 52 +++++---------- 8 files changed, 146 insertions(+), 188 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 226d70b..97c0e26 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,6 @@ description = "A BitTorrent client implemented in Rust that allows you to intera authors = ["Arlo Filley "] exclude = ["testing/", "process/", ".vscode/", ".DS_STORE"] license = "MIT" -license-file = "LICENSE" keywords = ["bittorrent", "torrent", "torrentclient"] readme = "README.md" repository = "https://github.com/arlofilley/rusty_torrent" diff --git a/src/files.rs b/src/files.rs index 44e9d52..c757d77 100644 --- a/src/files.rs +++ b/src/files.rs @@ -53,7 +53,7 @@ impl Files { for dir in &t_file.path[..t_file.path.len() - 1] { path.push('/'); - path.push_str(&dir); + path.push_str(dir); if !dir_exists(&path).await.unwrap() { debug!("Creating: {path}"); diff --git a/src/handshake.rs b/src/handshake.rs index 118bae8..0cd252c 100644 --- a/src/handshake.rs +++ b/src/handshake.rs @@ -1,4 +1,4 @@ -use log::{debug, error}; +use log::{ info, error }; /// Represents the handshake message that will be sent to a client. #[derive(Debug)] @@ -25,17 +25,17 @@ impl Handshake { /// # Returns /// /// A new `Handshake` instance on success, or an empty `Result` indicating an error. - pub fn new(info_hash: Vec) -> Result { + pub fn new(info_hash: &[u8]) -> Option { if info_hash.len() != 20 { error!("Incorrect infohash length, consider using the helper function in torrent"); - return Err(()); + return None; } - Ok(Self { + Some(Self { p_str_len: 19, p_str: String::from("BitTorrent protocol"), reserved: [0; 8], - info_hash, + info_hash: info_hash.to_vec(), peer_id: String::from("-MY0001-123456654322") }) } @@ -49,22 +49,10 @@ impl Handshake { let mut buf: Vec = vec![0; 68]; buf[0] = self.p_str_len; - - for i in 1..20 { - buf[i] = self.p_str.as_bytes()[i - 1]; - } - - for i in 21..28 { - buf[i] = self.reserved[i - 21] - } - - for i in 28..48 { - buf[i] = self.info_hash[i - 28] - } - - for i in 48..68 { - buf[i] = self.peer_id.as_bytes()[i - 48] - } + buf[1..20].copy_from_slice(&self.p_str.as_bytes()[..19]); + buf[21..28].copy_from_slice(&self.reserved[..7]); + buf[28..48].copy_from_slice(&self.info_hash[..20]); + buf[48..68].copy_from_slice(&self.peer_id.as_bytes()[..20]); buf } @@ -82,29 +70,27 @@ impl Handshake { /// # Errors /// /// Returns an error if the provided buffer is not long enough (at least 68 bytes). - pub fn from_buffer(buf: &Vec) -> Result { + pub fn from_buffer(buf: &Vec) -> Option { // Verify that buffer is at least the correct size, if not error if buf.len() < 68 { error!("buffer provided to handshake was too short"); - return Err(()); + return None; } let mut p_str = String::new(); - for i in 1..20 { - p_str.push(buf[i] as char) + for byte in buf.iter().take(20).skip(1) { + p_str.push(*byte as char) } let mut info_hash: Vec = vec![0; 20]; - for i in 28..48 { - info_hash[i - 28] = buf[i]; - } + info_hash[..20].copy_from_slice(&buf[28..48]); let mut peer_id = String::new(); - for i in 48..68 { - peer_id.push(buf[i] as char) + for byte in buf.iter().take(68).skip(48) { + peer_id.push(*byte as char) } - Ok(Self { + Some(Self { p_str_len: buf[0], p_str, reserved: [0; 8], @@ -114,6 +100,6 @@ impl Handshake { } pub fn log_useful_information(&self) { - debug!("Connected - PeerId: {:?}", self.peer_id); + info!("Connected - PeerId: {:?}", self.peer_id); } } \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index bfb9400..5c4bfa2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -95,7 +95,7 @@ async fn main() { let mut peer = match Peer::create_connection(&format!("{}:{}", announce_message_response.ips[0], announce_message_response.ports[0])).await { None => { return }, Some(peer) => peer - }; + }; let num_pieces = torrent.info.pieces.len() / 20; peer.handshake(&torrent).await; diff --git a/src/message.rs b/src/message.rs index d82f523..53576c1 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,5 +1,5 @@ -use log::error; use std::vec; +use log::error; /// Represents a message in the BitTorrent protocol. #[derive(Debug, PartialEq)] @@ -12,6 +12,14 @@ pub struct Message { pub payload: Option>, } +pub trait ToBuffer { + fn to_buffer(self) -> Vec; +} + +pub trait FromBuffer { + fn from_buffer(buf: &[u8]) -> Self; +} + impl Message { /// Creates a new message. /// @@ -23,7 +31,9 @@ impl Message { pub fn new(message_length: u32, message_type: MessageType, payload: Option>) -> Self { Self { message_length, message_type, payload } } +} +impl FromBuffer for Message { /// Decodes a message from a given buffer. /// /// # Arguments @@ -33,11 +43,9 @@ impl Message { /// # Returns /// /// A new `Message` instance on success, or an empty `Result` indicating an error. - pub fn from_buffer(buf: &Vec) -> Result { + fn from_buffer(buf: &[u8]) -> Self { let mut message_length: [u8; 4] = [0; 4]; - for i in 0..4 { - message_length[i] = buf[i]; - }; + message_length[..4].copy_from_slice(&buf[..4]); let message_length = u32::from_be_bytes(message_length); @@ -47,44 +55,32 @@ impl Message { if message_length == 0 { message_type = MessageType::KeepAlive; payload = None; + } else if message_length == 5 { + message_type = buf[4].into(); + payload = None; } else { - message_type = match buf[4] { - 0 => MessageType::Choke, - 1 => MessageType::Unchoke, - 2 => MessageType::Interested, - 3 => MessageType::NotInterested, - 4 => MessageType::Have, - 5 => MessageType::Bitfield, - 6 => MessageType::Request, - 7 => MessageType::Piece, - 8 => MessageType::Cancel, - 9 => MessageType::Port, - _ => { - error!("Invalid Message Type: {} | Message: {:?}", buf[4], buf); - return Err(()) - } - }; - - // if message_type == MessageType::Piece && 5 + message_length - 1 != 16397 { - // error!("{:?}", 5..5 + message_length as usize - 1); - // } + message_type = buf[4].into(); - payload = Some(buf[5..5 + message_length as usize - 1].to_vec()); + let end_of_message = 4 + message_length as usize; + payload = Some(buf[5..end_of_message].to_vec()); } - Ok(Self { + Self { message_length, message_type, payload - }) + } } +} + +impl ToBuffer for Message { /// Converts the `Message` instance to a byte buffer for sending. /// /// # Returns /// /// A byte vector containing the serialized message. - pub fn to_buffer(&mut self) -> Vec { + fn to_buffer(self) -> Vec { let mut buf: Vec = vec![]; for byte in self.message_length.to_be_bytes() { @@ -95,54 +91,30 @@ impl Message { MessageType::KeepAlive => { return buf }, - MessageType::Choke => { - buf.push(0); + MessageType::Choke | MessageType::Unchoke | MessageType::Interested | MessageType::NotInterested => { + buf.push(self.message_type.into()); return buf; }, - MessageType::Unchoke => { - buf.push(1); - return buf; + MessageType::Have | MessageType::Bitfield | MessageType::Request | MessageType::Piece | MessageType::Cancel | MessageType::Port => { + buf.push(self.message_type.into()); }, - MessageType::Interested => { - buf.push(2); - return buf; - }, - MessageType::NotInterested => { - buf.push(3); - return buf; - }, - MessageType::Have => { - buf.push(4); - }, - MessageType::Bitfield => { - buf.push(5); - }, - MessageType::Request => { - buf.push(6); - }, - MessageType::Piece => { - buf.push(7); - }, - MessageType::Cancel => { - buf.push(8); - }, - MessageType::Port => { - buf.push(9); - }, - } - - match &self.payload { - None => { panic!("Error you are trying to create a message that needs a payload with no payload") } - Some(payload) => { - for byte in payload { - buf.push(*byte) - } - - buf + MessageType::Error => { + panic!("Error making message into buffer") } } - } + match self.payload { + None => { panic!("Error you are trying to create a message that needs a payload with no payload") } + Some(payload) => { + buf.extend(payload); + } + } + + buf + } +} + +impl Message { /// Create a request message from a given piece_index, offset, and length /// /// # Arguments @@ -181,7 +153,7 @@ impl Message { /// # Returns /// /// A tuple containing a vector of message byte buffers and the number of messages. - pub fn number_of_messages(buf: &Vec) -> (Vec>, u32) { + pub fn number_of_messages(buf: &[u8]) -> (Vec>, u32) { let mut message_num = 0; let mut messages: Vec> = vec![]; @@ -194,7 +166,7 @@ impl Message { j = u32::from_be_bytes([buf[i], buf[i + 1], buf[i + 2], buf[i + 3]]) as usize + 4; messages.push(buf[i..i+j].to_vec()); - i = i+j; + i += j; message_num += 1; if buf[i] == 0 && buf[i + 1] == 0 && buf[i + 2] == 0 && buf[i + 3] == 0 { @@ -234,4 +206,47 @@ pub enum MessageType { Cancel = 8, /// Placeholder for unimplemented message type. Port = 9, + Error +} + +impl From for MessageType { + fn from(val: u8) -> MessageType { + match val { + 0 => MessageType::Choke, + 1 => MessageType::Unchoke, + 2 => MessageType::Interested, + 3 => MessageType::NotInterested, + 4 => MessageType::Have, + 5 => MessageType::Bitfield, + 6 => MessageType::Request, + 7 => MessageType::Piece, + 8 => MessageType::Cancel, + 9 => MessageType::Port, + _ => { + error!("Invalid Message Type: {}", val); + MessageType::Error + } + } + } +} + +impl From for u8 { + fn from(val: MessageType) -> u8 { + match val { + MessageType::Choke => 0, + MessageType::Unchoke => 1, + MessageType::Interested => 2, + MessageType::NotInterested => 3, + MessageType::Have => 4, + MessageType::Bitfield => 5, + MessageType::Request => 6, + MessageType::Piece => 7, + MessageType::Cancel => 8, + MessageType::Port => 9, + _ => { + error!("Invalid Message Type: {:?}", val); + u8::MAX + } + } + } } \ No newline at end of file diff --git a/src/peer.rs b/src/peer.rs index dc079a5..d5d3544 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -3,7 +3,7 @@ // Crate Imports use crate::{ handshake::Handshake, - message::{ Message, MessageType }, + message::{ FromBuffer, Message, MessageType, ToBuffer }, torrent::Torrent }; @@ -69,7 +69,7 @@ impl Peer { pub async fn handshake(&mut self, torrent: &Torrent) { let mut buf = vec![0; 1024]; - let handshake_message = Handshake::new(torrent.get_info_hash()).unwrap(); + let handshake_message = Handshake::new(&torrent.get_info_hash()).unwrap(); self.connection_stream.writable().await.unwrap(); self.connection_stream.write_all(&handshake_message.to_buffer()).await.unwrap(); @@ -80,21 +80,11 @@ impl Peer { let handshake = Handshake::from_buffer(&buf[..68].to_vec()).unwrap(); handshake.log_useful_information(); - for message_buf in Message::number_of_messages(&buf[68..].to_vec()).0 { - let message = match Message::from_buffer(&message_buf) { - Err(()) => { - error!("error decoding message"); - self.disconnect().await; - return; - }, - Ok(message) => { message } - }; + for message_buf in Message::number_of_messages(&buf[68..]).0 { + let message = Message::from_buffer(&message_buf); - match message.message_type { - MessageType::Unchoke => { - self.choking = false; - } - _ => {} + if message.message_type == MessageType::Unchoke { + self.choking = false; } } @@ -104,10 +94,7 @@ impl Peer { /// Keeps the connection alive and sends interested messages until the peer unchokes pub async fn keep_alive_until_unchoke(&mut self) { loop { - let message = match self.read_message().await { - None => return, - Some(message) => message - }; + let message = self.read_message().await; debug!("{message:?}"); match message.message_type { @@ -128,7 +115,7 @@ impl Peer { } /// Sends a message to the peer and waits for a response, which it returns - pub async fn send_message(&mut self, mut message: Message) -> Message { + pub async fn send_message(&mut self, message: Message) -> Message { let mut buf = vec![0; 16_397]; self.connection_stream.writable().await.unwrap(); @@ -137,11 +124,11 @@ impl Peer { self.connection_stream.readable().await.unwrap(); let _ = self.connection_stream.read_exact(&mut buf).await.unwrap(); - Message::from_buffer(&buf).unwrap() + Message::from_buffer(&buf) } /// Sends a message to the peer and waits for a response, which it returns - pub async fn send_message_exact_size_response(&mut self, mut message: Message, size: usize) -> Message { + pub async fn send_message_exact_size_response(&mut self, message: Message, size: usize) -> Message { let mut buf = vec![0; size]; self.connection_stream.writable().await.unwrap(); @@ -150,32 +137,23 @@ impl Peer { self.connection_stream.readable().await.unwrap(); let _ = self.connection_stream.read_exact(&mut buf).await.unwrap(); - Message::from_buffer(&buf).unwrap() + Message::from_buffer(&buf) } /// Sends a message but doesn't wait for a response - pub async fn send_message_no_response(&mut self, mut message: Message) { + pub async fn send_message_no_response(&mut self, message: Message) { self.connection_stream.writable().await.unwrap(); self.connection_stream.write_all(&message.to_buffer()).await.unwrap(); } /// reads a message from the peer - pub async fn read_message(&mut self) -> Option { + pub async fn read_message(&mut self) -> Message { let mut buf = vec![0; 16_397]; self.connection_stream.readable().await.unwrap(); let _ = self.connection_stream.read(&mut buf).await.unwrap(); - match Message::from_buffer(&buf) { - Err(()) => { - error!("Unable to decode message"); - self.disconnect().await; - return None; - }, - Ok(message) => { - Some(message) - } - } + Message::from_buffer(&buf) } /// Shutsdown the connection stream @@ -197,8 +175,7 @@ impl Peer { pub async fn request_piece(&mut self, index: u32, piece_length: u32, len: &mut u32, total_len: u32) -> Vec { let mut buf = vec![]; // Sequentially requests piece from the peer - for offset in (0..piece_length).step_by(16_384 as usize) { - + for offset in (0..piece_length).step_by(16_384) { let mut length = 16_384; let response: Message; @@ -217,12 +194,12 @@ impl Peer { match response.message_type { MessageType::Piece => { - let data = response.payload.unwrap(); + let mut data = response.payload.unwrap(); *len += data.len() as u32; *len -= 8; - for i in 8..data.len() { - buf.push(data[i]) + for byte in data.drain(..).skip(8) { + buf.push(byte) } }, _ => { debug!("didn't recieve expected piece request | Recieved: {:?}", response.message_type); } diff --git a/src/torrent.rs b/src/torrent.rs index d84785b..85df33d 100644 --- a/src/torrent.rs +++ b/src/torrent.rs @@ -107,7 +107,6 @@ impl Torrent { match &self.info.files { None => { info!("> {}", self.info.name); - return } Some(files) => { for file in files { @@ -142,9 +141,9 @@ impl Torrent { /// # Returns /// /// * `true` if the piece is correct, `false` otherwise. - pub fn check_piece(&self, piece: &Vec, index: u32) -> bool { + pub fn check_piece(&self, piece: &[u8], index: u32) -> bool { let mut hasher = Sha1::new(); - hasher.update(&piece); + hasher.update(piece); let result = hasher.finalize(); let piece_hash = &self.info.pieces[(index * 20) as usize..(index * 20 + 20) as usize]; @@ -197,7 +196,7 @@ impl Torrent { } } - for (i, url) in self.announce_list.as_ref().unwrap().into_iter().enumerate() { + for (i, url) in self.announce_list.as_ref().unwrap().iter().enumerate() { debug!("{:?}", url); if let Some(captures) = re.captures(&url[i]) { let hostname = captures.get(1).unwrap().as_str(); diff --git a/src/tracker.rs b/src/tracker.rs index 9dde3df..321822d 100644 --- a/src/tracker.rs +++ b/src/tracker.rs @@ -94,7 +94,7 @@ pub trait ToBuffer { /// A trait for converting a type from a byte buffer. pub trait FromBuffer { /// Converts a byte buffer into the implementing type. - fn from_buffer(buf: &Vec) -> Self; + fn from_buffer(buf: &[u8]) -> Self; } #[derive(Debug)] @@ -129,23 +129,17 @@ impl ToBuffer for ConnectionMessage { } impl FromBuffer for ConnectionMessage { - fn from_buffer(buf: &Vec) -> Self { + fn from_buffer(buf: &[u8]) -> Self { let mut action: [u8; 4] = [0; 4]; - for i in 0..4 { - action[i] = buf[i]; - } + action[..4].copy_from_slice(&buf[..4]); let action = i32::from_be_bytes(action); let mut transaction_id: [u8; 4] = [0; 4]; - for i in 4..8 { - transaction_id[i - 4] = buf[i]; - } + transaction_id[..4].copy_from_slice(&buf[4..8]); let transaction_id = i32::from_be_bytes(transaction_id); let mut connection_id: [u8; 8] = [0; 8]; - for i in 8..16 { - connection_id[i - 8] = buf[i]; - } + connection_id[..4].copy_from_slice(&buf[8..16]); let connection_id = i64::from_be_bytes(connection_id); @@ -179,15 +173,13 @@ pub struct AnnounceMessage { impl AnnounceMessage { /// Creates a new announce message. - pub fn new(connection_id: i64, infohash: &Vec, peerid: &str, total_length: i64) -> Self { - let mut info_hash: [u8; 20] = [0; 20]; - for i in 0..20 { - info_hash[i] = infohash[i]; - } + pub fn new(connection_id: i64, infohash: &[u8], peerid: &str, total_length: i64) -> Self { + let mut info_hash: [u8; 20] = [ 0; 20 ]; + info_hash[..20].copy_from_slice(&infohash[..20]); let mut peer_id: [u8; 20] = [0; 20]; - for i in 0..20 { - peer_id[i] = peerid.chars().nth(i).unwrap() as u8; + for (i, character) in peerid.chars().enumerate() { + peer_id[i] = character as u8; } Self { @@ -246,35 +238,25 @@ pub struct AnnounceMessageResponse { impl FromBuffer for AnnounceMessageResponse { /// Converts a byte buffer into an `AnnounceMessageResponse` instance. - fn from_buffer(buf: &Vec) -> Self { + fn from_buffer(buf: &[u8]) -> Self { let mut action: [u8; 4] = [0; 4]; - for i in 0..4 { - action[i] = buf[i]; - } + action[..4].copy_from_slice(&buf[0..4]); let action = i32::from_be_bytes(action); - let mut transaction_id: [u8; 4] = [0; 4]; - for i in 4..8 { - transaction_id[i - 4] = buf[i]; - } + let mut transaction_id: [u8; 4] = [ 0; 4 ]; + transaction_id[..4].copy_from_slice(&buf[4..8]); let transaction_id = i32::from_be_bytes(transaction_id); let mut interval: [u8; 4] = [0; 4]; - for i in 8..12 { - interval[i - 8] = buf[i]; - } + interval[..4].copy_from_slice(&buf[8..12]); let interval = i32::from_be_bytes(interval); let mut leechers: [u8; 4] = [0; 4]; - for i in 12..16 { - leechers[i - 12] = buf[i]; - } + leechers[..4].copy_from_slice(&buf[12..16]); let leechers = i32::from_be_bytes(leechers); let mut seeders: [u8; 4] = [0; 4]; - for i in 16..20 { - seeders[i - 16] = buf[i]; - } + seeders[..4].copy_from_slice(&buf[16..20]); let seeders = i32::from_be_bytes(seeders); let mut ips: Vec = vec![];