diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000..aaf0dc6 Binary files /dev/null and b/.DS_Store differ diff --git a/.gitignore b/.gitignore index 317483d..4470988 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,2 @@ -/target -Cargo.lock -/log/rusty_torrent.log -/testing -/process -/log -.DS_Store -/downloads \ No newline at end of file +target/ +Cargo.lock \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 97c0e26..ffb460d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,28 +1,5 @@ -[package] -name = "rusty_torrent" -version = "0.9.3" -edition = "2021" +[workspace] +members = ["lib_rusty_torrent", "rusty_torrenter"] -description = "A BitTorrent client implemented in Rust that allows you to interact with the BitTorrent protocol and download torrents." - -authors = ["Arlo Filley "] -exclude = ["testing/", "process/", ".vscode/", ".DS_STORE"] -license = "MIT" -keywords = ["bittorrent", "torrent", "torrentclient"] -readme = "README.md" -repository = "https://github.com/arlofilley/rusty_torrent" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -clap = { version = "4.4.0", features = ["derive"] } -dns-lookup = "2.0.2" -log = "0.4.20" -regex = "1.9.4" -reqwest = "0.11.20" -serde = { version = "1.0.183", features = ["derive"] } -serde_bencode = "0.2.3" -serde_bytes = "0.11.12" -sha1 = "0.10.5" -simple-logging = "2.0.2" -tokio = { version = "1.30.0", features = ["full"] } +[workspace.dependencies] +tokio = { version = "1.30.0", features = ["full"] } \ No newline at end of file diff --git a/lib_rusty_torrent/Cargo.toml b/lib_rusty_torrent/Cargo.toml new file mode 100644 index 0000000..b1002e4 --- /dev/null +++ b/lib_rusty_torrent/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "lib_rusty_torrent" +version = "0.1.0" +edition = "2021" + +[lib] +name = "lib_rusty_torrent" +crate-type = ["lib"] + +[dependencies] +tokio = { workspace = true } +serde = { version = "1.0.183", features = ["derive"] } +serde_bencode = "0.2.3" +serde_bytes = "0.11.12" +sha1 = "0.10.5" +dns-lookup = "2.0.2" +regex = "1.9.4" +reqwest = "0.11.20" \ No newline at end of file diff --git a/src/files.rs b/lib_rusty_torrent/src/files.rs similarity index 93% rename from src/files.rs rename to lib_rusty_torrent/src/files.rs index 249fe4f..6c5dedd 100644 --- a/src/files.rs +++ b/lib_rusty_torrent/src/files.rs @@ -1,4 +1,3 @@ -use log::debug; use tokio::{ fs::try_exists as dir_exists, fs::create_dir as create_dir, @@ -56,7 +55,6 @@ impl Files { path.push_str(dir); if !dir_exists(&path).await.unwrap() { - debug!("Creating: {path}"); create_dir(&path).await.unwrap(); } } @@ -64,8 +62,6 @@ impl Files { path.push('/'); path.push_str(&t_file.path[t_file.path.len() - 1]); - - debug!("Creating: {path}"); let file = File::create(&path).await.unwrap(); let length = t_file.length; @@ -92,14 +88,12 @@ impl Files { if file.current_length + piece_len > file.length { let n = file.file.write(&piece[j..(file.length - file.current_length) as usize]).await.unwrap(); - debug!("Wrote {n}B > {}", file.name); j = (file.length - file.current_length) as usize; file.current_length += j as u64; piece_len -= j as u64; file.complete = true; } else { let n = file.file.write(&piece[j..]).await.unwrap(); - debug!("Wrote {n}B > {}", file.name); file.current_length += piece_len; return } diff --git a/lib_rusty_torrent/src/lib.rs b/lib_rusty_torrent/src/lib.rs new file mode 100644 index 0000000..d900f55 --- /dev/null +++ b/lib_rusty_torrent/src/lib.rs @@ -0,0 +1,5 @@ +pub mod torrent; +pub mod peer_wire_protocol; +pub mod peer; +pub mod files; +pub mod tracker; \ No newline at end of file diff --git a/src/peer.rs b/lib_rusty_torrent/src/peer.rs similarity index 53% rename from src/peer.rs rename to lib_rusty_torrent/src/peer.rs index 2ba10fd..46c192a 100644 --- a/src/peer.rs +++ b/lib_rusty_torrent/src/peer.rs @@ -2,18 +2,15 @@ // Crate Imports use crate::{ - handshake::Handshake, - message::{ FromBuffer, Message, MessageType, ToBuffer }, + peer_wire_protocol::{ Handshake, Message, MessageType }, torrent::Torrent }; // External imports -use log::{ debug, error, info }; -use std::{net::{SocketAddr, SocketAddrV4, Ipv4Addr}, sync::mpsc::Sender}; +use std::net::SocketAddrV4; use tokio::{ - io::{ AsyncReadExt, AsyncWriteExt, Ready }, - net::TcpStream, sync::{oneshot, broadcast}, spawn, - sync::mpsc + io::{ AsyncReadExt, AsyncWriteExt }, + net::TcpStream }; /// Structure to abstract interaction with a peer. @@ -21,11 +18,11 @@ pub struct Peer { /// The `TcpStream` that is used to communicate with the peeer connection_stream: TcpStream, /// The `SocketAddr` of the peer - socket_addr: SocketAddrV4, + pub socket_addr: SocketAddrV4, /// The id of the peer pub peer_id: String, /// Whether the peer is choking the client - choking: bool, + pub choking: bool, } impl Peer { @@ -34,19 +31,17 @@ impl Peer { /// # Arguments /// /// * `socket_address` - The socket address of the peer. - pub async fn create_connection(socket_address: SocketAddrV4) -> Option { + pub async fn create_connection(socket_address: SocketAddrV4) -> Result { let connection_stream = match TcpStream::connect(socket_address).await { Err(err) => { - error!("unable to connect to {}, err: {}", socket_address, err); - return None + return Err(format!("unable to connect to {}, err: {}", socket_address, err)) }, Ok(stream) => { - debug!("created tcpstream successfully to: {socket_address}"); stream } }; - Some(Self { + Ok(Self { connection_stream, socket_addr: socket_address, peer_id: String::new(), @@ -55,63 +50,16 @@ impl Peer { } } -#[derive(Clone, Debug)] -pub enum ControlMessage { - DownloadPiece(u32, u32, u32, u32), - DownloadedPiece(Vec) -} - impl Peer { - pub async fn test(address: SocketAddrV4, torrent: Torrent) -> (broadcast::Sender, broadcast::Receiver) { - let (sender, mut receiver) = broadcast::channel::(16); - - let sx1 = sender.clone(); - let rx1 = receiver.resubscribe(); - let t = torrent.clone(); - - spawn(async move { - let mut peer = match Peer::create_connection(address).await { - None => { return }, - Some(peer) => peer - }; - - peer.handshake(&torrent).await; - peer.keep_alive_until_unchoke().await; - info!("Successfully Created Connection with peer: {}", peer.peer_id); - - loop { - if receiver.is_empty() { - continue - } else { - let Ok(m) = receiver.recv().await else { - continue; - }; - - println!("{m:#?}"); - - match m { - ControlMessage::DownloadPiece(a, b, mut c, d) => { - let buf = peer.request_piece(a, b, &mut c, d).await; - let _ = sender.send(ControlMessage::DownloadedPiece(buf)); - } - _ => () - } - } - } - }); - - (sx1, rx1) - } - /// Sends a handshake message to the peer, the first step in the peer wire messaging protocol. /// /// # Arguments /// /// * `torrent` - The `Torrent` instance associated with the peer. - async fn handshake(&mut self, torrent: &Torrent) { + pub async fn handshake(&mut self, torrent: &Torrent) -> Result<(), String>{ let mut buf = vec![0; 1024]; - let handshake_message = Handshake::new(&torrent.get_info_hash()).unwrap(); + let handshake_message = Handshake::new(&torrent.get_info_hash(), String::from("-RT0001-123456012345")).unwrap(); self.connection_stream.writable().await.unwrap(); self.connection_stream.write_all(&handshake_message.to_buffer()).await.unwrap(); @@ -120,10 +68,9 @@ impl Peer { let _ = self.connection_stream.read(&mut buf).await.unwrap(); let handshake = Handshake::from_buffer(&buf[..68].to_vec()).unwrap(); - handshake.log_useful_information(); for message_buf in Message::number_of_messages(&buf[68..]).0 { - let message = Message::from_buffer(&message_buf); + let message: Message = (&*message_buf).try_into()?; if message.message_type == MessageType::Unchoke { self.choking = false; @@ -131,22 +78,23 @@ impl Peer { } self.peer_id = handshake.peer_id; + + Ok(()) } /// Keeps the connection alive and sends interested messages until the peer unchokes - async fn keep_alive_until_unchoke(&mut self) { + pub async fn keep_alive_until_unchoke(&mut self) -> Result<(), String> { loop { - let message = self.read_message().await; + let message = self.read_message().await?; - debug!("{message:?}"); match message.message_type { MessageType::Unchoke => { self.choking = false; break } MessageType::KeepAlive => { - self.send_message_no_response(Message::new(0, MessageType::KeepAlive, None)).await; - self.send_message_no_response(Message::new(1, MessageType::Interested, None)).await; + self.send_message_no_response(Message::new(0, MessageType::KeepAlive, None)).await?; + self.send_message_no_response(Message::new(1, MessageType::Interested, None)).await?; } MessageType::Choke => { self.choking = true; @@ -154,59 +102,68 @@ impl Peer { _ => { continue } } } + + Ok(()) } /// Sends a message to the peer and waits for a response, which it returns - async fn send_message(&mut self, message: Message) -> Message { - let mut buf = vec![0; 16_397]; + pub async fn send_message(&mut self, message: Message) -> Result { + let mut response = vec![0; 16_397]; + + let message: Vec = message.try_into()?; self.connection_stream.writable().await.unwrap(); - self.connection_stream.write_all(&message.to_buffer()).await.unwrap(); + self.connection_stream.write_all(&message).await.unwrap(); self.connection_stream.readable().await.unwrap(); - let _ = self.connection_stream.read_exact(&mut buf).await.unwrap(); + let _ = self.connection_stream.read_exact(&mut response).await.unwrap(); - Message::from_buffer(&buf) + Ok((*response).try_into()?) } /// Sends a message to the peer and waits for a response, which it returns - async fn send_message_exact_size_response(&mut self, message: Message, size: usize) -> Message { - let mut buf = vec![0; size]; + pub async fn send_message_exact_size_response(&mut self, message: Message, size: usize) -> Result { + let mut response = vec![0; size]; + + let message: Vec = message.try_into()?; self.connection_stream.writable().await.unwrap(); - self.connection_stream.write_all(&message.to_buffer()).await.unwrap(); + self.connection_stream.write_all(&message).await.unwrap(); self.connection_stream.readable().await.unwrap(); - let _ = self.connection_stream.read_exact(&mut buf).await.unwrap(); + let _ = self.connection_stream.read_exact(&mut response).await.unwrap(); - Message::from_buffer(&buf) + Ok((*response).try_into()?) } /// Sends a message but doesn't wait for a response - async fn send_message_no_response(&mut self, message: Message) { + pub async fn send_message_no_response(&mut self, message: Message) -> Result<(), String> { + + let message: Vec = message.try_into()?; self.connection_stream.writable().await.unwrap(); - self.connection_stream.write_all(&message.to_buffer()).await.unwrap(); + self.connection_stream.write_all(&message).await.unwrap(); + + Ok(()) } /// reads a message from the peer - async fn read_message(&mut self) -> Message { - let mut buf = vec![0; 16_397]; + pub async fn read_message(&mut self) -> Result { + let mut response = vec![0; 16_397]; self.connection_stream.readable().await.unwrap(); - let _ = self.connection_stream.read(&mut buf).await.unwrap(); + let _ = self.connection_stream.read(&mut response).await.unwrap(); - Message::from_buffer(&buf) + Ok((*response).try_into()?) } /// Shutsdown the connection stream - async fn disconnect(&mut self) { + pub async fn disconnect(&mut self) -> Result<(), String>{ match self.connection_stream.shutdown().await { Err(err) => { - error!("Error disconnecting from {}: {}", self.socket_addr, err); - panic!("Error disconnecting from {}: {}", self.socket_addr, err); + return Err(format!("Error disconnecting from {}: {}", self.socket_addr, err)); }, Ok(_) => { - debug!("Successfully disconnected from {}", self.socket_addr) + Ok(()) } } } @@ -214,7 +171,7 @@ impl Peer { impl Peer { // Sends the requests and reads responses to put a piece together - pub async fn request_piece(&mut self, index: u32, piece_length: u32, len: &mut u32, total_len: u32) -> Vec { + pub async fn request_piece(&mut self, index: u32, piece_length: u32, len: &mut u32, total_len: u32) -> Result, String> { let mut buf = vec![]; // Sequentially requests piece from the peer for offset in (0..piece_length).step_by(16_384) { @@ -223,15 +180,14 @@ impl Peer { let response: Message; if *len + 16_384 >= total_len { - debug!("Final Request {}", total_len - *len); length = total_len - *len; response = self.send_message_exact_size_response( - Message::create_request(index, offset, length), + Message::create_piece_request(index, offset, length), length as usize + 13 - ).await; + ).await?; } else { - response = self.send_message(Message::create_request(index, offset, length)).await; + response = self.send_message(Message::create_piece_request(index, offset, length)).await?; }; match response.message_type { @@ -244,14 +200,46 @@ impl Peer { buf.push(byte) } }, - _ => { debug!("didn't recieve expected piece request | Recieved: {:?}", response.message_type); } + _ => { } }; if *len >= total_len - 1 { - return buf; + return Ok(buf); } } - buf + Ok(buf) } -} \ No newline at end of file +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::torrent::Torrent; + use std::net::Ipv4Addr; + + #[tokio::test] + async fn peer_create_connection() { + // Replace the IP and port with the actual values + let socket_address = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 6881); + + match Peer::create_connection(socket_address).await { + Ok(peer) => { + assert_eq!(peer.socket_addr, socket_address); + // Add more assertions if needed + } + Err(err) => panic!("Unexpected error: {}", err), + } + } + + #[tokio::test] + async fn peer_handshake() { + let socket_address = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 6881); + let mut peer = Peer::create_connection(socket_address.clone()).await.unwrap(); + let torrent = Torrent::from_torrent_file("test.torrent").await.unwrap(); + + assert!(peer.handshake(&torrent).await.is_ok()); + } + + // Add more tests for other methods in the Peer structure +} diff --git a/lib_rusty_torrent/src/peer_wire_protocol.rs b/lib_rusty_torrent/src/peer_wire_protocol.rs new file mode 100644 index 0000000..d26357f --- /dev/null +++ b/lib_rusty_torrent/src/peer_wire_protocol.rs @@ -0,0 +1,554 @@ +/// Represents the handshake message that will be sent to a client. +#[derive(Debug)] +pub struct Handshake { + /// The length of the protocol name, must be 19 for "BitTorrent protocol". + p_str_len: u8, + /// The protocol name, should always be "BitTorrent protocol". + p_str: String, + /// Reserved for extensions, currently unused. + reserved: [u8; 8], + /// The infohash for the torrent. + info_hash: Vec, + /// The identifier for the client. + pub peer_id: String, +} + +impl Handshake { + /// Creates a new handshake. + /// + /// # Arguments + /// + /// * `info_hash` - The infohash for the torrent. + /// + /// # Returns + /// + /// A new `Handshake` instance on success, or an empty `Result` indicating an error. + pub fn new(info_hash: &[u8], peer_id: String) -> Result { + if info_hash.len() != 20 { + return Err(String::from("Incorrect infohash length")); + } + + if peer_id.len() != 20 { + return Err(String::from("Incorrect Peer_Id Length")) + } + + Ok(Self { + p_str_len: 19, + p_str: String::from("BitTorrent protocol"), + reserved: [0; 8], + info_hash: info_hash.to_vec(), + peer_id: String::from("-MY0001-123456654321") + }) + } + + /// Converts the `Handshake` instance to a byte buffer for sending to a peer. + /// + /// # Returns + /// + /// A byte vector containing the serialized handshake. + pub fn to_buffer(&self) -> Vec { + let mut buf: Vec = vec![0; 68]; + + buf[0] = self.p_str_len; + buf[1..20].copy_from_slice(&self.p_str.as_bytes()[..19]); + buf[21..28].copy_from_slice(&self.reserved[..7]); + buf[28..48].copy_from_slice(&self.info_hash[..20]); + buf[48..68].copy_from_slice(&self.peer_id.as_bytes()[..20]); + + buf + } + + /// Converts a byte buffer to a `Handshake` instance. + /// + /// # Arguments + /// + /// * `buf` - A byte vector containing the serialized handshake. + /// + /// # Returns + /// + /// A new `Handshake` instance on success, or an empty `Result` indicating an error. + /// + /// # Errors + /// + /// Returns an error if the provided buffer is not long enough (at least 68 bytes). + pub fn from_buffer(buf: &Vec) -> Result { + // Verify that buffer is at least the correct size, if not error + if buf.len() < 68 { + return Err(String::from("buffer provided to handshake was too short")); + } + + let mut p_str = String::new(); + for byte in buf.iter().take(20).skip(1) { + p_str.push(*byte as char) + } + + let mut info_hash: Vec = vec![0; 20]; + info_hash[..20].copy_from_slice(&buf[28..48]); + + let mut peer_id = String::new(); + for byte in buf.iter().take(68).skip(48) { + peer_id.push(*byte as char) + } + + Ok(Self { + p_str_len: buf[0], + p_str, + reserved: [0; 8], + info_hash, + peer_id + }) + } +} + +/// Represents a message in the BitTorrent protocol. +#[derive(Clone, Debug, PartialEq)] +pub struct Message { + /// The length of the message, including the type and payload. + pub message_length: u32, + /// The type of message. + pub message_type: MessageType, + /// The payload of the message, if any. + pub payload: Option>, +} + +impl Message { + /// Creates a new message. + /// + /// # Arguments + /// + /// * `message_length` - The length of the message. + /// * `message_type` - The type of message. + /// * `payload` - The payload of the message, if any. + pub fn new(message_length: u32, message_type: MessageType, payload: Option>) -> Self { + Self { message_length, message_type, payload } + } +} + +impl TryFrom<&[u8]> for Message { + type Error = String; + /// Decodes a message from a given buffer. + /// + /// # Arguments + /// + /// * `buf` - The byte buffer containing the serialized message. + /// + /// # Returns + /// + /// A new `Message` instance on success, or an empty `Result` indicating an error. + fn try_from(value: &[u8]) -> Result { + let mut message_length: [u8; 4] = [0; 4]; + + if value.len() < 5 { + return Err(format!("Buffer not long enough to be a message: Length {}, should be at least 4 bytes", value.len())); + } + + message_length[..4].copy_from_slice(&value[..4]); + + let message_length = u32::from_be_bytes(message_length); + + let payload: Option>; + let message_type: MessageType; + + if message_length == 0 { + message_type = MessageType::KeepAlive; + payload = None; + } else if message_length == 5 { + message_type = value[4].try_into()?; + payload = None; + } else { + message_type = value[4].try_into()?; + + let end_of_message = 4 + message_length as usize; + + if end_of_message > value.len() { + return Err(format!("Invalid message length {} expected {}", value.len(), end_of_message)) + } else { + payload = Some(value[5..end_of_message].to_vec()); + } + } + + Ok(Self { + message_length, + message_type, + payload + }) + } +} + + +impl TryFrom for Vec { + type Error = String; + /// Converts the `Message` instance to a byte buffer for sending. + /// + /// # Returns + /// + /// A byte vector containing the serialized message. + fn try_from(value: Message) -> Result { + let mut buf: Vec = vec![]; + + for byte in value.message_length.to_be_bytes() { + buf.push(byte); + } + + match value.message_type { + MessageType::KeepAlive => { + return Ok(buf) + }, + MessageType::Choke | MessageType::Unchoke | MessageType::Interested | MessageType::NotInterested => { + buf.push(value.message_type.try_into()?); + return Ok(buf); + }, + MessageType::Have | MessageType::Bitfield | MessageType::Request | MessageType::Piece | MessageType::Cancel | MessageType::Port => { + buf.push(value.message_type.try_into()?); + }, + } + + match value.payload { + None => { + return Err(String::from("Error you are trying to create a message that needs a payload with no payload")) + } + Some(payload) => { + buf.extend(payload); + } + } + + Ok(buf) + } +} + +impl Message { + /// Create a request message from a given piece_index, offset, and length + /// + /// # Arguments + /// + /// * `piece_index` - The index of the piece in the torrent + /// * `offset` - The offset within the piece, because requests should be no more than 16KiB + /// * `length` - The length of the piece request, should be 16KiB + /// + /// # Returns + /// + /// A piece request message + pub fn create_piece_request(piece_index: u32, offset: u32, length: u32) -> Self { + let mut payload: Vec = vec![]; + + for byte in piece_index.to_be_bytes() { + payload.push(byte); + } + + for byte in offset.to_be_bytes() { + payload.push(byte) + } + + for byte in length.to_be_bytes() { + payload.push(byte) + } + + Self { + message_length: 13, + message_type: MessageType::Request, + payload: Some(payload) + } + } + + /// Returns the number of messages in the given buffer and their contents. + /// + /// # Arguments + /// + /// * `buf` - The byte buffer containing multiple serialized messages. + /// + /// # Returns + /// + /// A tuple containing a vector of message byte buffers and the number of messages. + pub fn number_of_messages(buf: &[u8]) -> (Vec>, u32) { + let mut message_num = 0; + let mut messages: Vec> = vec![]; + + // Find the length of message one + // put that into an array and increment counter by one + let mut i = 0; // points to the front + let mut j; // points to the back + + loop { + j = u32::from_be_bytes([buf[i], buf[i + 1], buf[i + 2], buf[i + 3]]) as usize + 4; + + messages.push(buf[i..i+j].to_vec()); + i += j; + message_num += 1; + + if buf[i] == 0 && buf[i + 1] == 0 && buf[i + 2] == 0 && buf[i + 3] == 0 { + break; + } + } + + (messages, message_num) + } +} + +/// An enum representing all possible message types in the BitTorrent peer wire protocol. +#[derive(Clone, Debug, PartialEq)] +#[repr(u8)] +pub enum MessageType { + /// Keepalive message, 0 length. + /// Potential Errors if trying to handle a keepalive message like another message. + /// Due to length being 0, should always be explicitly handled. + KeepAlive = u8::MAX, + /// Message telling the client not to send any requests until the peer has unchoked, 1 length. + Choke = 0, + /// Message telling the client that it can send requests, 1 length. + Unchoke = 1, + /// Message indicating that the peer is still interested in downloading, 1 length. + Interested = 2, + /// Message indicating that the peer is not interested in downloading, 1 length. + NotInterested = 3, + /// Message indicating that the peer has a given piece, fixed length. + Have = 4, + /// Message sent after a handshake, represents the pieces that the peer has. + Bitfield = 5, + /// Request a given part of a piece based on index, offset, and length, 13 length. + Request = 6, + /// A response to a request with the accompanying data, varying length. + Piece = 7, + /// Cancels a request, 13 length. + Cancel = 8, + /// Placeholder for unimplemented message type. + Port = 9, +} + +impl TryFrom for u8 { + type Error = String; + fn try_from(value: MessageType) -> Result { + match value { + MessageType::Choke => Ok(0), + MessageType::Unchoke => Ok(1), + MessageType::Interested => Ok(2), + MessageType::NotInterested => Ok(3), + MessageType::Have => Ok(4), + MessageType::Bitfield => Ok(5), + MessageType::Request => Ok(6), + MessageType::Piece => Ok(7), + MessageType::Cancel => Ok(8), + MessageType::Port => Ok(9), + _ => { + Err(format!("Invalid Message Type {:?}", value)) + } + } + } +} + +impl TryFrom for MessageType { + type Error = String; + fn try_from(value: u8) -> Result { + match value { + 0 => Ok(MessageType::Choke), + 1 => Ok(MessageType::Unchoke), + 2 => Ok(MessageType::Interested), + 3 => Ok(MessageType::NotInterested), + 4 => Ok(MessageType::Have), + 5 => Ok(MessageType::Bitfield), + 6 => Ok(MessageType::Request), + 7 => Ok(MessageType::Piece), + 8 => Ok(MessageType::Cancel), + 9 => Ok(MessageType::Port), + _ => { + Err(format!("Invalid Message Type {}", value)) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn handshake_creation() { + let info_hash: [u8; 20] = [1; 20]; + let peer_id = String::from("-MY0001-123456654321"); + + match Handshake::new(&info_hash, peer_id.clone()) { + Ok(handshake) => { + assert_eq!(handshake.p_str_len, 19); + assert_eq!(handshake.p_str, "BitTorrent protocol"); + assert_eq!(handshake.reserved, [0; 8]); + assert_eq!(handshake.info_hash, info_hash.to_vec()); + assert_eq!(handshake.peer_id, peer_id); + } + Err(_) => panic!("Unexpected error creating handshake"), + } + } + + #[test] + fn handshake_creation_invalid_infohash() { + let invalid_info_hash: [u8; 19] = [1; 19]; + let peer_id = String::from("-MY0001-123456654321"); + + match Handshake::new(&invalid_info_hash, peer_id.clone()) { + Err(err) => assert_eq!(err, "Incorrect infohash length"), + Ok(_) => panic!("Expected an error creating handshake, but got Ok"), + } + } + + #[test] + fn handshake_creation_invalid_peer_id() { + let info_hash: [u8; 20] = [1; 20]; + let invalid_peer_id = String::from("-INVALID"); + + match Handshake::new(&info_hash, invalid_peer_id) { + Err(err) => assert_eq!(err, "Incorrect Peer_Id Length"), + Ok(_) => panic!("Expected an error creating handshake, but got Ok"), + } + } + + #[test] + fn handshake_to_buffer() { + let info_hash: [u8; 20] = [1; 20]; + let peer_id = String::from("-MY0001-123456654321"); + let handshake = Handshake::new(&info_hash, peer_id).unwrap(); + let buffer = handshake.to_buffer(); + + assert_eq!(buffer.len(), 68); + // Add more assertions based on the expected structure of the buffer if needed + } + + #[test] + fn handshake_from_buffer() { + let info_hash: [u8; 20] = [1; 20]; + let peer_id = String::from("-MY0001-123456654321"); + let original_handshake = Handshake::new(&info_hash, peer_id.clone()).unwrap(); + let buffer = original_handshake.to_buffer(); + + match Handshake::from_buffer(&buffer) { + Ok(handshake) => assert_eq!(handshake.peer_id, peer_id), + Err(err) => panic!("Unexpected error: {}", err), + } + } + + #[test] + fn handshake_from_buffer_invalid_size() { + let short_buffer: Vec = vec![0; 67]; // Invalid size + match Handshake::from_buffer(&short_buffer) { + Err(err) => assert_eq!(err, "buffer provided to handshake was too short"), + Ok(_) => panic!("Expected an error, but got Ok"), + } + } + + #[test] + fn u8_to_message_type() { + assert_eq!(TryInto::::try_into(0 as u8), Ok(MessageType::Choke)); + assert_eq!(TryInto::::try_into(1 as u8), Ok(MessageType::Unchoke)); + assert_eq!(TryInto::::try_into(2 as u8), Ok(MessageType::Interested)); + assert_eq!(TryInto::::try_into(3 as u8), Ok(MessageType::NotInterested)); + assert_eq!(TryInto::::try_into(4 as u8), Ok(MessageType::Have)); + assert_eq!(TryInto::::try_into(5 as u8), Ok(MessageType::Bitfield)); + assert_eq!(TryInto::::try_into(6 as u8), Ok(MessageType::Request)); + assert_eq!(TryInto::::try_into(7 as u8), Ok(MessageType::Piece)); + assert_eq!(TryInto::::try_into(8 as u8), Ok(MessageType::Cancel)); + assert_eq!(TryInto::::try_into(9 as u8), Ok(MessageType::Port)); + assert_eq!(TryInto::::try_into(10 as u8), Err(String::from("Invalid Message Type 10"))); + } + + #[test] + fn message_type_to_u8() { + assert_eq!(TryInto::::try_into(MessageType::Choke), Ok(0 as u8)); + assert_eq!(TryInto::::try_into(MessageType::Unchoke), Ok(1 as u8)); + assert_eq!(TryInto::::try_into(MessageType::Interested), Ok(2 as u8)); + assert_eq!(TryInto::::try_into(MessageType::NotInterested), Ok(3 as u8)); + assert_eq!(TryInto::::try_into(MessageType::Have), Ok(4 as u8)); + assert_eq!(TryInto::::try_into(MessageType::Bitfield), Ok(5 as u8)); + assert_eq!(TryInto::::try_into(MessageType::Request), Ok(6 as u8)); + assert_eq!(TryInto::::try_into(MessageType::Piece), Ok(7 as u8)); + assert_eq!(TryInto::::try_into(MessageType::Cancel), Ok(8 as u8)); + assert_eq!(TryInto::::try_into(MessageType::Port), Ok(9 as u8)); + assert_eq!(TryInto::::try_into(MessageType::KeepAlive), Err(String::from("Invalid Message Type KeepAlive"))); + } + + #[test] + fn create_piece_request() { + let piece_index = 42; + let offset = 1024; + let length = 16384; + + let request_message = Message::create_piece_request(piece_index, offset, length); + + assert_eq!(request_message.message_length, 13); + assert_eq!(request_message.message_type, MessageType::Request); + + if let Some(payload) = request_message.payload { + assert_eq!(payload.len(), 12); // 4 bytes for piece_index + 4 bytes for offset + 4 bytes for length + + let mut expected_payload = vec![]; + expected_payload.extend_from_slice(&piece_index.to_be_bytes()); + expected_payload.extend_from_slice(&offset.to_be_bytes()); + expected_payload.extend_from_slice(&length.to_be_bytes()); + + assert_eq!(payload, expected_payload); + } else { + panic!("Expected payload, but found None"); + } + } + + #[test] + fn try_from_valid_message() { + let message_bytes = vec![0, 0, 0, 5, 1]; // Unchoke message + + match Message::try_from(&message_bytes[..]) { + Ok(message) => { + assert_eq!(message.message_length, 5); + assert_eq!(message.message_type, MessageType::Unchoke); + assert!(message.payload.is_none()); + } + Err(err) => panic!("Unexpected error: {}", err), + } + } + + #[test] + fn try_from_invalid_message() { + let invalid_message_bytes = vec![0, 0, 0, 2]; // Message length indicates 2 bytes, but no payload provided + + match Message::try_from(&invalid_message_bytes[..]) { + Ok(_) => panic!("Expected an error but got Ok"), + Err(err) => { + assert_eq!( + err, + "Buffer not long enough to be a message: Length 4, should be at least 4 bytes" + ); + } + } + } + + #[test] + fn try_into_valid_message() { + let message = Message { + message_length: 5, + message_type: MessageType::Unchoke, + payload: None, + }; + + match Vec::::try_from(message) { + Ok(serialized_message) => { + assert_eq!(serialized_message, vec![0, 0, 0, 5, 1]); // Unchoke message + } + Err(err) => panic!("Unexpected error: {}", err), + } + } + + #[test] + fn try_into_message_with_payload() { + let payload_data = vec![65, 66, 67]; // Arbitrary payload + let message = Message { + message_length: 7, + message_type: MessageType::Piece, + payload: Some(payload_data.clone()), + }; + + match Vec::::try_from(message) { + Ok(serialized_message) => { + let mut expected_serialized_message = vec![0, 0, 0, 7, 7]; // Piece message + expected_serialized_message.extend_from_slice(&payload_data); + + assert_eq!(serialized_message, expected_serialized_message); + } + Err(err) => panic!("Unexpected error: {}", err), + } + } +} \ No newline at end of file diff --git a/lib_rusty_torrent/src/torrent.rs b/lib_rusty_torrent/src/torrent.rs new file mode 100644 index 0000000..ff77d31 --- /dev/null +++ b/lib_rusty_torrent/src/torrent.rs @@ -0,0 +1,394 @@ +use regex::Regex; +use serde::{Deserialize, Serialize}; +use sha1::{Digest, Sha1}; +use tokio::{fs::File as TokioFile, io::AsyncReadExt}; +use std::net::{IpAddr, SocketAddrV4}; + +/// Represents a node in a DHT network. +#[derive(Clone, Debug, Deserialize, Serialize)] +struct Node(String, i64); + +/// Represents a file described in a torrent. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct File { + pub path: Vec, + pub length: u64, + #[serde(default)] + md5sum: Option, +} + +/// Represents the metadata of a torrent. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct Info { + pub name: String, + #[serde(with = "serde_bytes")] + pub pieces: Vec, + #[serde(rename = "piece length")] + pub piece_length: u64, + #[serde(default)] + md5sum: Option, + #[serde(default)] + pub length: Option, + #[serde(default)] + pub files: Option>, + #[serde(default)] + private: Option, + #[serde(default)] + path: Option>, + #[serde(default)] + #[serde(rename = "root hash")] + root_hash: Option, +} + +/// Represents a torrent. +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct Torrent { + pub info: Info, + #[serde(default)] + pub announce: Option, + #[serde(default)] + nodes: Option>, + #[serde(default)] + encoding: Option, + #[serde(default)] + httpseeds: Option>, + #[serde(default)] + #[serde(rename = "announce-list")] + announce_list: Option>>, + #[serde(default)] + #[serde(rename = "creation date")] + creation_date: Option, + #[serde(rename = "comment")] + comment: Option, + #[serde(default)] + #[serde(rename = "created by")] + created_by: Option +} + +impl Torrent { + /// Reads a `.torrent` file and converts it into a `Torrent` struct. + /// + /// # Arguments + /// + /// * `path` - The path to the `.torrent` file. + pub async fn from_torrent_file(path: &str) -> Result { + let Ok(mut file) = TokioFile::open(path).await else { + return Err(format!("Unable to read file at {path}")); + }; + + let mut buf: Vec = Vec::new(); + let Ok(_) = file.read_to_end(&mut buf).await else { + return Err(format!("Error reading file > {path}")); + }; + + let torrent: Torrent = match serde_bencode::from_bytes(&buf) { + Err(_) => return Err(format!("Error deserializing file > {path}")), + Ok(torrent) => torrent, + }; + + Ok(torrent) + } +} + +impl Torrent { + /// Calculates the info hash of the torrent. + pub fn get_info_hash(&self) -> Vec { + let buf = serde_bencode::to_bytes(&self.info).unwrap(); + + let mut hasher = Sha1::new(); + hasher.update(buf); + let res = hasher.finalize(); + res[..].to_vec() + } + + /// Checks if a downloaded piece matches its hash. + /// + /// # Arguments + /// + /// * `piece` - The downloaded piece. + /// * `index` - The index of the piece. + /// + /// # Returns + /// + /// * `true` if the piece is correct, `false` otherwise. + pub fn check_piece(&self, piece: &[u8], index: u32) -> bool { + let mut hasher = Sha1::new(); + hasher.update(piece); + let result = hasher.finalize(); + + let piece_hash = &self.info.pieces[(index * 20) as usize..(index * 20 + 20) as usize]; + + if &result[..] == piece_hash { + true + } else { + false + } + } + + pub fn get_total_length(&self) -> u64 { + if let Some(n) = self.info.length { + return n as u64 + }; + + if let Some(files) = &self.info.files { + let mut n = 0; + + for file in files { + n += file.length; + }; + + return n + }; + + 0 + } + + pub fn get_trackers(&self) -> Result, String> { + let mut addresses = vec![]; + + // This is the current regex as I haven't implemented support for http trackers yet + let re = Regex::new(r"^udp://([^:/]+):(\d+)/announce$").unwrap(); + + if let Some(url) = &self.announce { + if let Some(captures) = re.captures(url) { + let hostname = captures.get(1).unwrap().as_str(); + let port = captures.get(2).unwrap().as_str(); + + if let Ok(ip) = dns_lookup::lookup_host(hostname) { + for i in ip { + if let IpAddr::V4(j) = i { + addresses.push(SocketAddrV4::new(j, port.parse().unwrap())) + } + } + } + } + } + + if let Some(urls) = &self.announce_list { + for url in urls.iter() { + if let Some(captures) = re.captures(&url[0]) { + let hostname = captures.get(1).unwrap().as_str(); + let port = captures.get(2).unwrap().as_str(); + + if let Ok(ip) = dns_lookup::lookup_host(hostname) { + for i in ip { + if let IpAddr::V4(j) = i { + addresses.push(SocketAddrV4::new(j, port.parse().unwrap())); + } + } + } + } + } + } + + if addresses.len() > 0 { + Ok(addresses) + } else { + Err(String::from("Unable to find trackers")) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::runtime::Runtime; + + #[test] + fn from_torrent_file_success() { + let runtime = Runtime::new().unwrap(); + let path = "test.torrent"; + + let result = runtime.block_on(Torrent::from_torrent_file(path)); + println!("{result:?}"); + + assert!(result.is_ok()); + } + + #[test] + fn from_torrent_file_failure() { + let runtime = Runtime::new().unwrap(); + let path = "nonexistent/file.torrent"; + + let result = runtime.block_on(Torrent::from_torrent_file(path)); + + assert!(result.is_err()); + } + + #[test] + fn get_info_hash() { + // Create a mock Torrent instance + let torrent = Torrent { + info: Info { + name: String::from("test_torrent"), + pieces: vec![], + piece_length: 1024, + length: Some(2048), + files: None, + md5sum: None, + private: None, + path: None, + root_hash: None, + }, + announce: Some(String::from("http://tracker.example.com/announce")), + nodes: None, + encoding: None, + httpseeds: None, + announce_list: None, + creation_date: None, + comment: None, + created_by: None, + }; + + let result = torrent.get_info_hash(); + + assert!(!result.is_empty()); + } + + #[test] + fn check_piece_valid() { + let mut hasher = Sha1::new(); + hasher.update(vec![0; 1024]); + let piece_hash: &[u8] = &hasher.finalize(); + + // Create a mock Torrent instance + let torrent = Torrent { + info: Info { + name: String::from("test_torrent"), + pieces: piece_hash.into(), // Mock piece hashes + piece_length: 1024, + length: Some(2048), + files: None, + md5sum: None, + private: None, + path: None, + root_hash: None, + }, + announce: Some(String::from("http://tracker.example.com/announce")), + nodes: None, + encoding: None, + httpseeds: None, + announce_list: None, + creation_date: None, + comment: None, + created_by: None, + }; + + // Mock a valid piece + let piece = vec![0; 1024]; + + let result = torrent.check_piece(&piece, 0); + + assert!(result); + } + + #[test] + fn check_piece_invalid() { + // Create a mock Torrent instance + let torrent = Torrent { + info: Info { + name: String::from("test_torrent"), + pieces: vec![0; 20], // Mock piece hashes + piece_length: 1024, + length: Some(2048), + files: None, + md5sum: None, + private: None, + path: None, + root_hash: None, + }, + announce: Some(String::from("http://tracker.example.com/announce")), + nodes: None, + encoding: None, + httpseeds: None, + announce_list: None, + creation_date: None, + comment: None, + created_by: None, + }; + + // Mock an invalid piece + let piece = vec![1; 1024]; + + let result = torrent.check_piece(&piece, 0); + + assert!(!result); + } + + #[test] + fn get_total_length_single_file() { + // Create a mock Torrent instance with a single file + let torrent = Torrent { + info: Info { + name: String::from("test_torrent"), + pieces: vec![], + piece_length: 1024, + length: Some(2048), + files: Some(vec![File { + path: vec![String::from("test_file.txt")], + length: 2048, + md5sum: None, + }]), + md5sum: None, + private: None, + path: None, + root_hash: None, + }, + announce: Some(String::from("http://tracker.example.com/announce")), + nodes: None, + encoding: None, + httpseeds: None, + announce_list: None, + creation_date: None, + comment: None, + created_by: None, + }; + + let result = torrent.get_total_length(); + + assert_eq!(result, 2048); + } + + #[test] + fn get_total_length_multiple_files() { + // Create a mock Torrent instance with multiple files + let torrent = Torrent { + info: Info { + name: String::from("test_torrent"), + pieces: vec![], + piece_length: 1024, + length: None, + files: Some(vec![ + File { + path: vec![String::from("file1.txt")], + length: 1024, + md5sum: None, + }, + File { + path: vec![String::from("file2.txt")], + length: 2048, + md5sum: None, + }, + ]), + md5sum: None, + private: None, + path: None, + root_hash: None, + }, + announce: Some(String::from("http://tracker.example.com/announce")), + nodes: None, + encoding: None, + httpseeds: None, + announce_list: None, + creation_date: None, + comment: None, + created_by: None, + }; + + let result = torrent.get_total_length(); + + assert_eq!(result, 3072); + } + + // Add more tests for other methods and edge cases as needed +} \ No newline at end of file diff --git a/src/tracker/tracker.rs b/lib_rusty_torrent/src/tracker.rs similarity index 95% rename from src/tracker/tracker.rs rename to lib_rusty_torrent/src/tracker.rs index 90e7c0c..3699eec 100644 --- a/src/tracker/tracker.rs +++ b/lib_rusty_torrent/src/tracker.rs @@ -1,7 +1,6 @@ use std::net::{SocketAddr, Ipv4Addr, SocketAddrV4}; use tokio::net::UdpSocket; -use log::{debug, error}; use crate::torrent::Torrent; @@ -26,22 +25,16 @@ impl Tracker { /// # Panics /// /// Panics if there is an error parsing the given address or creating the UDP socket. - pub async fn new(listen_address: SocketAddr, remote_address: SocketAddr) -> Result { + pub async fn new(listen_address: SocketAddr, remote_address: SocketAddr) -> Result { let Ok(connection_stream) = UdpSocket::bind(listen_address).await else { - error!("error binding to udpsocket {listen_address}"); - return Err(()) + return Err(format!("error binding to udpsocket {listen_address}")) }; - - debug!("bound udpsocket successfully to: {listen_address}"); match connection_stream.connect(remote_address).await { Err(err) => { - error!("unable to connect to {}, err: {}", remote_address, err); - panic!("error creating udpsocket, {}", err); + return Err(format!("error creating udpsocket, {}", err)); }, - Ok(()) => { - debug!("successfully connected to: {remote_address}"); - } + Ok(()) => { } }; diff --git a/lib_rusty_torrent/test.torrent b/lib_rusty_torrent/test.torrent new file mode 100644 index 0000000..cd1df57 Binary files /dev/null and b/lib_rusty_torrent/test.torrent differ diff --git a/rusty_torrenter/.gitignore b/rusty_torrenter/.gitignore new file mode 100644 index 0000000..317483d --- /dev/null +++ b/rusty_torrenter/.gitignore @@ -0,0 +1,8 @@ +/target +Cargo.lock +/log/rusty_torrent.log +/testing +/process +/log +.DS_Store +/downloads \ No newline at end of file diff --git a/rusty_torrenter/Cargo.toml b/rusty_torrenter/Cargo.toml new file mode 100644 index 0000000..3edba30 --- /dev/null +++ b/rusty_torrenter/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "rusty_torrenter" +version = "0.9.3" +edition = "2021" + +description = "A BitTorrent client implemented in Rust that allows you to interact with the BitTorrent protocol and download torrents." + +authors = ["Arlo Filley "] +exclude = ["testing/", "process/", ".vscode/", ".DS_STORE"] +license = "MIT" +keywords = ["bittorrent", "torrent", "torrentclient"] +readme = "README.md" +repository = "https://github.com/arlofilley/rusty_torrent" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +lib_rusty_torrent = { path = "../lib_rusty_torrent" } +dns-lookup = "2.0.2" +log = "0.4.20" +regex = "1.9.4" +reqwest = "0.11.20" +serde = { version = "1.0.183", features = ["derive"] } +serde_bencode = "0.2.3" +serde_bytes = "0.11.12" +sha1 = "0.10.5" +simple-logging = "2.0.2" +tokio = { workspace = true } +clap = { version = "*", features = ["derive"] } diff --git a/LICENSE b/rusty_torrenter/LICENSE similarity index 100% rename from LICENSE rename to rusty_torrenter/LICENSE diff --git a/readme.md b/rusty_torrenter/README.md similarity index 100% rename from readme.md rename to rusty_torrenter/README.md diff --git a/src/main.rs b/rusty_torrenter/src/main.rs similarity index 75% rename from src/main.rs rename to rusty_torrenter/src/main.rs index b3afe23..c027230 100644 --- a/src/main.rs +++ b/rusty_torrenter/src/main.rs @@ -9,30 +9,20 @@ //! Checks piece hashes //! Writes to torrent file -// Modules -mod files; -mod handshake; -mod peer; -mod message; -mod torrent; -mod tracker; - use core::panic; use std::net::SocketAddr; // Crate Imports -use crate::{ +use lib_rusty_torrent::{ files::Files, - peer::Peer, + peer::*, torrent::Torrent, - tracker::tracker::Tracker + tracker::Tracker, }; -use tokio::sync::mpsc; // External Ipmorts use clap::Parser; use log::{ debug, info, LevelFilter, error }; -use tokio::spawn; /// Struct Respresenting needed arguments #[derive(Parser, Debug)] @@ -64,8 +54,7 @@ async fn main() { info!("==> WELCOME TO RUSTY-TORRENT <=="); // Read the Torrent File - let torrent = Torrent::from_torrent_file(&args.torrent_file_path).await; - torrent.log_useful_information(); + let torrent = Torrent::from_torrent_file(&args.torrent_file_path).await.unwrap(); // Create the files that will be written to let mut files = Files::new(); @@ -102,28 +91,25 @@ async fn main() { let mut i = 0; let t = torrent.clone(); - let (sender, mut reciever) = Peer::test(peers[0], torrent).await; - loop { - let _ = sender.send(peer::ControlMessage::DownloadPiece(i, t.info.piece_length as u32, len, t.get_total_length() as u32)); - reciever.resubscribe(); - let a = reciever.recv().await.unwrap(); + let _ = sender.send(peer::ControlMessage::DownloadPiece(i, t.info.piece_length as u32, len, t.get_total_length() as u32)); + + let a = reciever.recv().await.unwrap(); - println!("2 {a:?}"); + println!("2 {a:?}"); - let peer::ControlMessage::DownloadedPiece(b) = a else { - continue; - }; + let peer::ControlMessage::DownloadedPiece(b) = a else { + continue; + }; - if t.check_piece(&b, i) { - files.write_piece(b).await; - } else { - break - } + if t.check_piece(&b, i) { + files.write_piece(b).await; + } else { + break } - //peer.disconnect().await; + peer.disconnect().await; info!("Successfully completed download"); diff --git a/src/handshake.rs b/src/handshake.rs deleted file mode 100644 index ab62671..0000000 --- a/src/handshake.rs +++ /dev/null @@ -1,105 +0,0 @@ -use log::{ info, error }; - -/// Represents the handshake message that will be sent to a client. -#[derive(Debug)] -pub struct Handshake { - /// The length of the protocol name, must be 19 for "BitTorrent protocol". - p_str_len: u8, - /// The protocol name, should always be "BitTorrent protocol". - p_str: String, - /// Reserved for extensions, currently unused. - reserved: [u8; 8], - /// The infohash for the torrent. - info_hash: Vec, - /// The identifier for the client. - pub peer_id: String, -} - -impl Handshake { - /// Creates a new handshake. - /// - /// # Arguments - /// - /// * `info_hash` - The infohash for the torrent. - /// - /// # Returns - /// - /// A new `Handshake` instance on success, or an empty `Result` indicating an error. - pub fn new(info_hash: &[u8]) -> Option { - if info_hash.len() != 20 { - error!("Incorrect infohash length, consider using the helper function in torrent"); - return None; - } - - Some(Self { - p_str_len: 19, - p_str: String::from("BitTorrent protocol"), - reserved: [0; 8], - info_hash: info_hash.to_vec(), - peer_id: String::from("-MY0001-123456654322") - }) - } - - /// Converts the `Handshake` instance to a byte buffer for sending to a peer. - /// - /// # Returns - /// - /// A byte vector containing the serialized handshake. - pub fn to_buffer(&self) -> Vec { - let mut buf: Vec = vec![0; 68]; - - buf[0] = self.p_str_len; - buf[1..20].copy_from_slice(&self.p_str.as_bytes()[..19]); - buf[21..28].copy_from_slice(&self.reserved[..7]); - buf[28..48].copy_from_slice(&self.info_hash[..20]); - buf[48..68].copy_from_slice(&self.peer_id.as_bytes()[..20]); - - buf - } - - /// Converts a byte buffer to a `Handshake` instance. - /// - /// # Arguments - /// - /// * `buf` - A byte vector containing the serialized handshake. - /// - /// # Returns - /// - /// A new `Handshake` instance on success, or an empty `Result` indicating an error. - /// - /// # Errors - /// - /// Returns an error if the provided buffer is not long enough (at least 68 bytes). - pub fn from_buffer(buf: &Vec) -> Option { - // Verify that buffer is at least the correct size, if not error - if buf.len() < 68 { - error!("buffer provided to handshake was too short"); - return None; - } - - let mut p_str = String::new(); - for byte in buf.iter().take(20).skip(1) { - p_str.push(*byte as char) - } - - let mut info_hash: Vec = vec![0; 20]; - info_hash[..20].copy_from_slice(&buf[28..48]); - - let mut peer_id = String::new(); - for byte in buf.iter().take(68).skip(48) { - peer_id.push(*byte as char) - } - - Some(Self { - p_str_len: buf[0], - p_str, - reserved: [0; 8], - info_hash, - peer_id - }) - } - - pub fn log_useful_information(&self) { - info!("Connected - PeerId: {:?}", self.peer_id); - } -} \ No newline at end of file diff --git a/src/message.rs b/src/message.rs deleted file mode 100644 index a36b5d1..0000000 --- a/src/message.rs +++ /dev/null @@ -1,258 +0,0 @@ -use std::vec; -use log::{error, debug}; - -/// Represents a message in the BitTorrent protocol. -#[derive(Debug, PartialEq)] -pub struct Message { - /// The length of the message, including the type and payload. - pub message_length: u32, - /// The type of message. - pub message_type: MessageType, - /// The payload of the message, if any. - pub payload: Option>, -} - -pub trait ToBuffer { - fn to_buffer(self) -> Vec; -} - -pub trait FromBuffer { - fn from_buffer(buf: &[u8]) -> Self; -} - -impl Message { - /// Creates a new message. - /// - /// # Arguments - /// - /// * `message_length` - The length of the message. - /// * `message_type` - The type of message. - /// * `payload` - The payload of the message, if any. - pub fn new(message_length: u32, message_type: MessageType, payload: Option>) -> Self { - Self { message_length, message_type, payload } - } -} - -impl FromBuffer for Message { - /// Decodes a message from a given buffer. - /// - /// # Arguments - /// - /// * `buf` - The byte buffer containing the serialized message. - /// - /// # Returns - /// - /// A new `Message` instance on success, or an empty `Result` indicating an error. - fn from_buffer(buf: &[u8]) -> Self { - let mut message_length: [u8; 4] = [0; 4]; - message_length[..4].copy_from_slice(&buf[..4]); - - let message_length = u32::from_be_bytes(message_length); - - let mut payload: Option> = None; - let message_type: MessageType; - - if message_length == 0 { - message_type = MessageType::KeepAlive; - payload = None; - } else if message_length == 5 { - message_type = buf[4].into(); - payload = None; - } else { - message_type = buf[4].into(); - - let end_of_message = 4 + message_length as usize; - - if end_of_message > buf.len() { - error!("index too long"); - debug!("{buf:?}"); - } else { - payload = Some(buf[5..end_of_message].to_vec()); - } - } - - Self { - message_length, - message_type, - payload - } - } -} - - -impl ToBuffer for Message { - /// Converts the `Message` instance to a byte buffer for sending. - /// - /// # Returns - /// - /// A byte vector containing the serialized message. - fn to_buffer(self) -> Vec { - let mut buf: Vec = vec![]; - - for byte in self.message_length.to_be_bytes() { - buf.push(byte); - } - - match self.message_type { - MessageType::KeepAlive => { - return buf - }, - MessageType::Choke | MessageType::Unchoke | MessageType::Interested | MessageType::NotInterested => { - buf.push(self.message_type.into()); - return buf; - }, - MessageType::Have | MessageType::Bitfield | MessageType::Request | MessageType::Piece | MessageType::Cancel | MessageType::Port => { - buf.push(self.message_type.into()); - }, - MessageType::Error => { - panic!("Error making message into buffer") - } - } - - match self.payload { - None => { panic!("Error you are trying to create a message that needs a payload with no payload") } - Some(payload) => { - buf.extend(payload); - } - } - - buf - } -} - -impl Message { - /// Create a request message from a given piece_index, offset, and length - /// - /// # Arguments - /// - /// * `piece_index` - The index of the piece in the torrent - /// * `offset` - The offset within the piece, because requests should be no more than 16KiB - /// * `length` - The length of the piece request, should be 16KiB - /// - /// # Returns - /// - /// A piece request message - pub fn create_request(piece_index: u32, offset: u32, length: u32) -> Self { - let mut payload: Vec = vec![]; - - for byte in piece_index.to_be_bytes() { - payload.push(byte); - } - - for byte in offset.to_be_bytes() { - payload.push(byte) - } - - for byte in length.to_be_bytes() { - payload.push(byte) - } - - Self { message_length: 13, message_type: MessageType::Request, payload: Some(payload) } - } - - /// Returns the number of messages in the given buffer and their contents. - /// - /// # Arguments - /// - /// * `buf` - The byte buffer containing multiple serialized messages. - /// - /// # Returns - /// - /// A tuple containing a vector of message byte buffers and the number of messages. - pub fn number_of_messages(buf: &[u8]) -> (Vec>, u32) { - let mut message_num = 0; - let mut messages: Vec> = vec![]; - - // Find the length of message one - // put that into an array and increment counter by one - let mut i = 0; // points to the front - let mut j; // points to the back - - loop { - j = u32::from_be_bytes([buf[i], buf[i + 1], buf[i + 2], buf[i + 3]]) as usize + 4; - - messages.push(buf[i..i+j].to_vec()); - i += j; - message_num += 1; - - if buf[i] == 0 && buf[i + 1] == 0 && buf[i + 2] == 0 && buf[i + 3] == 0 { - break; - } - } - - (messages, message_num) - } -} - -/// An enum representing all possible message types in the BitTorrent peer wire protocol. -#[derive(Clone, Debug, PartialEq)] -#[repr(u8)] -pub enum MessageType { - /// Keepalive message, 0 length. - /// Potential Errors if trying to handle a keepalive message like another message. - /// Due to length being 0, should always be explicitly handled. - KeepAlive = u8::MAX, - /// Message telling the client not to send any requests until the peer has unchoked, 1 length. - Choke = 0, - /// Message telling the client that it can send requests, 1 length. - Unchoke = 1, - /// Message indicating that the peer is still interested in downloading, 1 length. - Interested = 2, - /// Message indicating that the peer is not interested in downloading, 1 length. - NotInterested = 3, - /// Message indicating that the peer has a given piece, fixed length. - Have = 4, - /// Message sent after a handshake, represents the pieces that the peer has. - Bitfield = 5, - /// Request a given part of a piece based on index, offset, and length, 13 length. - Request = 6, - /// A response to a request with the accompanying data, varying length. - Piece = 7, - /// Cancels a request, 13 length. - Cancel = 8, - /// Placeholder for unimplemented message type. - Port = 9, - Error -} - -impl From for MessageType { - fn from(val: u8) -> MessageType { - match val { - 0 => MessageType::Choke, - 1 => MessageType::Unchoke, - 2 => MessageType::Interested, - 3 => MessageType::NotInterested, - 4 => MessageType::Have, - 5 => MessageType::Bitfield, - 6 => MessageType::Request, - 7 => MessageType::Piece, - 8 => MessageType::Cancel, - 9 => MessageType::Port, - _ => { - error!("Invalid Message Type: {}", val); - MessageType::Error - } - } - } -} - -impl From for u8 { - fn from(val: MessageType) -> u8 { - match val { - MessageType::Choke => 0, - MessageType::Unchoke => 1, - MessageType::Interested => 2, - MessageType::NotInterested => 3, - MessageType::Have => 4, - MessageType::Bitfield => 5, - MessageType::Request => 6, - MessageType::Piece => 7, - MessageType::Cancel => 8, - MessageType::Port => 9, - _ => { - error!("Invalid Message Type: {:?}", val); - u8::MAX - } - } - } -} \ No newline at end of file diff --git a/src/torrent.rs b/src/torrent.rs deleted file mode 100644 index a4a03e3..0000000 --- a/src/torrent.rs +++ /dev/null @@ -1,249 +0,0 @@ -use log::{debug, info, error, warn, trace}; -use regex::Regex; -use serde::{Deserialize, Serialize}; -use sha1::{Digest, Sha1}; -use tokio::{fs::File as TokioFile, io::AsyncReadExt}; -use std::net::{IpAddr, SocketAddrV4}; - -/// Represents a node in a DHT network. -#[derive(Clone, Debug, Deserialize, Serialize)] -struct Node(String, i64); - -/// Represents a file described in a torrent. -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct File { - pub path: Vec, - pub length: u64, - #[serde(default)] - md5sum: Option, -} - -/// Represents the metadata of a torrent. -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct Info { - pub name: String, - #[serde(with = "serde_bytes")] - pub pieces: Vec, - #[serde(rename = "piece length")] - pub piece_length: u64, - #[serde(default)] - md5sum: Option, - #[serde(default)] - pub length: Option, - #[serde(default)] - pub files: Option>, - #[serde(default)] - private: Option, - #[serde(default)] - path: Option>, - #[serde(default)] - #[serde(rename = "root hash")] - root_hash: Option, -} - -/// Represents a torrent. -#[derive(Clone, Debug, Deserialize, Serialize)] -pub struct Torrent { - pub info: Info, - #[serde(default)] - pub announce: Option, - #[serde(default)] - nodes: Option>, - #[serde(default)] - encoding: Option, - #[serde(default)] - httpseeds: Option>, - #[serde(default)] - #[serde(rename = "announce-list")] - announce_list: Option>>, - #[serde(default)] - #[serde(rename = "creation date")] - creation_date: Option, - #[serde(rename = "comment")] - comment: Option, - #[serde(default)] - #[serde(rename = "created by")] - created_by: Option -} - -impl Torrent { - /// Reads a `.torrent` file and converts it into a `Torrent` struct. - /// - /// # Arguments - /// - /// * `path` - The path to the `.torrent` file. - pub async fn from_torrent_file(path: &str) -> Self { - info!(""); - info!("--> Reading File <--"); - - let Ok(mut file) = TokioFile::open(path).await else { - error!("Unable to read file at {path}"); - panic!("Unable to read file at {path}"); - }; - info!("Found\t\t > {path}"); - - let mut buf: Vec = Vec::new(); - let Ok(_) = file.read_to_end(&mut buf).await else { - error!("Error reading file > {path}"); - panic!("Error reading file > {path}"); - }; - info!("Read\t\t > {path}"); - - let Ok(torrent) = serde_bencode::from_bytes(&buf) else { - error!("Error deserializing file > {path}"); - panic!("Error deserializing file > {path}"); - }; - info!("Parsed\t > {path}"); - - torrent - } -} - -impl Torrent { - /// Logs info about the *.torrent file - pub fn log_useful_information(&self) { - info!(""); - info!("--> Torrent Information <--"); - info!("Name:\t\t{}", self.info.name); - info!("Trackers"); - if let Some(trackers) = &self.announce_list { - for tracker in trackers { - info!(" |> {}", tracker[0]) - } - } - info!("InfoHash:\t{:X?}", self.get_info_hash()); - info!("Length:\t{:?}", self.info.length); - - info!("Files:"); - let Some(mut files) = self.info.files.clone() else { - info!("./{}", self.info.name); - return - }; - - files.sort_by(|a, b| a.path.len().cmp(&b.path.len()) ); - info!("./"); - for file in files { - if file.path.len() == 1 { - info!(" |--> {:?}", file.path); - } else { - let mut path = String::new(); - file.path.iter().for_each(|s| { path.push_str(s); path.push('/') } ); - path.pop(); - - info!(" |--> {}: {}B", path, file.length) - } - } - } - - /// Calculates the info hash of the torrent. - pub fn get_info_hash(&self) -> Vec { - let buf = serde_bencode::to_bytes(&self.info).unwrap(); - - let mut hasher = Sha1::new(); - hasher.update(buf); - let res = hasher.finalize(); - res[..].to_vec() - } - - /// Checks if a downloaded piece matches its hash. - /// - /// # Arguments - /// - /// * `piece` - The downloaded piece. - /// * `index` - The index of the piece. - /// - /// # Returns - /// - /// * `true` if the piece is correct, `false` otherwise. - pub fn check_piece(&self, piece: &[u8], index: u32) -> bool { - let mut hasher = Sha1::new(); - hasher.update(piece); - let result = hasher.finalize(); - - let piece_hash = &self.info.pieces[(index * 20) as usize..(index * 20 + 20) as usize]; - - if &result[..] == piece_hash { - info!("Piece {}/{} Correct!", index + 1, self.info.pieces.len() / 20); - true - } else { - error!("Piece {}/{} incorrect :(", index + 1, self.info.pieces.len() / 20); - debug!("{:?}", &result[..]); - debug!("{:?}", piece_hash); - debug!("{:?}", &result[..].len()); - debug!("{:?}", piece_hash.len()); - debug!("{}", piece.len()); - false - } - } - - pub fn get_total_length(&self) -> u64 { - if let Some(n) = self.info.length { - return n as u64 - }; - - if let Some(files) = &self.info.files { - let mut n = 0; - - for file in files { - n += file.length; - }; - - return n - }; - - 0 - } - - pub fn get_trackers(&self) -> Option> { - info!(""); - info!("--> Locating Trackers <--"); - - let mut addresses = vec![]; - - // This is the current regex as I haven't implemented support for http trackers yet - let re = Regex::new(r"^udp://([^:/]+):(\d+)/announce$").unwrap(); - - if let Some(url) = &self.announce { - if let Some(captures) = re.captures(url) { - let hostname = captures.get(1).unwrap().as_str(); - let port = captures.get(2).unwrap().as_str(); - - if let Ok(ip) = dns_lookup::lookup_host(hostname) { - for i in ip { - if let IpAddr::V4(j) = i { - addresses.push(SocketAddrV4::new(j, port.parse().unwrap())) - } - } - } - } else { - warn!("{url} does not match the expected url pattern"); - } - } - - if let Some(urls) = &self.announce_list { - for url in urls.iter() { - if let Some(captures) = re.captures(&url[0]) { - let hostname = captures.get(1).unwrap().as_str(); - let port = captures.get(2).unwrap().as_str(); - - if let Ok(ip) = dns_lookup::lookup_host(hostname) { - for i in ip { - if let IpAddr::V4(j) = i { - addresses.push(SocketAddrV4::new(j, port.parse().unwrap())); - } - } - info!("Sucessfully found tracker {}", url[0]); - } - } else { - warn!("{} does not match the expected url pattern", url[0]); - } - } - } - - if addresses.len() > 0 { - Some(addresses) - } else { - None - } - } -} \ No newline at end of file diff --git a/src/tracker/mod.rs b/src/tracker/mod.rs deleted file mode 100644 index d23c619..0000000 --- a/src/tracker/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod tracker; \ No newline at end of file