diff --git a/src/files.rs b/src/files.rs index c757d77..249fe4f 100644 --- a/src/files.rs +++ b/src/files.rs @@ -1,9 +1,9 @@ use log::debug; use tokio::{ - fs::try_exists as dir_exists, - fs::create_dir as create_dir, - fs::File, - io::AsyncWriteExt + fs::try_exists as dir_exists, + fs::create_dir as create_dir, + fs::File, + io::AsyncWriteExt }; use crate::torrent::Torrent; @@ -11,11 +11,11 @@ use crate::torrent::Torrent; /// Represents information about a file being downloaded. #[derive(Debug)] struct FileInfo { - file: File, - length: u64, - current_length: u64, - name: String, - complete: bool + file: File, + length: u64, + current_length: u64, + name: String, + complete: bool } /// Represents a collection of files being downloaded. @@ -23,86 +23,86 @@ struct FileInfo { pub struct Files(Vec); impl Files { - /// Creates a new `Files` instance. - pub fn new() -> Self { - Self(vec![]) - } - - /// Creates the files in the local system for downloading. - /// - /// # Arguments - /// - /// * `torrent` - The `Torrent` instance describing the torrent. - /// * `download_path` - The path where the files will be downloaded. - pub async fn create_files(&mut self, torrent: &Torrent, download_path: &str) { - match &torrent.info.files { - // Single File Mode - None => { - let path = &format!("{download_path}/{}", torrent.info.name); - let file = File::create(&path).await.unwrap(); - - let length = torrent.info.length.unwrap_or(0) as u64; - - self.0.push(FileInfo { file, length, current_length: 0, name: path.to_string(), complete: false }) - } - - // Multi File Mode - Some(files) => { - for t_file in files { - let mut path = download_path.to_string(); - - for dir in &t_file.path[..t_file.path.len() - 1] { - path.push('/'); - path.push_str(dir); - - if !dir_exists(&path).await.unwrap() { - debug!("Creating: {path}"); - create_dir(&path).await.unwrap(); - } - } - - path.push('/'); - path.push_str(&t_file.path[t_file.path.len() - 1]); - - - debug!("Creating: {path}"); - let file = File::create(&path).await.unwrap(); - let length = t_file.length; - - self.0.push(FileInfo { file, length, current_length: 0, name: path.to_string(), complete: false }); - } + /// Creates a new `Files` instance. + pub fn new() -> Self { + Self(vec![]) + } + + /// Creates the files in the local system for downloading. + /// + /// # Arguments + /// + /// * `torrent` - The `Torrent` instance describing the torrent. + /// * `download_path` - The path where the files will be downloaded. + pub async fn create_files(&mut self, torrent: &Torrent, download_path: &str) { + match &torrent.info.files { + // Single File Mode + None => { + let path = &format!("{download_path}/{}", torrent.info.name); + let file = File::create(&path).await.unwrap(); + + let length = torrent.info.length.unwrap_or(0) as u64; + + self.0.push(FileInfo { file, length, current_length: 0, name: path.to_string(), complete: false }) + } + + // Multi File Mode + Some(files) => { + for t_file in files { + let mut path = download_path.to_string(); + + for dir in &t_file.path[..t_file.path.len() - 1] { + path.push('/'); + path.push_str(dir); + + if !dir_exists(&path).await.unwrap() { + debug!("Creating: {path}"); + create_dir(&path).await.unwrap(); } + } + + path.push('/'); + path.push_str(&t_file.path[t_file.path.len() - 1]); + + + debug!("Creating: {path}"); + let file = File::create(&path).await.unwrap(); + let length = t_file.length; + + self.0.push(FileInfo { file, length, current_length: 0, name: path.to_string(), complete: false }); } + } } - - /// Writes a piece of data to the appropriate files. - /// - /// # Arguments - /// - /// * `piece` - The piece of data to write. - pub async fn write_piece(&mut self, piece: Vec) { - let mut j = 0; - - let mut piece_len = piece.len() as u64; - let file_iterator = self.0.iter_mut(); - - for file in file_iterator { - - if file.complete { continue } - - if file.current_length + piece_len > file.length { - let n = file.file.write(&piece[j..(file.length - file.current_length) as usize]).await.unwrap(); - debug!("Wrote {n}B > {}", file.name); - j = (file.length - file.current_length) as usize; - file.current_length += j as u64; - piece_len -= j as u64; - file.complete = true; - } else { - let n = file.file.write(&piece[j..]).await.unwrap(); - debug!("Wrote {n}B > {}", file.name); - file.current_length += piece_len; - return - } - } + } + + /// Writes a piece of data to the appropriate files. + /// + /// # Arguments + /// + /// * `piece` - The piece of data to write. + pub async fn write_piece(&mut self, piece: Vec) { + let mut j = 0; + + let mut piece_len = piece.len() as u64; + let file_iterator = self.0.iter_mut(); + + for file in file_iterator { + + if file.complete { continue } + + if file.current_length + piece_len > file.length { + let n = file.file.write(&piece[j..(file.length - file.current_length) as usize]).await.unwrap(); + debug!("Wrote {n}B > {}", file.name); + j = (file.length - file.current_length) as usize; + file.current_length += j as u64; + piece_len -= j as u64; + file.complete = true; + } else { + let n = file.file.write(&piece[j..]).await.unwrap(); + debug!("Wrote {n}B > {}", file.name); + file.current_length += piece_len; + return + } } + } } \ No newline at end of file diff --git a/src/handshake.rs b/src/handshake.rs index 0cd252c..ab62671 100644 --- a/src/handshake.rs +++ b/src/handshake.rs @@ -3,103 +3,103 @@ use log::{ info, error }; /// Represents the handshake message that will be sent to a client. #[derive(Debug)] pub struct Handshake { - /// The length of the protocol name, must be 19 for "BitTorrent protocol". - p_str_len: u8, - /// The protocol name, should always be "BitTorrent protocol". - p_str: String, - /// Reserved for extensions, currently unused. - reserved: [u8; 8], - /// The infohash for the torrent. - info_hash: Vec, - /// The identifier for the client. - pub peer_id: String, + /// The length of the protocol name, must be 19 for "BitTorrent protocol". + p_str_len: u8, + /// The protocol name, should always be "BitTorrent protocol". + p_str: String, + /// Reserved for extensions, currently unused. + reserved: [u8; 8], + /// The infohash for the torrent. + info_hash: Vec, + /// The identifier for the client. + pub peer_id: String, } impl Handshake { - /// Creates a new handshake. - /// - /// # Arguments - /// - /// * `info_hash` - The infohash for the torrent. - /// - /// # Returns - /// - /// A new `Handshake` instance on success, or an empty `Result` indicating an error. - pub fn new(info_hash: &[u8]) -> Option { - if info_hash.len() != 20 { - error!("Incorrect infohash length, consider using the helper function in torrent"); - return None; - } - - Some(Self { - p_str_len: 19, - p_str: String::from("BitTorrent protocol"), - reserved: [0; 8], - info_hash: info_hash.to_vec(), - peer_id: String::from("-MY0001-123456654322") - }) + /// Creates a new handshake. + /// + /// # Arguments + /// + /// * `info_hash` - The infohash for the torrent. + /// + /// # Returns + /// + /// A new `Handshake` instance on success, or an empty `Result` indicating an error. + pub fn new(info_hash: &[u8]) -> Option { + if info_hash.len() != 20 { + error!("Incorrect infohash length, consider using the helper function in torrent"); + return None; } - - /// Converts the `Handshake` instance to a byte buffer for sending to a peer. - /// - /// # Returns - /// - /// A byte vector containing the serialized handshake. - pub fn to_buffer(&self) -> Vec { - let mut buf: Vec = vec![0; 68]; - - buf[0] = self.p_str_len; - buf[1..20].copy_from_slice(&self.p_str.as_bytes()[..19]); - buf[21..28].copy_from_slice(&self.reserved[..7]); - buf[28..48].copy_from_slice(&self.info_hash[..20]); - buf[48..68].copy_from_slice(&self.peer_id.as_bytes()[..20]); - - buf + + Some(Self { + p_str_len: 19, + p_str: String::from("BitTorrent protocol"), + reserved: [0; 8], + info_hash: info_hash.to_vec(), + peer_id: String::from("-MY0001-123456654322") + }) + } + + /// Converts the `Handshake` instance to a byte buffer for sending to a peer. + /// + /// # Returns + /// + /// A byte vector containing the serialized handshake. + pub fn to_buffer(&self) -> Vec { + let mut buf: Vec = vec![0; 68]; + + buf[0] = self.p_str_len; + buf[1..20].copy_from_slice(&self.p_str.as_bytes()[..19]); + buf[21..28].copy_from_slice(&self.reserved[..7]); + buf[28..48].copy_from_slice(&self.info_hash[..20]); + buf[48..68].copy_from_slice(&self.peer_id.as_bytes()[..20]); + + buf + } + + /// Converts a byte buffer to a `Handshake` instance. + /// + /// # Arguments + /// + /// * `buf` - A byte vector containing the serialized handshake. + /// + /// # Returns + /// + /// A new `Handshake` instance on success, or an empty `Result` indicating an error. + /// + /// # Errors + /// + /// Returns an error if the provided buffer is not long enough (at least 68 bytes). + pub fn from_buffer(buf: &Vec) -> Option { + // Verify that buffer is at least the correct size, if not error + if buf.len() < 68 { + error!("buffer provided to handshake was too short"); + return None; } - - /// Converts a byte buffer to a `Handshake` instance. - /// - /// # Arguments - /// - /// * `buf` - A byte vector containing the serialized handshake. - /// - /// # Returns - /// - /// A new `Handshake` instance on success, or an empty `Result` indicating an error. - /// - /// # Errors - /// - /// Returns an error if the provided buffer is not long enough (at least 68 bytes). - pub fn from_buffer(buf: &Vec) -> Option { - // Verify that buffer is at least the correct size, if not error - if buf.len() < 68 { - error!("buffer provided to handshake was too short"); - return None; - } - - let mut p_str = String::new(); - for byte in buf.iter().take(20).skip(1) { - p_str.push(*byte as char) - } - - let mut info_hash: Vec = vec![0; 20]; - info_hash[..20].copy_from_slice(&buf[28..48]); - - let mut peer_id = String::new(); - for byte in buf.iter().take(68).skip(48) { - peer_id.push(*byte as char) - } - - Some(Self { - p_str_len: buf[0], - p_str, - reserved: [0; 8], - info_hash, - peer_id - }) + + let mut p_str = String::new(); + for byte in buf.iter().take(20).skip(1) { + p_str.push(*byte as char) } - - pub fn log_useful_information(&self) { - info!("Connected - PeerId: {:?}", self.peer_id); + + let mut info_hash: Vec = vec![0; 20]; + info_hash[..20].copy_from_slice(&buf[28..48]); + + let mut peer_id = String::new(); + for byte in buf.iter().take(68).skip(48) { + peer_id.push(*byte as char) } + + Some(Self { + p_str_len: buf[0], + p_str, + reserved: [0; 8], + info_hash, + peer_id + }) + } + + pub fn log_useful_information(&self) { + info!("Connected - PeerId: {:?}", self.peer_id); + } } \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 5c4bfa2..5d75294 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,15 +19,15 @@ mod tracker; // Crate Imports use crate::{ - files::Files, - peer::Peer, - torrent::Torrent, - tracker::{ - AnnounceMessage, AnnounceMessageResponse, - ConnectionMessage, - FromBuffer, - Tracker - } + files::Files, + peer::Peer, + torrent::Torrent, + tracker::{ + AnnounceMessage, AnnounceMessageResponse, + ConnectionMessage, + FromBuffer, + Tracker + } }; // External Ipmorts @@ -38,86 +38,86 @@ use log::{ debug, info, LevelFilter }; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Args { - #[arg(short, long)] - log_file_path: Option, - - #[arg(short, long)] - torrent_file_path: String, - - #[arg(short, long)] - download_path: String, + #[arg(short, long)] + log_file_path: Option, + + #[arg(short, long)] + torrent_file_path: String, + + #[arg(short, long)] + download_path: String, } /// The root function #[tokio::main] async fn main() { - let args = Args::parse(); - - // Creates a log file to handle large amounts of data - let log_path = args.log_file_path.unwrap_or(String::from("./log/rustytorrent.log")); - simple_logging::log_to_file(&log_path, LevelFilter::Info).unwrap(); - - // Read the Torrent File - let torrent = Torrent::from_torrent_file(&args.torrent_file_path).await; - info!("Sucessfully read torrent file"); - torrent.log_useful_information(); - - // Create the files that will be written to - let mut files = Files::new(); - files.create_files(&torrent, &args.download_path).await; - - // Gets peers from the given tracker - let (_remote_hostname, _remote_port) = torrent.get_tracker(); - let (remote_hostname, remote_port) = ("tracker.opentrackr.org", 1337); - debug!("{}:{}", remote_hostname, remote_port); - - let mut tracker = Tracker::new("0.0.0.0:61389", remote_hostname, remote_port).await; - info!("Successfully connected to tracker {}:{}", remote_hostname, remote_port); - let connection_message = ConnectionMessage::from_buffer( - &tracker.send_message(&ConnectionMessage::create_basic_connection()).await - ); - - debug!("{:?}", connection_message); - - let announce_message_response = AnnounceMessageResponse::from_buffer( - &tracker.send_message(&AnnounceMessage::new( - connection_message.connection_id, - &torrent.get_info_hash(), - "-MY0001-123456654321", - torrent.get_total_length() as i64 - )).await - ); - - debug!("{:?}", announce_message_response); - info!("Found Peers"); + let args = Args::parse(); + + // Creates a log file to handle large amounts of data + let log_path = args.log_file_path.unwrap_or(String::from("./log/rustytorrent.log")); + simple_logging::log_to_file(&log_path, LevelFilter::Info).unwrap(); + + // Read the Torrent File + let torrent = Torrent::from_torrent_file(&args.torrent_file_path).await; + info!("Sucessfully read torrent file"); + torrent.log_useful_information(); + + // Create the files that will be written to + let mut files = Files::new(); + files.create_files(&torrent, &args.download_path).await; + + // Gets peers from the given tracker + let (_remote_hostname, _remote_port) = torrent.get_tracker(); + let (remote_hostname, remote_port) = ("tracker.opentrackr.org", 1337); + debug!("{}:{}", remote_hostname, remote_port); + + let mut tracker = Tracker::new("0.0.0.0:61389", remote_hostname, remote_port).await; + info!("Successfully connected to tracker {}:{}", remote_hostname, remote_port); + let connection_message = ConnectionMessage::from_buffer( + &tracker.send_message(&ConnectionMessage::create_basic_connection()).await + ); + + debug!("{:?}", connection_message); + + let announce_message_response = AnnounceMessageResponse::from_buffer( + &tracker.send_message(&AnnounceMessage::new( + connection_message.connection_id, + &torrent.get_info_hash(), + "-MY0001-123456654321", + torrent.get_total_length() as i64 + )).await + ); + + debug!("{:?}", announce_message_response); + info!("Found Peers"); + + // Creates an assumed peer connection to the `SocketAddr` given + let mut peer = match Peer::create_connection(&format!("{}:{}", announce_message_response.ips[0], announce_message_response.ports[0])).await { + None => { return }, + Some(peer) => peer + }; + + let num_pieces = torrent.info.pieces.len() / 20; + peer.handshake(&torrent).await; + peer.keep_alive_until_unchoke().await; + + info!("Successfully Created Connection with peer: {}", peer.peer_id); + + let mut len = 0; + + for index in 0..num_pieces { + let piece= peer.request_piece( + index as u32, torrent.info.piece_length as u32, + &mut len, torrent.get_total_length() as u32 + ).await; - // Creates an assumed peer connection to the `SocketAddr` given - let mut peer = match Peer::create_connection(&format!("{}:{}", announce_message_response.ips[0], announce_message_response.ports[0])).await { - None => { return }, - Some(peer) => peer - }; - - let num_pieces = torrent.info.pieces.len() / 20; - peer.handshake(&torrent).await; - peer.keep_alive_until_unchoke().await; - - info!("Successfully Created Connection with peer: {}", peer.peer_id); - - let mut len = 0; - - for index in 0..num_pieces { - let piece= peer.request_piece( - index as u32, torrent.info.piece_length as u32, - &mut len, torrent.get_total_length() as u32 - ).await; - - if torrent.check_piece(&piece, index as u32) { - files.write_piece(piece).await; - } else { - break - } + if torrent.check_piece(&piece, index as u32) { + files.write_piece(piece).await; + } else { + break } - - peer.disconnect().await; - info!("Successfully completed download"); + } + + peer.disconnect().await; + info!("Successfully completed download"); } diff --git a/src/message.rs b/src/message.rs index 53576c1..b1ca935 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,252 +1,258 @@ use std::vec; -use log::error; +use log::{error, debug}; /// Represents a message in the BitTorrent protocol. #[derive(Debug, PartialEq)] pub struct Message { - /// The length of the message, including the type and payload. - pub message_length: u32, - /// The type of message. - pub message_type: MessageType, - /// The payload of the message, if any. - pub payload: Option>, + /// The length of the message, including the type and payload. + pub message_length: u32, + /// The type of message. + pub message_type: MessageType, + /// The payload of the message, if any. + pub payload: Option>, } pub trait ToBuffer { - fn to_buffer(self) -> Vec; + fn to_buffer(self) -> Vec; } pub trait FromBuffer { - fn from_buffer(buf: &[u8]) -> Self; + fn from_buffer(buf: &[u8]) -> Self; } impl Message { - /// Creates a new message. - /// - /// # Arguments - /// - /// * `message_length` - The length of the message. - /// * `message_type` - The type of message. - /// * `payload` - The payload of the message, if any. - pub fn new(message_length: u32, message_type: MessageType, payload: Option>) -> Self { - Self { message_length, message_type, payload } - } + /// Creates a new message. + /// + /// # Arguments + /// + /// * `message_length` - The length of the message. + /// * `message_type` - The type of message. + /// * `payload` - The payload of the message, if any. + pub fn new(message_length: u32, message_type: MessageType, payload: Option>) -> Self { + Self { message_length, message_type, payload } + } } impl FromBuffer for Message { - /// Decodes a message from a given buffer. - /// - /// # Arguments - /// - /// * `buf` - The byte buffer containing the serialized message. - /// - /// # Returns - /// - /// A new `Message` instance on success, or an empty `Result` indicating an error. - fn from_buffer(buf: &[u8]) -> Self { - let mut message_length: [u8; 4] = [0; 4]; - message_length[..4].copy_from_slice(&buf[..4]); - - let message_length = u32::from_be_bytes(message_length); - - let payload: Option>; - let message_type: MessageType; - - if message_length == 0 { - message_type = MessageType::KeepAlive; - payload = None; - } else if message_length == 5 { - message_type = buf[4].into(); - payload = None; - } else { - message_type = buf[4].into(); - - let end_of_message = 4 + message_length as usize; - payload = Some(buf[5..end_of_message].to_vec()); - } - - Self { - message_length, - message_type, - payload - } + /// Decodes a message from a given buffer. + /// + /// # Arguments + /// + /// * `buf` - The byte buffer containing the serialized message. + /// + /// # Returns + /// + /// A new `Message` instance on success, or an empty `Result` indicating an error. + fn from_buffer(buf: &[u8]) -> Self { + let mut message_length: [u8; 4] = [0; 4]; + message_length[..4].copy_from_slice(&buf[..4]); + + let message_length = u32::from_be_bytes(message_length); + + let payload: Option>; + let message_type: MessageType; + + if message_length == 0 { + message_type = MessageType::KeepAlive; + payload = None; + } else if message_length == 5 { + message_type = buf[4].into(); + payload = None; + } else { + message_type = buf[4].into(); + + let end_of_message = 4 + message_length as usize; + + if end_of_message > buf.len() { + error!("index too long"); + debug!("{buf:?}"); + } + + payload = Some(buf[5..end_of_message].to_vec()); } + + Self { + message_length, + message_type, + payload + } + } } impl ToBuffer for Message { - /// Converts the `Message` instance to a byte buffer for sending. - /// - /// # Returns - /// - /// A byte vector containing the serialized message. - fn to_buffer(self) -> Vec { - let mut buf: Vec = vec![]; - - for byte in self.message_length.to_be_bytes() { - buf.push(byte); - } - - match self.message_type { - MessageType::KeepAlive => { - return buf - }, - MessageType::Choke | MessageType::Unchoke | MessageType::Interested | MessageType::NotInterested => { - buf.push(self.message_type.into()); - return buf; - }, - MessageType::Have | MessageType::Bitfield | MessageType::Request | MessageType::Piece | MessageType::Cancel | MessageType::Port => { - buf.push(self.message_type.into()); - }, - MessageType::Error => { - panic!("Error making message into buffer") - } - } - - match self.payload { - None => { panic!("Error you are trying to create a message that needs a payload with no payload") } - Some(payload) => { - buf.extend(payload); - } - } - - buf - } -} + /// Converts the `Message` instance to a byte buffer for sending. + /// + /// # Returns + /// + /// A byte vector containing the serialized message. + fn to_buffer(self) -> Vec { + let mut buf: Vec = vec![]; + for byte in self.message_length.to_be_bytes() { + buf.push(byte); + } + + match self.message_type { + MessageType::KeepAlive => { + return buf + }, + MessageType::Choke | MessageType::Unchoke | MessageType::Interested | MessageType::NotInterested => { + buf.push(self.message_type.into()); + return buf; + }, + MessageType::Have | MessageType::Bitfield | MessageType::Request | MessageType::Piece | MessageType::Cancel | MessageType::Port => { + buf.push(self.message_type.into()); + }, + MessageType::Error => { + panic!("Error making message into buffer") + } + } + + match self.payload { + None => { panic!("Error you are trying to create a message that needs a payload with no payload") } + Some(payload) => { + buf.extend(payload); + } + } + + buf + } +} + impl Message { - /// Create a request message from a given piece_index, offset, and length - /// - /// # Arguments - /// - /// * `piece_index` - The index of the piece in the torrent - /// * `offset` - The offset within the piece, because requests should be no more than 16KiB - /// * `length` - The length of the piece request, should be 16KiB - /// - /// # Returns - /// - /// A piece request message - pub fn create_request(piece_index: u32, offset: u32, length: u32) -> Self { - let mut payload: Vec = vec![]; - - for byte in piece_index.to_be_bytes() { - payload.push(byte); - } - - for byte in offset.to_be_bytes() { - payload.push(byte) - } - - for byte in length.to_be_bytes() { - payload.push(byte) - } - - Self { message_length: 13, message_type: MessageType::Request, payload: Some(payload) } + /// Create a request message from a given piece_index, offset, and length + /// + /// # Arguments + /// + /// * `piece_index` - The index of the piece in the torrent + /// * `offset` - The offset within the piece, because requests should be no more than 16KiB + /// * `length` - The length of the piece request, should be 16KiB + /// + /// # Returns + /// + /// A piece request message + pub fn create_request(piece_index: u32, offset: u32, length: u32) -> Self { + let mut payload: Vec = vec![]; + + for byte in piece_index.to_be_bytes() { + payload.push(byte); } - - /// Returns the number of messages in the given buffer and their contents. - /// - /// # Arguments - /// - /// * `buf` - The byte buffer containing multiple serialized messages. - /// - /// # Returns - /// - /// A tuple containing a vector of message byte buffers and the number of messages. - pub fn number_of_messages(buf: &[u8]) -> (Vec>, u32) { - let mut message_num = 0; - let mut messages: Vec> = vec![]; - - // Find the length of message one - // put that into an array and increment counter by one - let mut i = 0; // points to the front - let mut j; // points to the back - - loop { - j = u32::from_be_bytes([buf[i], buf[i + 1], buf[i + 2], buf[i + 3]]) as usize + 4; - - messages.push(buf[i..i+j].to_vec()); - i += j; - message_num += 1; - - if buf[i] == 0 && buf[i + 1] == 0 && buf[i + 2] == 0 && buf[i + 3] == 0 { - break; - } - } - - (messages, message_num) + + for byte in offset.to_be_bytes() { + payload.push(byte) } + + for byte in length.to_be_bytes() { + payload.push(byte) + } + + Self { message_length: 13, message_type: MessageType::Request, payload: Some(payload) } + } + + /// Returns the number of messages in the given buffer and their contents. + /// + /// # Arguments + /// + /// * `buf` - The byte buffer containing multiple serialized messages. + /// + /// # Returns + /// + /// A tuple containing a vector of message byte buffers and the number of messages. + pub fn number_of_messages(buf: &[u8]) -> (Vec>, u32) { + let mut message_num = 0; + let mut messages: Vec> = vec![]; + + // Find the length of message one + // put that into an array and increment counter by one + let mut i = 0; // points to the front + let mut j; // points to the back + + loop { + j = u32::from_be_bytes([buf[i], buf[i + 1], buf[i + 2], buf[i + 3]]) as usize + 4; + + messages.push(buf[i..i+j].to_vec()); + i += j; + message_num += 1; + + if buf[i] == 0 && buf[i + 1] == 0 && buf[i + 2] == 0 && buf[i + 3] == 0 { + break; + } + } + + (messages, message_num) + } } /// An enum representing all possible message types in the BitTorrent peer wire protocol. #[derive(Clone, Debug, PartialEq)] #[repr(u8)] pub enum MessageType { - /// Keepalive message, 0 length. - /// Potential Errors if trying to handle a keepalive message like another message. - /// Due to length being 0, should always be explicitly handled. - KeepAlive = u8::MAX, - /// Message telling the client not to send any requests until the peer has unchoked, 1 length. - Choke = 0, - /// Message telling the client that it can send requests, 1 length. - Unchoke = 1, - /// Message indicating that the peer is still interested in downloading, 1 length. - Interested = 2, - /// Message indicating that the peer is not interested in downloading, 1 length. - NotInterested = 3, - /// Message indicating that the peer has a given piece, fixed length. - Have = 4, - /// Message sent after a handshake, represents the pieces that the peer has. - Bitfield = 5, - /// Request a given part of a piece based on index, offset, and length, 13 length. - Request = 6, - /// A response to a request with the accompanying data, varying length. - Piece = 7, - /// Cancels a request, 13 length. - Cancel = 8, - /// Placeholder for unimplemented message type. - Port = 9, - Error + /// Keepalive message, 0 length. + /// Potential Errors if trying to handle a keepalive message like another message. + /// Due to length being 0, should always be explicitly handled. + KeepAlive = u8::MAX, + /// Message telling the client not to send any requests until the peer has unchoked, 1 length. + Choke = 0, + /// Message telling the client that it can send requests, 1 length. + Unchoke = 1, + /// Message indicating that the peer is still interested in downloading, 1 length. + Interested = 2, + /// Message indicating that the peer is not interested in downloading, 1 length. + NotInterested = 3, + /// Message indicating that the peer has a given piece, fixed length. + Have = 4, + /// Message sent after a handshake, represents the pieces that the peer has. + Bitfield = 5, + /// Request a given part of a piece based on index, offset, and length, 13 length. + Request = 6, + /// A response to a request with the accompanying data, varying length. + Piece = 7, + /// Cancels a request, 13 length. + Cancel = 8, + /// Placeholder for unimplemented message type. + Port = 9, + Error } impl From for MessageType { - fn from(val: u8) -> MessageType { - match val { - 0 => MessageType::Choke, - 1 => MessageType::Unchoke, - 2 => MessageType::Interested, - 3 => MessageType::NotInterested, - 4 => MessageType::Have, - 5 => MessageType::Bitfield, - 6 => MessageType::Request, - 7 => MessageType::Piece, - 8 => MessageType::Cancel, - 9 => MessageType::Port, - _ => { - error!("Invalid Message Type: {}", val); - MessageType::Error - } - } + fn from(val: u8) -> MessageType { + match val { + 0 => MessageType::Choke, + 1 => MessageType::Unchoke, + 2 => MessageType::Interested, + 3 => MessageType::NotInterested, + 4 => MessageType::Have, + 5 => MessageType::Bitfield, + 6 => MessageType::Request, + 7 => MessageType::Piece, + 8 => MessageType::Cancel, + 9 => MessageType::Port, + _ => { + error!("Invalid Message Type: {}", val); + MessageType::Error + } } + } } impl From for u8 { - fn from(val: MessageType) -> u8 { - match val { - MessageType::Choke => 0, - MessageType::Unchoke => 1, - MessageType::Interested => 2, - MessageType::NotInterested => 3, - MessageType::Have => 4, - MessageType::Bitfield => 5, - MessageType::Request => 6, - MessageType::Piece => 7, - MessageType::Cancel => 8, - MessageType::Port => 9, - _ => { - error!("Invalid Message Type: {:?}", val); - u8::MAX - } - } + fn from(val: MessageType) -> u8 { + match val { + MessageType::Choke => 0, + MessageType::Unchoke => 1, + MessageType::Interested => 2, + MessageType::NotInterested => 3, + MessageType::Have => 4, + MessageType::Bitfield => 5, + MessageType::Request => 6, + MessageType::Piece => 7, + MessageType::Cancel => 8, + MessageType::Port => 9, + _ => { + error!("Invalid Message Type: {:?}", val); + u8::MAX + } } + } } \ No newline at end of file diff --git a/src/peer.rs b/src/peer.rs index d5d3544..b51104b 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -2,214 +2,214 @@ // Crate Imports use crate::{ - handshake::Handshake, - message::{ FromBuffer, Message, MessageType, ToBuffer }, - torrent::Torrent + handshake::Handshake, + message::{ FromBuffer, Message, MessageType, ToBuffer }, + torrent::Torrent }; // External imports use log::{ debug, error }; use std::net::SocketAddr; use tokio::{ - io::{ AsyncReadExt, AsyncWriteExt }, - net::TcpStream + io::{ AsyncReadExt, AsyncWriteExt }, + net::TcpStream }; /// Structure to abstract interaction with a peer. pub struct Peer { - /// The `TcpStream` that is used to communicate with the peeer - connection_stream: TcpStream, - /// The `SocketAddr` of the peer - pub socket_addr: SocketAddr, - /// The id of the peer - pub peer_id: String, - /// Whether the peer is choking the client - pub choking: bool, + /// The `TcpStream` that is used to communicate with the peeer + connection_stream: TcpStream, + /// The `SocketAddr` of the peer + pub socket_addr: SocketAddr, + /// The id of the peer + pub peer_id: String, + /// Whether the peer is choking the client + pub choking: bool, } impl Peer { - /// Creates a connection to the peer. - /// - /// # Arguments - /// - /// * `socket_address` - The socket address of the peer. - pub async fn create_connection(socket_address: &str) -> Option { - let socket_addr = match socket_address.parse::() { - Err(err) => { - error!("error parsing address {}, err: {}", socket_address, err); - return None; - } - Ok(addr) => { addr } - }; - - let connection_stream = match TcpStream::connect(socket_addr).await { - Err(err) => { - error!("unable to connect to {}, err: {}", socket_address, err); - return None - }, - Ok(stream) => { - debug!("created tcpstream successfully to: {socket_addr}"); - stream - } - }; - - Some(Self { - connection_stream, - socket_addr, - peer_id: String::new(), - choking: true, - }) - } - - /// Sends a handshake message to the peer, the first step in the peer wire messaging protocol. - /// - /// # Arguments - /// - /// * `torrent` - The `Torrent` instance associated with the peer. - pub async fn handshake(&mut self, torrent: &Torrent) { - let mut buf = vec![0; 1024]; - - let handshake_message = Handshake::new(&torrent.get_info_hash()).unwrap(); - - self.connection_stream.writable().await.unwrap(); - self.connection_stream.write_all(&handshake_message.to_buffer()).await.unwrap(); - - self.connection_stream.readable().await.unwrap(); - let _ = self.connection_stream.read(&mut buf).await.unwrap(); - - let handshake = Handshake::from_buffer(&buf[..68].to_vec()).unwrap(); - handshake.log_useful_information(); - - for message_buf in Message::number_of_messages(&buf[68..]).0 { - let message = Message::from_buffer(&message_buf); - - if message.message_type == MessageType::Unchoke { - self.choking = false; - } - } - - self.peer_id = handshake.peer_id; - } - - /// Keeps the connection alive and sends interested messages until the peer unchokes - pub async fn keep_alive_until_unchoke(&mut self) { - loop { - let message = self.read_message().await; - - debug!("{message:?}"); - match message.message_type { - MessageType::Unchoke => { - self.choking = false; - break - } - MessageType::KeepAlive => { - self.send_message_no_response(Message::new(0, MessageType::KeepAlive, None)).await; - self.send_message_no_response(Message::new(1, MessageType::Interested, None)).await; - } - MessageType::Choke => { - self.choking = true; - } - _ => { continue } - } - } - } - - /// Sends a message to the peer and waits for a response, which it returns - pub async fn send_message(&mut self, message: Message) -> Message { - let mut buf = vec![0; 16_397]; - - self.connection_stream.writable().await.unwrap(); - self.connection_stream.write_all(&message.to_buffer()).await.unwrap(); - - self.connection_stream.readable().await.unwrap(); - let _ = self.connection_stream.read_exact(&mut buf).await.unwrap(); - - Message::from_buffer(&buf) - } - - /// Sends a message to the peer and waits for a response, which it returns - pub async fn send_message_exact_size_response(&mut self, message: Message, size: usize) -> Message { - let mut buf = vec![0; size]; - - self.connection_stream.writable().await.unwrap(); - self.connection_stream.write_all(&message.to_buffer()).await.unwrap(); - - self.connection_stream.readable().await.unwrap(); - let _ = self.connection_stream.read_exact(&mut buf).await.unwrap(); - - Message::from_buffer(&buf) - } - - /// Sends a message but doesn't wait for a response - pub async fn send_message_no_response(&mut self, message: Message) { - self.connection_stream.writable().await.unwrap(); - self.connection_stream.write_all(&message.to_buffer()).await.unwrap(); + /// Creates a connection to the peer. + /// + /// # Arguments + /// + /// * `socket_address` - The socket address of the peer. + pub async fn create_connection(socket_address: &str) -> Option { + let socket_addr = match socket_address.parse::() { + Err(err) => { + error!("error parsing address {}, err: {}", socket_address, err); + return None; + } + Ok(addr) => { addr } + }; + + let connection_stream = match TcpStream::connect(socket_addr).await { + Err(err) => { + error!("unable to connect to {}, err: {}", socket_address, err); + return None + }, + Ok(stream) => { + debug!("created tcpstream successfully to: {socket_addr}"); + stream + } + }; + + Some(Self { + connection_stream, + socket_addr, + peer_id: String::new(), + choking: true, + }) + } + + /// Sends a handshake message to the peer, the first step in the peer wire messaging protocol. + /// + /// # Arguments + /// + /// * `torrent` - The `Torrent` instance associated with the peer. + pub async fn handshake(&mut self, torrent: &Torrent) { + let mut buf = vec![0; 1024]; + + let handshake_message = Handshake::new(&torrent.get_info_hash()).unwrap(); + + self.connection_stream.writable().await.unwrap(); + self.connection_stream.write_all(&handshake_message.to_buffer()).await.unwrap(); + + self.connection_stream.readable().await.unwrap(); + let _ = self.connection_stream.read(&mut buf).await.unwrap(); + + let handshake = Handshake::from_buffer(&buf[..68].to_vec()).unwrap(); + handshake.log_useful_information(); + + for message_buf in Message::number_of_messages(&buf[68..]).0 { + let message = Message::from_buffer(&message_buf); + + if message.message_type == MessageType::Unchoke { + self.choking = false; + } } - /// reads a message from the peer - pub async fn read_message(&mut self) -> Message { - let mut buf = vec![0; 16_397]; - - self.connection_stream.readable().await.unwrap(); - let _ = self.connection_stream.read(&mut buf).await.unwrap(); - - Message::from_buffer(&buf) - } - - /// Shutsdown the connection stream - pub async fn disconnect(&mut self) { - match self.connection_stream.shutdown().await { - Err(err) => { - error!("Error disconnecting from {}: {}", self.socket_addr, err); - panic!("Error disconnecting from {}: {}", self.socket_addr, err); - }, - Ok(_) => { - debug!("Successfully disconnected from {}", self.socket_addr) - } + self.peer_id = handshake.peer_id; + } + + /// Keeps the connection alive and sends interested messages until the peer unchokes + pub async fn keep_alive_until_unchoke(&mut self) { + loop { + let message = self.read_message().await; + + debug!("{message:?}"); + match message.message_type { + MessageType::Unchoke => { + self.choking = false; + break } + MessageType::KeepAlive => { + self.send_message_no_response(Message::new(0, MessageType::KeepAlive, None)).await; + self.send_message_no_response(Message::new(1, MessageType::Interested, None)).await; + } + MessageType::Choke => { + self.choking = true; + } + _ => { continue } + } } + } + + /// Sends a message to the peer and waits for a response, which it returns + pub async fn send_message(&mut self, message: Message) -> Message { + let mut buf = vec![0; 16_397]; + + self.connection_stream.writable().await.unwrap(); + self.connection_stream.write_all(&message.to_buffer()).await.unwrap(); + + self.connection_stream.readable().await.unwrap(); + let _ = self.connection_stream.read_exact(&mut buf).await.unwrap(); + + Message::from_buffer(&buf) + } + + /// Sends a message to the peer and waits for a response, which it returns + pub async fn send_message_exact_size_response(&mut self, message: Message, size: usize) -> Message { + let mut buf = vec![0; size]; + + self.connection_stream.writable().await.unwrap(); + self.connection_stream.write_all(&message.to_buffer()).await.unwrap(); + + self.connection_stream.readable().await.unwrap(); + let _ = self.connection_stream.read_exact(&mut buf).await.unwrap(); + + Message::from_buffer(&buf) + } + + /// Sends a message but doesn't wait for a response + pub async fn send_message_no_response(&mut self, message: Message) { + self.connection_stream.writable().await.unwrap(); + self.connection_stream.write_all(&message.to_buffer()).await.unwrap(); + } + + /// reads a message from the peer + pub async fn read_message(&mut self) -> Message { + let mut buf = vec![0; 16_397]; + + self.connection_stream.readable().await.unwrap(); + let _ = self.connection_stream.read(&mut buf).await.unwrap(); + + Message::from_buffer(&buf) + } + + /// Shutsdown the connection stream + pub async fn disconnect(&mut self) { + match self.connection_stream.shutdown().await { + Err(err) => { + error!("Error disconnecting from {}: {}", self.socket_addr, err); + panic!("Error disconnecting from {}: {}", self.socket_addr, err); + }, + Ok(_) => { + debug!("Successfully disconnected from {}", self.socket_addr) + } + } + } } impl Peer { - // Sends the requests and reads responses to put a piece together - pub async fn request_piece(&mut self, index: u32, piece_length: u32, len: &mut u32, total_len: u32) -> Vec { - let mut buf = vec![]; - // Sequentially requests piece from the peer - for offset in (0..piece_length).step_by(16_384) { - let mut length = 16_384; - - let response: Message; - - if *len + 16_384 >= total_len { - debug!("Final Request {}", total_len - *len); - length = total_len - *len; - - response = self.send_message_exact_size_response( - Message::create_request(index, offset, length), - length as usize + 13 - ).await; - } else { - response = self.send_message(Message::create_request(index, offset, length)).await; - }; - - match response.message_type { - MessageType::Piece => { - let mut data = response.payload.unwrap(); - *len += data.len() as u32; - *len -= 8; - - for byte in data.drain(..).skip(8) { - buf.push(byte) - } - }, - _ => { debug!("didn't recieve expected piece request | Recieved: {:?}", response.message_type); } - }; - - if *len >= total_len - 1 { - return buf; - } - } - - buf + // Sends the requests and reads responses to put a piece together + pub async fn request_piece(&mut self, index: u32, piece_length: u32, len: &mut u32, total_len: u32) -> Vec { + let mut buf = vec![]; + // Sequentially requests piece from the peer + for offset in (0..piece_length).step_by(16_384) { + let mut length = 16_384; + + let response: Message; + + if *len + 16_384 >= total_len { + debug!("Final Request {}", total_len - *len); + length = total_len - *len; + + response = self.send_message_exact_size_response( + Message::create_request(index, offset, length), + length as usize + 13 + ).await; + } else { + response = self.send_message(Message::create_request(index, offset, length)).await; + }; + + match response.message_type { + MessageType::Piece => { + let mut data = response.payload.unwrap(); + *len += data.len() as u32; + *len -= 8; + + for byte in data.drain(..).skip(8) { + buf.push(byte) + } + }, + _ => { debug!("didn't recieve expected piece request | Recieved: {:?}", response.message_type); } + }; + + if *len >= total_len - 1 { + return buf; + } } + + buf + } } \ No newline at end of file diff --git a/src/torrent.rs b/src/torrent.rs index 85df33d..9ac2c71 100644 --- a/src/torrent.rs +++ b/src/torrent.rs @@ -11,203 +11,203 @@ struct Node(String, i64); /// Represents a file described in a torrent. #[derive(Clone, Debug, Deserialize, Serialize)] pub struct File { - pub path: Vec, - pub length: u64, - #[serde(default)] - md5sum: Option, + pub path: Vec, + pub length: u64, + #[serde(default)] + md5sum: Option, } /// Represents the metadata of a torrent. #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Info { - pub name: String, - #[serde(with = "serde_bytes")] - pub pieces: Vec, - #[serde(rename = "piece length")] - pub piece_length: u64, - #[serde(default)] - md5sum: Option, - #[serde(default)] - pub length: Option, - #[serde(default)] - pub files: Option>, - #[serde(default)] - private: Option, - #[serde(default)] - path: Option>, - #[serde(default)] - #[serde(rename = "root hash")] - root_hash: Option, + pub name: String, + #[serde(with = "serde_bytes")] + pub pieces: Vec, + #[serde(rename = "piece length")] + pub piece_length: u64, + #[serde(default)] + md5sum: Option, + #[serde(default)] + pub length: Option, + #[serde(default)] + pub files: Option>, + #[serde(default)] + private: Option, + #[serde(default)] + path: Option>, + #[serde(default)] + #[serde(rename = "root hash")] + root_hash: Option, } /// Represents a torrent. #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Torrent { - pub info: Info, - #[serde(default)] - pub announce: Option, - #[serde(default)] - nodes: Option>, - #[serde(default)] - encoding: Option, - #[serde(default)] - httpseeds: Option>, - #[serde(default)] - #[serde(rename = "announce-list")] - announce_list: Option>>, - #[serde(default)] - #[serde(rename = "creation date")] - creation_date: Option, - #[serde(rename = "comment")] - comment: Option, - #[serde(default)] - #[serde(rename = "created by")] - created_by: Option + pub info: Info, + #[serde(default)] + pub announce: Option, + #[serde(default)] + nodes: Option>, + #[serde(default)] + encoding: Option, + #[serde(default)] + httpseeds: Option>, + #[serde(default)] + #[serde(rename = "announce-list")] + announce_list: Option>>, + #[serde(default)] + #[serde(rename = "creation date")] + creation_date: Option, + #[serde(rename = "comment")] + comment: Option, + #[serde(default)] + #[serde(rename = "created by")] + created_by: Option } impl Torrent { - /// Reads a `.torrent` file and converts it into a `Torrent` struct. - /// - /// # Arguments - /// - /// * `path` - The path to the `.torrent` file. - pub async fn from_torrent_file(path: &str) -> Self { - // Read .torrent File - let mut file = TokioFile::open(path).await.unwrap(); - let mut buf: Vec = Vec::new(); - - match file.read_to_end(&mut buf).await { - Err(err) => { - error!("Error reading file till end: {err}"); - panic!("Error reading file till end: {err}"); - } - Ok(_) => { info!("Succesfully read {path}") } - } - - match serde_bencode::from_bytes(&buf) { - Err(err) => { - error!("Error deserializing file: {err}"); - panic!("Error deserializing file: {err}"); - } - Ok(torrent) => { torrent } - } + /// Reads a `.torrent` file and converts it into a `Torrent` struct. + /// + /// # Arguments + /// + /// * `path` - The path to the `.torrent` file. + pub async fn from_torrent_file(path: &str) -> Self { + // Read .torrent File + let mut file = TokioFile::open(path).await.unwrap(); + let mut buf: Vec = Vec::new(); + + match file.read_to_end(&mut buf).await { + Err(err) => { + error!("Error reading file till end: {err}"); + panic!("Error reading file till end: {err}"); + } + Ok(_) => { info!("Succesfully read {path}") } } - - /// Logs info about the *.torrent file - pub fn log_useful_information(&self) { - info!(" -> Torrent Information <- "); - info!("Name: {}", self.info.name); - info!("Tracker: {:?}", self.announce); - info!("Tracker List: {:?}", self.announce_list); - info!("Info Hash: {:X?}", self.get_info_hash()); - info!("Length: {:?}", self.info.length); + + match serde_bencode::from_bytes(&buf) { + Err(err) => { + error!("Error deserializing file: {err}"); + panic!("Error deserializing file: {err}"); + } + Ok(torrent) => { torrent } + } + } + + /// Logs info about the *.torrent file + pub fn log_useful_information(&self) { + info!(" -> Torrent Information <- "); + info!("Name: {}", self.info.name); + info!("Tracker: {:?}", self.announce); + info!("Tracker List: {:?}", self.announce_list); + info!("Info Hash: {:X?}", self.get_info_hash()); + info!("Length: {:?}", self.info.length); + + info!("Files:"); + + match &self.info.files { + None => { + info!("> {}", self.info.name); + } + Some(files) => { + for file in files { + let mut path = String::new(); + for section in &file.path { + path.push_str(&format!("{section}/")); + } + path.pop(); + info!("> {}: {}B", path, file.length) + } + } + } + } + + /// Calculates the info hash of the torrent. + pub fn get_info_hash(&self) -> Vec { + let buf = serde_bencode::to_bytes(&self.info).unwrap(); + + let mut hasher = Sha1::new(); + hasher.update(buf); + let res = hasher.finalize(); + res[..].to_vec() + } + + /// Checks if a downloaded piece matches its hash. + /// + /// # Arguments + /// + /// * `piece` - The downloaded piece. + /// * `index` - The index of the piece. + /// + /// # Returns + /// + /// * `true` if the piece is correct, `false` otherwise. + pub fn check_piece(&self, piece: &[u8], index: u32) -> bool { + let mut hasher = Sha1::new(); + hasher.update(piece); + let result = hasher.finalize(); + + let piece_hash = &self.info.pieces[(index * 20) as usize..(index * 20 + 20) as usize]; + + if &result[..] == piece_hash { + info!("Downloaded Piece {}/{} Correct!, Piece Was: {}B long", index + 1, self.info.pieces.len() / 20, piece.len(),); + true + } else { + debug!("{:?}", &result[..]); + debug!("{:?}", piece_hash); + debug!("{:?}", &result[..].len()); + debug!("{:?}", piece_hash.len()); + debug!("{}", piece.len()); + error!("Piece downloaded incorrectly"); + false + } + } + + pub fn get_total_length(&self) -> u64 { + match self.info.length { + None => {}, + Some(n) => { return n as u64 } + }; + + match &self.info.files { + None => { 0 }, + Some(files) => { + let mut n = 0; - info!("Files:"); - - match &self.info.files { - None => { - info!("> {}", self.info.name); - } - Some(files) => { - for file in files { - let mut path = String::new(); - for section in &file.path { - path.push_str(&format!("{section}/")); - } - path.pop(); - info!("> {}: {}B", path, file.length) - } - } - } - } - - /// Calculates the info hash of the torrent. - pub fn get_info_hash(&self) -> Vec { - let buf = serde_bencode::to_bytes(&self.info).unwrap(); - - let mut hasher = Sha1::new(); - hasher.update(buf); - let res = hasher.finalize(); - res[..].to_vec() - } - - /// Checks if a downloaded piece matches its hash. - /// - /// # Arguments - /// - /// * `piece` - The downloaded piece. - /// * `index` - The index of the piece. - /// - /// # Returns - /// - /// * `true` if the piece is correct, `false` otherwise. - pub fn check_piece(&self, piece: &[u8], index: u32) -> bool { - let mut hasher = Sha1::new(); - hasher.update(piece); - let result = hasher.finalize(); - - let piece_hash = &self.info.pieces[(index * 20) as usize..(index * 20 + 20) as usize]; - - if &result[..] == piece_hash { - info!("Downloaded Piece {}/{} Correct!, Piece Was: {}B long", index + 1, self.info.pieces.len() / 20, piece.len(),); - true - } else { - debug!("{:?}", &result[..]); - debug!("{:?}", piece_hash); - debug!("{:?}", &result[..].len()); - debug!("{:?}", piece_hash.len()); - debug!("{}", piece.len()); - error!("Piece downloaded incorrectly"); - false - } - } - - pub fn get_total_length(&self) -> u64 { - match self.info.length { - None => {}, - Some(n) => { return n as u64 } + for file in files { + n += file.length; }; - - match &self.info.files { - None => { 0 }, - Some(files) => { - let mut n = 0; - - for file in files { - n += file.length; - }; - - n - } - } + + n + } } - - pub fn get_tracker(&self) -> (&str, u16) { - let re = Regex::new(r"^udp://([^:/]+):(\d+)/announce$").unwrap(); - - if let Some(url) = &self.announce { - if let Some(captures) = re.captures(url) { - let hostname = captures.get(1).unwrap().as_str(); - let port = captures.get(2).unwrap().as_str(); + } + + pub fn get_tracker(&self) -> (&str, u16) { + let re = Regex::new(r"^udp://([^:/]+):(\d+)/announce$").unwrap(); - return (hostname, port.parse().unwrap()); - } else { - println!("URL does not match the expected pattern | {}", url); - } - } - - for (i, url) in self.announce_list.as_ref().unwrap().iter().enumerate() { - debug!("{:?}", url); - if let Some(captures) = re.captures(&url[i]) { - let hostname = captures.get(1).unwrap().as_str(); - let port = captures.get(2).unwrap().as_str(); - - return (hostname, port.parse().unwrap()); - } else { - println!("URL does not match the expected pattern | {}", url[i]); - } - } - - ("", 0) + if let Some(url) = &self.announce { + if let Some(captures) = re.captures(url) { + let hostname = captures.get(1).unwrap().as_str(); + let port = captures.get(2).unwrap().as_str(); + + return (hostname, port.parse().unwrap()); + } else { + println!("URL does not match the expected pattern | {}", url); + } } + + for (i, url) in self.announce_list.as_ref().unwrap().iter().enumerate() { + debug!("{:?}", url); + if let Some(captures) = re.captures(&url[i]) { + let hostname = captures.get(1).unwrap().as_str(); + let port = captures.get(2).unwrap().as_str(); + + return (hostname, port.parse().unwrap()); + } else { + println!("URL does not match the expected pattern | {}", url[i]); + } + } + + ("", 0) + } } \ No newline at end of file diff --git a/src/tracker.rs b/src/tracker.rs index 321822d..5630162 100644 --- a/src/tracker.rs +++ b/src/tracker.rs @@ -4,276 +4,289 @@ use tokio::net::UdpSocket; use log::{debug, error}; pub struct Tracker { - /// A UdpSocket used for communication. - connection_stream: UdpSocket, - /// The local socket address requests are made from - pub socket_addr: SocketAddr, - /// The remote socket address of the tracker. - pub remote_addr: SocketAddr + /// A UdpSocket used for communication. + connection_stream: UdpSocket, + /// The local socket address requests are made from + pub socket_addr: SocketAddr, + /// The remote socket address of the tracker. + pub remote_addr: SocketAddr } impl Tracker { - /// Creates a new `Tracker` instance asynchronously. - /// - /// # Arguments - /// - /// * `socket_address` - Local socket address for binding. - /// * `remote_hostname` - Remote host for connection. - /// * `remote_port` - Remote port for connection. - /// - /// # Panics - /// - /// Panics if there is an error parsing the given address or creating the UDP socket. - pub async fn new(socket_address: &str, remote_hostname: &str, remote_port: u16) -> Self { - let socket_addr = match socket_address.parse::() { - Err(err) => { - error!("error parsing address {}, err: {}", socket_address, err); - panic!("error parsing given address, {}", err); - } - Ok(addr) => { addr } - }; - - let remote_address = dns_lookup::lookup_host(remote_hostname).unwrap()[0]; - - let remote_addr = SocketAddr::new(remote_address, remote_port); - - let connection_stream = match UdpSocket::bind(socket_addr).await { - Err(err) => { - error!("unable to bind to {}, err: {}", socket_address, err); - panic!("error creating udpsocket, {}", err); - }, - Ok(stream) => { - debug!("bound udpsocket successfully to: {socket_addr}"); - stream - } - }; - - match connection_stream.connect(remote_addr).await { - Err(err) => { - error!("unable to connect to {}, err: {}", remote_addr, err); - panic!("error creating udpsocket, {}", err); - }, - Ok(()) => { - debug!("successfully connected to: {remote_addr}"); - } - }; - - - Self { - connection_stream, - socket_addr, - remote_addr - } - } - - /// Sends a message to the tracker and receives a response asynchronously. - /// - /// # Arguments - /// - /// * `message` - A type that implements the `ToBuffer` trait, representing the message to send. - /// - /// # Returns - /// - /// A byte vector containing the received response. - pub async fn send_message(&mut self, message: &T) -> Vec { - let mut buf: Vec = vec![ 0; 16_384 ]; - - self.connection_stream.send(&message.to_buffer()).await.unwrap(); - self.connection_stream.recv(&mut buf).await.unwrap(); - - buf + /// Creates a new `Tracker` instance asynchronously. + /// + /// # Arguments + /// + /// * `socket_address` - Local socket address for binding. + /// * `remote_hostname` - Remote host for connection. + /// * `remote_port` - Remote port for connection. + /// + /// # Panics + /// + /// Panics if there is an error parsing the given address or creating the UDP socket. + pub async fn new(socket_address: &str, remote_hostname: &str, remote_port: u16) -> Self { + let socket_addr = match socket_address.parse::() { + Err(err) => { + error!("error parsing address {}, err: {}", socket_address, err); + panic!("error parsing given address, {}", err); + } + Ok(addr) => { addr } + }; + + let remote_address = dns_lookup::lookup_host(remote_hostname).unwrap()[0]; + + let remote_addr = SocketAddr::new(remote_address, remote_port); + + let connection_stream = match UdpSocket::bind(socket_addr).await { + Err(err) => { + error!("unable to bind to {}, err: {}", socket_address, err); + panic!("error creating udpsocket, {}", err); + }, + Ok(stream) => { + debug!("bound udpsocket successfully to: {socket_addr}"); + stream + } + }; + + match connection_stream.connect(remote_addr).await { + Err(err) => { + error!("unable to connect to {}, err: {}", remote_addr, err); + panic!("error creating udpsocket, {}", err); + }, + Ok(()) => { + debug!("successfully connected to: {remote_addr}"); + } + }; + + + Self { + connection_stream, + socket_addr, + remote_addr } + } + + /// Sends a message to the tracker and receives a response asynchronously. + /// + /// # Arguments + /// + /// * `message` - A type that implements the `ToBuffer` trait, representing the message to send. + /// + /// # Returns + /// + /// A byte vector containing the received response. + pub async fn send_message(&mut self, message: &T) -> Vec { + let mut buf: Vec = vec![ 0; 16_384 ]; + + self.connection_stream.send(&message.to_buffer()).await.unwrap(); + self.connection_stream.recv(&mut buf).await.unwrap(); + + buf + } } /// A trait for converting a type into a byte buffer. pub trait ToBuffer { - /// Converts the implementing type into a byte buffer. - fn to_buffer(&self) -> Vec; + /// Converts the implementing type into a byte buffer. + fn to_buffer(&self) -> Vec; } /// A trait for converting a type from a byte buffer. pub trait FromBuffer { - /// Converts a byte buffer into the implementing type. - fn from_buffer(buf: &[u8]) -> Self; + /// Converts a byte buffer into the implementing type. + fn from_buffer(buf: &[u8]) -> Self; } #[derive(Debug)] /// Represents a basic connection message. pub struct ConnectionMessage { - pub connection_id: i64, - action: i32, - transaction_id: i32, + pub connection_id: i64, + action: i32, + transaction_id: i32, } impl ConnectionMessage { - /// Creates a new basic connection message - pub fn create_basic_connection() -> Self { - Self { - connection_id: 4497486125440, - action: 0, - transaction_id: 123 - } + /// Creates a new basic connection message + pub fn create_basic_connection() -> Self { + Self { + connection_id: 4497486125440, + action: 0, + transaction_id: 123 } + } } impl ToBuffer for ConnectionMessage { - fn to_buffer(&self) -> Vec { - let mut buf: Vec = vec![]; - - buf.extend(self.connection_id.to_be_bytes()); - buf.extend(self.action.to_be_bytes()); - buf.extend(self.transaction_id.to_be_bytes()); - - buf - } + fn to_buffer(&self) -> Vec { + let mut buf: Vec = vec![]; + + buf.extend(self.connection_id.to_be_bytes()); + buf.extend(self.action.to_be_bytes()); + buf.extend(self.transaction_id.to_be_bytes()); + + buf + } } impl FromBuffer for ConnectionMessage { - fn from_buffer(buf: &[u8]) -> Self { - let mut action: [u8; 4] = [0; 4]; - action[..4].copy_from_slice(&buf[..4]); - let action = i32::from_be_bytes(action); - - let mut transaction_id: [u8; 4] = [0; 4]; - transaction_id[..4].copy_from_slice(&buf[4..8]); - let transaction_id = i32::from_be_bytes(transaction_id); - - let mut connection_id: [u8; 8] = [0; 8]; - connection_id[..4].copy_from_slice(&buf[8..16]); - let connection_id = i64::from_be_bytes(connection_id); - - - - Self { - connection_id, - action, - transaction_id - } + fn from_buffer(buf: &[u8]) -> Self { + let mut action: [u8; 4] = [0; 4]; + action[..4].copy_from_slice(&buf[..4]); + let action = i32::from_be_bytes(action); + + let mut transaction_id: [u8; 4] = [0; 4]; + transaction_id[..4].copy_from_slice(&buf[4..8]); + let transaction_id = i32::from_be_bytes(transaction_id); + + let mut connection_id: [u8; 8] = [0; 8]; + connection_id[..8].copy_from_slice(&buf[8..16]); + let connection_id = i64::from_be_bytes(connection_id); + + Self { + connection_id, + action, + transaction_id } + } } #[derive(Debug)] -/// Represents an announcement message in the bittorrent udp tracker protocol. +/// Represents an announcement message in the BitTorrent UDP tracker protocol. pub struct AnnounceMessage { - connection_id: i64, - action: i32, - transaction_id: i32, - info_hash: [u8; 20], - peer_id: [u8; 20], - downloaded: i64, - left: i64, - uploaded: i64, - event: i32, - ip: u32, - key: u32, - num_want: i32, - port: u16, - extensions: u16 + /// The connection ID used for this tracker communication session. + connection_id: i64, + /// The action code representing the type of message (e.g., connect, announce, scrape). + action: i32, + /// A unique identifier for this transaction, allowing matching responses to requests. + transaction_id: i32, + /// The 20-byte SHA-1 hash of the info dictionary in the torrent metainfo. + info_hash: [u8; 20], + /// The unique ID identifying the peer/client sending the announce message. + peer_id: [u8; 20], + /// The total amount of data downloaded by the client in this torrent, in bytes. + downloaded: i64, + /// The amount of data left to download for the client in this torrent, in bytes. + left: i64, + /// The total amount of data uploaded by the client in this torrent, in bytes. + uploaded: i64, + /// An event code indicating the purpose of the announce (e.g., started, completed, stopped). + event: i32, + /// The IP address of the client, expressed as a 32-bit unsigned integer. + ip: u32, + /// A unique key generated by the client for the tracker to identify the peer. + key: u32, + /// The maximum number of peers that the client wants to receive from the tracker. + num_want: i32, + /// The port on which the client is listening for incoming peer connections. + port: u16, + /// Additional extension flags or data included in the announce message. + extensions: u16, } + impl AnnounceMessage { - /// Creates a new announce message. - pub fn new(connection_id: i64, infohash: &[u8], peerid: &str, total_length: i64) -> Self { - let mut info_hash: [u8; 20] = [ 0; 20 ]; - info_hash[..20].copy_from_slice(&infohash[..20]); - - let mut peer_id: [u8; 20] = [0; 20]; - for (i, character) in peerid.chars().enumerate() { - peer_id[i] = character as u8; - } - - Self { - connection_id, - action: 1, - transaction_id: 132, - info_hash, - peer_id, - downloaded: 0, - left: total_length, - uploaded: 0, - event: 1, - ip: 0, - key: 234, - num_want: -1, - port: 61389, - extensions: 0 - } + /// Creates a new announce message. + pub fn new(connection_id: i64, infohash: &[u8], peerid: &str, total_length: i64) -> Self { + let mut info_hash: [u8; 20] = [ 0; 20 ]; + info_hash[..20].copy_from_slice(&infohash[..20]); + + let mut peer_id: [u8; 20] = [0; 20]; + for (i, character) in peerid.chars().enumerate() { + peer_id[i] = character as u8; } + + Self { + connection_id, + action: 1, + transaction_id: 132, + info_hash, + peer_id, + downloaded: 0, + left: total_length, + uploaded: 0, + event: 1, + ip: 0, + key: 234, + num_want: -1, + port: 61389, + extensions: 0 + } + } } impl ToBuffer for AnnounceMessage { - fn to_buffer(&self) -> Vec { - let mut buf: Vec = vec![]; - - buf.extend(self.connection_id.to_be_bytes()); - buf.extend(self.action.to_be_bytes()); - buf.extend(self.transaction_id.to_be_bytes()); - buf.extend(self.info_hash); - buf.extend(self.peer_id); - buf.extend(self.downloaded.to_be_bytes()); - buf.extend(self.left.to_be_bytes()); - buf.extend(self.uploaded.to_be_bytes()); - buf.extend(self.event.to_be_bytes()); - buf.extend(self.ip.to_be_bytes()); - buf.extend(self.key.to_be_bytes()); - buf.extend(self.num_want.to_be_bytes()); - buf.extend(self.port.to_be_bytes()); - buf.extend(self.extensions.to_be_bytes()); - - buf - } + fn to_buffer(&self) -> Vec { + let mut buf: Vec = vec![]; + + buf.extend(self.connection_id.to_be_bytes()); + buf.extend(self.action.to_be_bytes()); + buf.extend(self.transaction_id.to_be_bytes()); + buf.extend(self.info_hash); + buf.extend(self.peer_id); + buf.extend(self.downloaded.to_be_bytes()); + buf.extend(self.left.to_be_bytes()); + buf.extend(self.uploaded.to_be_bytes()); + buf.extend(self.event.to_be_bytes()); + buf.extend(self.ip.to_be_bytes()); + buf.extend(self.key.to_be_bytes()); + buf.extend(self.num_want.to_be_bytes()); + buf.extend(self.port.to_be_bytes()); + buf.extend(self.extensions.to_be_bytes()); + + buf + } } #[derive(Debug)] /// Represents a response to an announcement message. pub struct AnnounceMessageResponse { - pub action: i32, - pub transaction_id: i32, - pub interval: i32, - pub leechers: i32, - pub seeders: i32, - pub ips: Vec, - pub ports: Vec + pub action: i32, + pub transaction_id: i32, + pub interval: i32, + pub leechers: i32, + pub seeders: i32, + pub ips: Vec, + pub ports: Vec } impl FromBuffer for AnnounceMessageResponse { - /// Converts a byte buffer into an `AnnounceMessageResponse` instance. - fn from_buffer(buf: &[u8]) -> Self { - let mut action: [u8; 4] = [0; 4]; - action[..4].copy_from_slice(&buf[0..4]); - let action = i32::from_be_bytes(action); - - let mut transaction_id: [u8; 4] = [ 0; 4 ]; - transaction_id[..4].copy_from_slice(&buf[4..8]); - let transaction_id = i32::from_be_bytes(transaction_id); - - let mut interval: [u8; 4] = [0; 4]; - interval[..4].copy_from_slice(&buf[8..12]); - let interval = i32::from_be_bytes(interval); - - let mut leechers: [u8; 4] = [0; 4]; - leechers[..4].copy_from_slice(&buf[12..16]); - let leechers = i32::from_be_bytes(leechers); - - let mut seeders: [u8; 4] = [0; 4]; - seeders[..4].copy_from_slice(&buf[16..20]); - let seeders = i32::from_be_bytes(seeders); - - let mut ips: Vec = vec![]; - let mut ports: Vec = vec![]; - - for i in (20..buf.len()-6).step_by(6) { - let ip = Ipv4Addr::new(buf[i], buf[i+1], buf[i+2], buf[i+3]); - let port = u16::from_be_bytes([buf[i+4], buf[i+5]]); - - if ip.to_string() == "0.0.0.0" && port == 0 { - break; - } - - ips.push(ip); - ports.push(port) - } - - Self { action, transaction_id, interval, leechers, seeders, ips: ips[1..].to_vec(), ports: ports[1..].to_vec() } + /// Converts a byte buffer into an `AnnounceMessageResponse` instance. + fn from_buffer(buf: &[u8]) -> Self { + let mut action: [u8; 4] = [0; 4]; + action[..4].copy_from_slice(&buf[0..4]); + let action = i32::from_be_bytes(action); + + let mut transaction_id: [u8; 4] = [ 0; 4 ]; + transaction_id[..4].copy_from_slice(&buf[4..8]); + let transaction_id = i32::from_be_bytes(transaction_id); + + let mut interval: [u8; 4] = [0; 4]; + interval[..4].copy_from_slice(&buf[8..12]); + let interval = i32::from_be_bytes(interval); + + let mut leechers: [u8; 4] = [0; 4]; + leechers[..4].copy_from_slice(&buf[12..16]); + let leechers = i32::from_be_bytes(leechers); + + let mut seeders: [u8; 4] = [0; 4]; + seeders[..4].copy_from_slice(&buf[16..20]); + let seeders = i32::from_be_bytes(seeders); + + let mut ips: Vec = vec![]; + let mut ports: Vec = vec![]; + + for i in (20..buf.len()-6).step_by(6) { + let ip = Ipv4Addr::new(buf[i], buf[i+1], buf[i+2], buf[i+3]); + let port = u16::from_be_bytes([buf[i+4], buf[i+5]]); + + if ip.to_string() == "0.0.0.0" && port == 0 { + break; + } + + ips.push(ip); + ports.push(port) } + + Self { action, transaction_id, interval, leechers, seeders, ips: ips[1..].to_vec(), ports: ports[1..].to_vec() } + } } \ No newline at end of file