changed indentation

This commit is contained in:
Arlo Filley 2023-08-27 22:01:23 +01:00
parent 7fab25ea9f
commit 60dcd93377
7 changed files with 1086 additions and 1067 deletions

View File

@ -1,9 +1,9 @@
use log::debug; use log::debug;
use tokio::{ use tokio::{
fs::try_exists as dir_exists, fs::try_exists as dir_exists,
fs::create_dir as create_dir, fs::create_dir as create_dir,
fs::File, fs::File,
io::AsyncWriteExt io::AsyncWriteExt
}; };
use crate::torrent::Torrent; use crate::torrent::Torrent;
@ -11,11 +11,11 @@ use crate::torrent::Torrent;
/// Represents information about a file being downloaded. /// Represents information about a file being downloaded.
#[derive(Debug)] #[derive(Debug)]
struct FileInfo { struct FileInfo {
file: File, file: File,
length: u64, length: u64,
current_length: u64, current_length: u64,
name: String, name: String,
complete: bool complete: bool
} }
/// Represents a collection of files being downloaded. /// Represents a collection of files being downloaded.
@ -23,86 +23,86 @@ struct FileInfo {
pub struct Files(Vec<FileInfo>); pub struct Files(Vec<FileInfo>);
impl Files { impl Files {
/// Creates a new `Files` instance. /// Creates a new `Files` instance.
pub fn new() -> Self { pub fn new() -> Self {
Self(vec![]) Self(vec![])
} }
/// Creates the files in the local system for downloading. /// Creates the files in the local system for downloading.
/// ///
/// # Arguments /// # Arguments
/// ///
/// * `torrent` - The `Torrent` instance describing the torrent. /// * `torrent` - The `Torrent` instance describing the torrent.
/// * `download_path` - The path where the files will be downloaded. /// * `download_path` - The path where the files will be downloaded.
pub async fn create_files(&mut self, torrent: &Torrent, download_path: &str) { pub async fn create_files(&mut self, torrent: &Torrent, download_path: &str) {
match &torrent.info.files { match &torrent.info.files {
// Single File Mode // Single File Mode
None => { None => {
let path = &format!("{download_path}/{}", torrent.info.name); let path = &format!("{download_path}/{}", torrent.info.name);
let file = File::create(&path).await.unwrap(); let file = File::create(&path).await.unwrap();
let length = torrent.info.length.unwrap_or(0) as u64; let length = torrent.info.length.unwrap_or(0) as u64;
self.0.push(FileInfo { file, length, current_length: 0, name: path.to_string(), complete: false }) self.0.push(FileInfo { file, length, current_length: 0, name: path.to_string(), complete: false })
} }
// Multi File Mode // Multi File Mode
Some(files) => { Some(files) => {
for t_file in files { for t_file in files {
let mut path = download_path.to_string(); let mut path = download_path.to_string();
for dir in &t_file.path[..t_file.path.len() - 1] { for dir in &t_file.path[..t_file.path.len() - 1] {
path.push('/'); path.push('/');
path.push_str(dir); path.push_str(dir);
if !dir_exists(&path).await.unwrap() { if !dir_exists(&path).await.unwrap() {
debug!("Creating: {path}"); debug!("Creating: {path}");
create_dir(&path).await.unwrap(); create_dir(&path).await.unwrap();
}
}
path.push('/');
path.push_str(&t_file.path[t_file.path.len() - 1]);
debug!("Creating: {path}");
let file = File::create(&path).await.unwrap();
let length = t_file.length;
self.0.push(FileInfo { file, length, current_length: 0, name: path.to_string(), complete: false });
}
} }
}
path.push('/');
path.push_str(&t_file.path[t_file.path.len() - 1]);
debug!("Creating: {path}");
let file = File::create(&path).await.unwrap();
let length = t_file.length;
self.0.push(FileInfo { file, length, current_length: 0, name: path.to_string(), complete: false });
} }
}
} }
}
/// Writes a piece of data to the appropriate files.
/// /// Writes a piece of data to the appropriate files.
/// # Arguments ///
/// /// # Arguments
/// * `piece` - The piece of data to write. ///
pub async fn write_piece(&mut self, piece: Vec<u8>) { /// * `piece` - The piece of data to write.
let mut j = 0; pub async fn write_piece(&mut self, piece: Vec<u8>) {
let mut j = 0;
let mut piece_len = piece.len() as u64;
let file_iterator = self.0.iter_mut(); let mut piece_len = piece.len() as u64;
let file_iterator = self.0.iter_mut();
for file in file_iterator {
for file in file_iterator {
if file.complete { continue }
if file.complete { continue }
if file.current_length + piece_len > file.length {
let n = file.file.write(&piece[j..(file.length - file.current_length) as usize]).await.unwrap(); if file.current_length + piece_len > file.length {
debug!("Wrote {n}B > {}", file.name); let n = file.file.write(&piece[j..(file.length - file.current_length) as usize]).await.unwrap();
j = (file.length - file.current_length) as usize; debug!("Wrote {n}B > {}", file.name);
file.current_length += j as u64; j = (file.length - file.current_length) as usize;
piece_len -= j as u64; file.current_length += j as u64;
file.complete = true; piece_len -= j as u64;
} else { file.complete = true;
let n = file.file.write(&piece[j..]).await.unwrap(); } else {
debug!("Wrote {n}B > {}", file.name); let n = file.file.write(&piece[j..]).await.unwrap();
file.current_length += piece_len; debug!("Wrote {n}B > {}", file.name);
return file.current_length += piece_len;
} return
} }
} }
}
} }

View File

@ -3,103 +3,103 @@ use log::{ info, error };
/// Represents the handshake message that will be sent to a client. /// Represents the handshake message that will be sent to a client.
#[derive(Debug)] #[derive(Debug)]
pub struct Handshake { pub struct Handshake {
/// The length of the protocol name, must be 19 for "BitTorrent protocol". /// The length of the protocol name, must be 19 for "BitTorrent protocol".
p_str_len: u8, p_str_len: u8,
/// The protocol name, should always be "BitTorrent protocol". /// The protocol name, should always be "BitTorrent protocol".
p_str: String, p_str: String,
/// Reserved for extensions, currently unused. /// Reserved for extensions, currently unused.
reserved: [u8; 8], reserved: [u8; 8],
/// The infohash for the torrent. /// The infohash for the torrent.
info_hash: Vec<u8>, info_hash: Vec<u8>,
/// The identifier for the client. /// The identifier for the client.
pub peer_id: String, pub peer_id: String,
} }
impl Handshake { impl Handshake {
/// Creates a new handshake. /// Creates a new handshake.
/// ///
/// # Arguments /// # Arguments
/// ///
/// * `info_hash` - The infohash for the torrent. /// * `info_hash` - The infohash for the torrent.
/// ///
/// # Returns /// # Returns
/// ///
/// A new `Handshake` instance on success, or an empty `Result` indicating an error. /// A new `Handshake` instance on success, or an empty `Result` indicating an error.
pub fn new(info_hash: &[u8]) -> Option<Self> { pub fn new(info_hash: &[u8]) -> Option<Self> {
if info_hash.len() != 20 { if info_hash.len() != 20 {
error!("Incorrect infohash length, consider using the helper function in torrent"); error!("Incorrect infohash length, consider using the helper function in torrent");
return None; return None;
}
Some(Self {
p_str_len: 19,
p_str: String::from("BitTorrent protocol"),
reserved: [0; 8],
info_hash: info_hash.to_vec(),
peer_id: String::from("-MY0001-123456654322")
})
} }
/// Converts the `Handshake` instance to a byte buffer for sending to a peer. Some(Self {
/// p_str_len: 19,
/// # Returns p_str: String::from("BitTorrent protocol"),
/// reserved: [0; 8],
/// A byte vector containing the serialized handshake. info_hash: info_hash.to_vec(),
pub fn to_buffer(&self) -> Vec<u8> { peer_id: String::from("-MY0001-123456654322")
let mut buf: Vec<u8> = vec![0; 68]; })
}
buf[0] = self.p_str_len;
buf[1..20].copy_from_slice(&self.p_str.as_bytes()[..19]); /// Converts the `Handshake` instance to a byte buffer for sending to a peer.
buf[21..28].copy_from_slice(&self.reserved[..7]); ///
buf[28..48].copy_from_slice(&self.info_hash[..20]); /// # Returns
buf[48..68].copy_from_slice(&self.peer_id.as_bytes()[..20]); ///
/// A byte vector containing the serialized handshake.
buf pub fn to_buffer(&self) -> Vec<u8> {
let mut buf: Vec<u8> = vec![0; 68];
buf[0] = self.p_str_len;
buf[1..20].copy_from_slice(&self.p_str.as_bytes()[..19]);
buf[21..28].copy_from_slice(&self.reserved[..7]);
buf[28..48].copy_from_slice(&self.info_hash[..20]);
buf[48..68].copy_from_slice(&self.peer_id.as_bytes()[..20]);
buf
}
/// Converts a byte buffer to a `Handshake` instance.
///
/// # Arguments
///
/// * `buf` - A byte vector containing the serialized handshake.
///
/// # Returns
///
/// A new `Handshake` instance on success, or an empty `Result` indicating an error.
///
/// # Errors
///
/// Returns an error if the provided buffer is not long enough (at least 68 bytes).
pub fn from_buffer(buf: &Vec<u8>) -> Option<Self> {
// Verify that buffer is at least the correct size, if not error
if buf.len() < 68 {
error!("buffer provided to handshake was too short");
return None;
} }
/// Converts a byte buffer to a `Handshake` instance. let mut p_str = String::new();
/// for byte in buf.iter().take(20).skip(1) {
/// # Arguments p_str.push(*byte as char)
///
/// * `buf` - A byte vector containing the serialized handshake.
///
/// # Returns
///
/// A new `Handshake` instance on success, or an empty `Result` indicating an error.
///
/// # Errors
///
/// Returns an error if the provided buffer is not long enough (at least 68 bytes).
pub fn from_buffer(buf: &Vec<u8>) -> Option<Self> {
// Verify that buffer is at least the correct size, if not error
if buf.len() < 68 {
error!("buffer provided to handshake was too short");
return None;
}
let mut p_str = String::new();
for byte in buf.iter().take(20).skip(1) {
p_str.push(*byte as char)
}
let mut info_hash: Vec<u8> = vec![0; 20];
info_hash[..20].copy_from_slice(&buf[28..48]);
let mut peer_id = String::new();
for byte in buf.iter().take(68).skip(48) {
peer_id.push(*byte as char)
}
Some(Self {
p_str_len: buf[0],
p_str,
reserved: [0; 8],
info_hash,
peer_id
})
} }
pub fn log_useful_information(&self) { let mut info_hash: Vec<u8> = vec![0; 20];
info!("Connected - PeerId: {:?}", self.peer_id); info_hash[..20].copy_from_slice(&buf[28..48]);
let mut peer_id = String::new();
for byte in buf.iter().take(68).skip(48) {
peer_id.push(*byte as char)
} }
Some(Self {
p_str_len: buf[0],
p_str,
reserved: [0; 8],
info_hash,
peer_id
})
}
pub fn log_useful_information(&self) {
info!("Connected - PeerId: {:?}", self.peer_id);
}
} }

View File

@ -19,15 +19,15 @@ mod tracker;
// Crate Imports // Crate Imports
use crate::{ use crate::{
files::Files, files::Files,
peer::Peer, peer::Peer,
torrent::Torrent, torrent::Torrent,
tracker::{ tracker::{
AnnounceMessage, AnnounceMessageResponse, AnnounceMessage, AnnounceMessageResponse,
ConnectionMessage, ConnectionMessage,
FromBuffer, FromBuffer,
Tracker Tracker
} }
}; };
// External Ipmorts // External Ipmorts
@ -38,86 +38,86 @@ use log::{ debug, info, LevelFilter };
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)] #[command(author, version, about, long_about = None)]
struct Args { struct Args {
#[arg(short, long)] #[arg(short, long)]
log_file_path: Option<String>, log_file_path: Option<String>,
#[arg(short, long)] #[arg(short, long)]
torrent_file_path: String, torrent_file_path: String,
#[arg(short, long)] #[arg(short, long)]
download_path: String, download_path: String,
} }
/// The root function /// The root function
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let args = Args::parse(); let args = Args::parse();
// Creates a log file to handle large amounts of data // Creates a log file to handle large amounts of data
let log_path = args.log_file_path.unwrap_or(String::from("./log/rustytorrent.log")); let log_path = args.log_file_path.unwrap_or(String::from("./log/rustytorrent.log"));
simple_logging::log_to_file(&log_path, LevelFilter::Info).unwrap(); simple_logging::log_to_file(&log_path, LevelFilter::Info).unwrap();
// Read the Torrent File // Read the Torrent File
let torrent = Torrent::from_torrent_file(&args.torrent_file_path).await; let torrent = Torrent::from_torrent_file(&args.torrent_file_path).await;
info!("Sucessfully read torrent file"); info!("Sucessfully read torrent file");
torrent.log_useful_information(); torrent.log_useful_information();
// Create the files that will be written to // Create the files that will be written to
let mut files = Files::new(); let mut files = Files::new();
files.create_files(&torrent, &args.download_path).await; files.create_files(&torrent, &args.download_path).await;
// Gets peers from the given tracker // Gets peers from the given tracker
let (_remote_hostname, _remote_port) = torrent.get_tracker(); let (_remote_hostname, _remote_port) = torrent.get_tracker();
let (remote_hostname, remote_port) = ("tracker.opentrackr.org", 1337); let (remote_hostname, remote_port) = ("tracker.opentrackr.org", 1337);
debug!("{}:{}", remote_hostname, remote_port); debug!("{}:{}", remote_hostname, remote_port);
let mut tracker = Tracker::new("0.0.0.0:61389", remote_hostname, remote_port).await; let mut tracker = Tracker::new("0.0.0.0:61389", remote_hostname, remote_port).await;
info!("Successfully connected to tracker {}:{}", remote_hostname, remote_port); info!("Successfully connected to tracker {}:{}", remote_hostname, remote_port);
let connection_message = ConnectionMessage::from_buffer( let connection_message = ConnectionMessage::from_buffer(
&tracker.send_message(&ConnectionMessage::create_basic_connection()).await &tracker.send_message(&ConnectionMessage::create_basic_connection()).await
); );
debug!("{:?}", connection_message); debug!("{:?}", connection_message);
let announce_message_response = AnnounceMessageResponse::from_buffer( let announce_message_response = AnnounceMessageResponse::from_buffer(
&tracker.send_message(&AnnounceMessage::new( &tracker.send_message(&AnnounceMessage::new(
connection_message.connection_id, connection_message.connection_id,
&torrent.get_info_hash(), &torrent.get_info_hash(),
"-MY0001-123456654321", "-MY0001-123456654321",
torrent.get_total_length() as i64 torrent.get_total_length() as i64
)).await )).await
); );
debug!("{:?}", announce_message_response); debug!("{:?}", announce_message_response);
info!("Found Peers"); info!("Found Peers");
// Creates an assumed peer connection to the `SocketAddr` given
let mut peer = match Peer::create_connection(&format!("{}:{}", announce_message_response.ips[0], announce_message_response.ports[0])).await {
None => { return },
Some(peer) => peer
};
let num_pieces = torrent.info.pieces.len() / 20;
peer.handshake(&torrent).await;
peer.keep_alive_until_unchoke().await;
info!("Successfully Created Connection with peer: {}", peer.peer_id);
let mut len = 0;
for index in 0..num_pieces {
let piece= peer.request_piece(
index as u32, torrent.info.piece_length as u32,
&mut len, torrent.get_total_length() as u32
).await;
// Creates an assumed peer connection to the `SocketAddr` given if torrent.check_piece(&piece, index as u32) {
let mut peer = match Peer::create_connection(&format!("{}:{}", announce_message_response.ips[0], announce_message_response.ports[0])).await { files.write_piece(piece).await;
None => { return }, } else {
Some(peer) => peer break
};
let num_pieces = torrent.info.pieces.len() / 20;
peer.handshake(&torrent).await;
peer.keep_alive_until_unchoke().await;
info!("Successfully Created Connection with peer: {}", peer.peer_id);
let mut len = 0;
for index in 0..num_pieces {
let piece= peer.request_piece(
index as u32, torrent.info.piece_length as u32,
&mut len, torrent.get_total_length() as u32
).await;
if torrent.check_piece(&piece, index as u32) {
files.write_piece(piece).await;
} else {
break
}
} }
}
peer.disconnect().await;
info!("Successfully completed download"); peer.disconnect().await;
info!("Successfully completed download");
} }

View File

@ -1,252 +1,258 @@
use std::vec; use std::vec;
use log::error; use log::{error, debug};
/// Represents a message in the BitTorrent protocol. /// Represents a message in the BitTorrent protocol.
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
pub struct Message { pub struct Message {
/// The length of the message, including the type and payload. /// The length of the message, including the type and payload.
pub message_length: u32, pub message_length: u32,
/// The type of message. /// The type of message.
pub message_type: MessageType, pub message_type: MessageType,
/// The payload of the message, if any. /// The payload of the message, if any.
pub payload: Option<Vec<u8>>, pub payload: Option<Vec<u8>>,
} }
pub trait ToBuffer { pub trait ToBuffer {
fn to_buffer(self) -> Vec<u8>; fn to_buffer(self) -> Vec<u8>;
} }
pub trait FromBuffer { pub trait FromBuffer {
fn from_buffer(buf: &[u8]) -> Self; fn from_buffer(buf: &[u8]) -> Self;
} }
impl Message { impl Message {
/// Creates a new message. /// Creates a new message.
/// ///
/// # Arguments /// # Arguments
/// ///
/// * `message_length` - The length of the message. /// * `message_length` - The length of the message.
/// * `message_type` - The type of message. /// * `message_type` - The type of message.
/// * `payload` - The payload of the message, if any. /// * `payload` - The payload of the message, if any.
pub fn new(message_length: u32, message_type: MessageType, payload: Option<Vec<u8>>) -> Self { pub fn new(message_length: u32, message_type: MessageType, payload: Option<Vec<u8>>) -> Self {
Self { message_length, message_type, payload } Self { message_length, message_type, payload }
} }
} }
impl FromBuffer for Message { impl FromBuffer for Message {
/// Decodes a message from a given buffer. /// Decodes a message from a given buffer.
/// ///
/// # Arguments /// # Arguments
/// ///
/// * `buf` - The byte buffer containing the serialized message. /// * `buf` - The byte buffer containing the serialized message.
/// ///
/// # Returns /// # Returns
/// ///
/// A new `Message` instance on success, or an empty `Result` indicating an error. /// A new `Message` instance on success, or an empty `Result` indicating an error.
fn from_buffer(buf: &[u8]) -> Self { fn from_buffer(buf: &[u8]) -> Self {
let mut message_length: [u8; 4] = [0; 4]; let mut message_length: [u8; 4] = [0; 4];
message_length[..4].copy_from_slice(&buf[..4]); message_length[..4].copy_from_slice(&buf[..4]);
let message_length = u32::from_be_bytes(message_length); let message_length = u32::from_be_bytes(message_length);
let payload: Option<Vec<u8>>; let payload: Option<Vec<u8>>;
let message_type: MessageType; let message_type: MessageType;
if message_length == 0 { if message_length == 0 {
message_type = MessageType::KeepAlive; message_type = MessageType::KeepAlive;
payload = None; payload = None;
} else if message_length == 5 { } else if message_length == 5 {
message_type = buf[4].into(); message_type = buf[4].into();
payload = None; payload = None;
} else { } else {
message_type = buf[4].into(); message_type = buf[4].into();
let end_of_message = 4 + message_length as usize; let end_of_message = 4 + message_length as usize;
payload = Some(buf[5..end_of_message].to_vec());
} if end_of_message > buf.len() {
error!("index too long");
Self { debug!("{buf:?}");
message_length, }
message_type,
payload payload = Some(buf[5..end_of_message].to_vec());
}
} }
Self {
message_length,
message_type,
payload
}
}
} }
impl ToBuffer for Message { impl ToBuffer for Message {
/// Converts the `Message` instance to a byte buffer for sending. /// Converts the `Message` instance to a byte buffer for sending.
/// ///
/// # Returns /// # Returns
/// ///
/// A byte vector containing the serialized message. /// A byte vector containing the serialized message.
fn to_buffer(self) -> Vec<u8> { fn to_buffer(self) -> Vec<u8> {
let mut buf: Vec<u8> = vec![]; let mut buf: Vec<u8> = vec![];
for byte in self.message_length.to_be_bytes() {
buf.push(byte);
}
match self.message_type {
MessageType::KeepAlive => {
return buf
},
MessageType::Choke | MessageType::Unchoke | MessageType::Interested | MessageType::NotInterested => {
buf.push(self.message_type.into());
return buf;
},
MessageType::Have | MessageType::Bitfield | MessageType::Request | MessageType::Piece | MessageType::Cancel | MessageType::Port => {
buf.push(self.message_type.into());
},
MessageType::Error => {
panic!("Error making message into buffer")
}
}
match self.payload {
None => { panic!("Error you are trying to create a message that needs a payload with no payload") }
Some(payload) => {
buf.extend(payload);
}
}
buf
}
}
for byte in self.message_length.to_be_bytes() {
buf.push(byte);
}
match self.message_type {
MessageType::KeepAlive => {
return buf
},
MessageType::Choke | MessageType::Unchoke | MessageType::Interested | MessageType::NotInterested => {
buf.push(self.message_type.into());
return buf;
},
MessageType::Have | MessageType::Bitfield | MessageType::Request | MessageType::Piece | MessageType::Cancel | MessageType::Port => {
buf.push(self.message_type.into());
},
MessageType::Error => {
panic!("Error making message into buffer")
}
}
match self.payload {
None => { panic!("Error you are trying to create a message that needs a payload with no payload") }
Some(payload) => {
buf.extend(payload);
}
}
buf
}
}
impl Message { impl Message {
/// Create a request message from a given piece_index, offset, and length /// Create a request message from a given piece_index, offset, and length
/// ///
/// # Arguments /// # Arguments
/// ///
/// * `piece_index` - The index of the piece in the torrent /// * `piece_index` - The index of the piece in the torrent
/// * `offset` - The offset within the piece, because requests should be no more than 16KiB /// * `offset` - The offset within the piece, because requests should be no more than 16KiB
/// * `length` - The length of the piece request, should be 16KiB /// * `length` - The length of the piece request, should be 16KiB
/// ///
/// # Returns /// # Returns
/// ///
/// A piece request message /// A piece request message
pub fn create_request(piece_index: u32, offset: u32, length: u32) -> Self { pub fn create_request(piece_index: u32, offset: u32, length: u32) -> Self {
let mut payload: Vec<u8> = vec![]; let mut payload: Vec<u8> = vec![];
for byte in piece_index.to_be_bytes() { for byte in piece_index.to_be_bytes() {
payload.push(byte); payload.push(byte);
}
for byte in offset.to_be_bytes() {
payload.push(byte)
}
for byte in length.to_be_bytes() {
payload.push(byte)
}
Self { message_length: 13, message_type: MessageType::Request, payload: Some(payload) }
} }
/// Returns the number of messages in the given buffer and their contents. for byte in offset.to_be_bytes() {
/// payload.push(byte)
/// # Arguments
///
/// * `buf` - The byte buffer containing multiple serialized messages.
///
/// # Returns
///
/// A tuple containing a vector of message byte buffers and the number of messages.
pub fn number_of_messages(buf: &[u8]) -> (Vec<Vec<u8>>, u32) {
let mut message_num = 0;
let mut messages: Vec<Vec<u8>> = vec![];
// Find the length of message one
// put that into an array and increment counter by one
let mut i = 0; // points to the front
let mut j; // points to the back
loop {
j = u32::from_be_bytes([buf[i], buf[i + 1], buf[i + 2], buf[i + 3]]) as usize + 4;
messages.push(buf[i..i+j].to_vec());
i += j;
message_num += 1;
if buf[i] == 0 && buf[i + 1] == 0 && buf[i + 2] == 0 && buf[i + 3] == 0 {
break;
}
}
(messages, message_num)
} }
for byte in length.to_be_bytes() {
payload.push(byte)
}
Self { message_length: 13, message_type: MessageType::Request, payload: Some(payload) }
}
/// Returns the number of messages in the given buffer and their contents.
///
/// # Arguments
///
/// * `buf` - The byte buffer containing multiple serialized messages.
///
/// # Returns
///
/// A tuple containing a vector of message byte buffers and the number of messages.
pub fn number_of_messages(buf: &[u8]) -> (Vec<Vec<u8>>, u32) {
let mut message_num = 0;
let mut messages: Vec<Vec<u8>> = vec![];
// Find the length of message one
// put that into an array and increment counter by one
let mut i = 0; // points to the front
let mut j; // points to the back
loop {
j = u32::from_be_bytes([buf[i], buf[i + 1], buf[i + 2], buf[i + 3]]) as usize + 4;
messages.push(buf[i..i+j].to_vec());
i += j;
message_num += 1;
if buf[i] == 0 && buf[i + 1] == 0 && buf[i + 2] == 0 && buf[i + 3] == 0 {
break;
}
}
(messages, message_num)
}
} }
/// An enum representing all possible message types in the BitTorrent peer wire protocol. /// An enum representing all possible message types in the BitTorrent peer wire protocol.
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
#[repr(u8)] #[repr(u8)]
pub enum MessageType { pub enum MessageType {
/// Keepalive message, 0 length. /// Keepalive message, 0 length.
/// Potential Errors if trying to handle a keepalive message like another message. /// Potential Errors if trying to handle a keepalive message like another message.
/// Due to length being 0, should always be explicitly handled. /// Due to length being 0, should always be explicitly handled.
KeepAlive = u8::MAX, KeepAlive = u8::MAX,
/// Message telling the client not to send any requests until the peer has unchoked, 1 length. /// Message telling the client not to send any requests until the peer has unchoked, 1 length.
Choke = 0, Choke = 0,
/// Message telling the client that it can send requests, 1 length. /// Message telling the client that it can send requests, 1 length.
Unchoke = 1, Unchoke = 1,
/// Message indicating that the peer is still interested in downloading, 1 length. /// Message indicating that the peer is still interested in downloading, 1 length.
Interested = 2, Interested = 2,
/// Message indicating that the peer is not interested in downloading, 1 length. /// Message indicating that the peer is not interested in downloading, 1 length.
NotInterested = 3, NotInterested = 3,
/// Message indicating that the peer has a given piece, fixed length. /// Message indicating that the peer has a given piece, fixed length.
Have = 4, Have = 4,
/// Message sent after a handshake, represents the pieces that the peer has. /// Message sent after a handshake, represents the pieces that the peer has.
Bitfield = 5, Bitfield = 5,
/// Request a given part of a piece based on index, offset, and length, 13 length. /// Request a given part of a piece based on index, offset, and length, 13 length.
Request = 6, Request = 6,
/// A response to a request with the accompanying data, varying length. /// A response to a request with the accompanying data, varying length.
Piece = 7, Piece = 7,
/// Cancels a request, 13 length. /// Cancels a request, 13 length.
Cancel = 8, Cancel = 8,
/// Placeholder for unimplemented message type. /// Placeholder for unimplemented message type.
Port = 9, Port = 9,
Error Error
} }
impl From<u8> for MessageType { impl From<u8> for MessageType {
fn from(val: u8) -> MessageType { fn from(val: u8) -> MessageType {
match val { match val {
0 => MessageType::Choke, 0 => MessageType::Choke,
1 => MessageType::Unchoke, 1 => MessageType::Unchoke,
2 => MessageType::Interested, 2 => MessageType::Interested,
3 => MessageType::NotInterested, 3 => MessageType::NotInterested,
4 => MessageType::Have, 4 => MessageType::Have,
5 => MessageType::Bitfield, 5 => MessageType::Bitfield,
6 => MessageType::Request, 6 => MessageType::Request,
7 => MessageType::Piece, 7 => MessageType::Piece,
8 => MessageType::Cancel, 8 => MessageType::Cancel,
9 => MessageType::Port, 9 => MessageType::Port,
_ => { _ => {
error!("Invalid Message Type: {}", val); error!("Invalid Message Type: {}", val);
MessageType::Error MessageType::Error
} }
}
} }
}
} }
impl From<MessageType> for u8 { impl From<MessageType> for u8 {
fn from(val: MessageType) -> u8 { fn from(val: MessageType) -> u8 {
match val { match val {
MessageType::Choke => 0, MessageType::Choke => 0,
MessageType::Unchoke => 1, MessageType::Unchoke => 1,
MessageType::Interested => 2, MessageType::Interested => 2,
MessageType::NotInterested => 3, MessageType::NotInterested => 3,
MessageType::Have => 4, MessageType::Have => 4,
MessageType::Bitfield => 5, MessageType::Bitfield => 5,
MessageType::Request => 6, MessageType::Request => 6,
MessageType::Piece => 7, MessageType::Piece => 7,
MessageType::Cancel => 8, MessageType::Cancel => 8,
MessageType::Port => 9, MessageType::Port => 9,
_ => { _ => {
error!("Invalid Message Type: {:?}", val); error!("Invalid Message Type: {:?}", val);
u8::MAX u8::MAX
} }
}
} }
}
} }

View File

@ -2,214 +2,214 @@
// Crate Imports // Crate Imports
use crate::{ use crate::{
handshake::Handshake, handshake::Handshake,
message::{ FromBuffer, Message, MessageType, ToBuffer }, message::{ FromBuffer, Message, MessageType, ToBuffer },
torrent::Torrent torrent::Torrent
}; };
// External imports // External imports
use log::{ debug, error }; use log::{ debug, error };
use std::net::SocketAddr; use std::net::SocketAddr;
use tokio::{ use tokio::{
io::{ AsyncReadExt, AsyncWriteExt }, io::{ AsyncReadExt, AsyncWriteExt },
net::TcpStream net::TcpStream
}; };
/// Structure to abstract interaction with a peer. /// Structure to abstract interaction with a peer.
pub struct Peer { pub struct Peer {
/// The `TcpStream` that is used to communicate with the peeer /// The `TcpStream` that is used to communicate with the peeer
connection_stream: TcpStream, connection_stream: TcpStream,
/// The `SocketAddr` of the peer /// The `SocketAddr` of the peer
pub socket_addr: SocketAddr, pub socket_addr: SocketAddr,
/// The id of the peer /// The id of the peer
pub peer_id: String, pub peer_id: String,
/// Whether the peer is choking the client /// Whether the peer is choking the client
pub choking: bool, pub choking: bool,
} }
impl Peer { impl Peer {
/// Creates a connection to the peer. /// Creates a connection to the peer.
/// ///
/// # Arguments /// # Arguments
/// ///
/// * `socket_address` - The socket address of the peer. /// * `socket_address` - The socket address of the peer.
pub async fn create_connection(socket_address: &str) -> Option<Self> { pub async fn create_connection(socket_address: &str) -> Option<Self> {
let socket_addr = match socket_address.parse::<SocketAddr>() { let socket_addr = match socket_address.parse::<SocketAddr>() {
Err(err) => { Err(err) => {
error!("error parsing address {}, err: {}", socket_address, err); error!("error parsing address {}, err: {}", socket_address, err);
return None; return None;
} }
Ok(addr) => { addr } Ok(addr) => { addr }
}; };
let connection_stream = match TcpStream::connect(socket_addr).await { let connection_stream = match TcpStream::connect(socket_addr).await {
Err(err) => { Err(err) => {
error!("unable to connect to {}, err: {}", socket_address, err); error!("unable to connect to {}, err: {}", socket_address, err);
return None return None
}, },
Ok(stream) => { Ok(stream) => {
debug!("created tcpstream successfully to: {socket_addr}"); debug!("created tcpstream successfully to: {socket_addr}");
stream stream
} }
}; };
Some(Self { Some(Self {
connection_stream, connection_stream,
socket_addr, socket_addr,
peer_id: String::new(), peer_id: String::new(),
choking: true, choking: true,
}) })
} }
/// Sends a handshake message to the peer, the first step in the peer wire messaging protocol. /// Sends a handshake message to the peer, the first step in the peer wire messaging protocol.
/// ///
/// # Arguments /// # Arguments
/// ///
/// * `torrent` - The `Torrent` instance associated with the peer. /// * `torrent` - The `Torrent` instance associated with the peer.
pub async fn handshake(&mut self, torrent: &Torrent) { pub async fn handshake(&mut self, torrent: &Torrent) {
let mut buf = vec![0; 1024]; let mut buf = vec![0; 1024];
let handshake_message = Handshake::new(&torrent.get_info_hash()).unwrap(); let handshake_message = Handshake::new(&torrent.get_info_hash()).unwrap();
self.connection_stream.writable().await.unwrap(); self.connection_stream.writable().await.unwrap();
self.connection_stream.write_all(&handshake_message.to_buffer()).await.unwrap(); self.connection_stream.write_all(&handshake_message.to_buffer()).await.unwrap();
self.connection_stream.readable().await.unwrap(); self.connection_stream.readable().await.unwrap();
let _ = self.connection_stream.read(&mut buf).await.unwrap(); let _ = self.connection_stream.read(&mut buf).await.unwrap();
let handshake = Handshake::from_buffer(&buf[..68].to_vec()).unwrap(); let handshake = Handshake::from_buffer(&buf[..68].to_vec()).unwrap();
handshake.log_useful_information(); handshake.log_useful_information();
for message_buf in Message::number_of_messages(&buf[68..]).0 { for message_buf in Message::number_of_messages(&buf[68..]).0 {
let message = Message::from_buffer(&message_buf); let message = Message::from_buffer(&message_buf);
if message.message_type == MessageType::Unchoke { if message.message_type == MessageType::Unchoke {
self.choking = false; self.choking = false;
} }
}
self.peer_id = handshake.peer_id;
}
/// Keeps the connection alive and sends interested messages until the peer unchokes
pub async fn keep_alive_until_unchoke(&mut self) {
loop {
let message = self.read_message().await;
debug!("{message:?}");
match message.message_type {
MessageType::Unchoke => {
self.choking = false;
break
}
MessageType::KeepAlive => {
self.send_message_no_response(Message::new(0, MessageType::KeepAlive, None)).await;
self.send_message_no_response(Message::new(1, MessageType::Interested, None)).await;
}
MessageType::Choke => {
self.choking = true;
}
_ => { continue }
}
}
}
/// Sends a message to the peer and waits for a response, which it returns
pub async fn send_message(&mut self, message: Message) -> Message {
let mut buf = vec![0; 16_397];
self.connection_stream.writable().await.unwrap();
self.connection_stream.write_all(&message.to_buffer()).await.unwrap();
self.connection_stream.readable().await.unwrap();
let _ = self.connection_stream.read_exact(&mut buf).await.unwrap();
Message::from_buffer(&buf)
}
/// Sends a message to the peer and waits for a response, which it returns
pub async fn send_message_exact_size_response(&mut self, message: Message, size: usize) -> Message {
let mut buf = vec![0; size];
self.connection_stream.writable().await.unwrap();
self.connection_stream.write_all(&message.to_buffer()).await.unwrap();
self.connection_stream.readable().await.unwrap();
let _ = self.connection_stream.read_exact(&mut buf).await.unwrap();
Message::from_buffer(&buf)
}
/// Sends a message but doesn't wait for a response
pub async fn send_message_no_response(&mut self, message: Message) {
self.connection_stream.writable().await.unwrap();
self.connection_stream.write_all(&message.to_buffer()).await.unwrap();
} }
/// reads a message from the peer self.peer_id = handshake.peer_id;
pub async fn read_message(&mut self) -> Message { }
let mut buf = vec![0; 16_397];
/// Keeps the connection alive and sends interested messages until the peer unchokes
self.connection_stream.readable().await.unwrap(); pub async fn keep_alive_until_unchoke(&mut self) {
let _ = self.connection_stream.read(&mut buf).await.unwrap(); loop {
let message = self.read_message().await;
Message::from_buffer(&buf)
} debug!("{message:?}");
match message.message_type {
/// Shutsdown the connection stream MessageType::Unchoke => {
pub async fn disconnect(&mut self) { self.choking = false;
match self.connection_stream.shutdown().await { break
Err(err) => {
error!("Error disconnecting from {}: {}", self.socket_addr, err);
panic!("Error disconnecting from {}: {}", self.socket_addr, err);
},
Ok(_) => {
debug!("Successfully disconnected from {}", self.socket_addr)
}
} }
MessageType::KeepAlive => {
self.send_message_no_response(Message::new(0, MessageType::KeepAlive, None)).await;
self.send_message_no_response(Message::new(1, MessageType::Interested, None)).await;
}
MessageType::Choke => {
self.choking = true;
}
_ => { continue }
}
} }
}
/// Sends a message to the peer and waits for a response, which it returns
pub async fn send_message(&mut self, message: Message) -> Message {
let mut buf = vec![0; 16_397];
self.connection_stream.writable().await.unwrap();
self.connection_stream.write_all(&message.to_buffer()).await.unwrap();
self.connection_stream.readable().await.unwrap();
let _ = self.connection_stream.read_exact(&mut buf).await.unwrap();
Message::from_buffer(&buf)
}
/// Sends a message to the peer and waits for a response, which it returns
pub async fn send_message_exact_size_response(&mut self, message: Message, size: usize) -> Message {
let mut buf = vec![0; size];
self.connection_stream.writable().await.unwrap();
self.connection_stream.write_all(&message.to_buffer()).await.unwrap();
self.connection_stream.readable().await.unwrap();
let _ = self.connection_stream.read_exact(&mut buf).await.unwrap();
Message::from_buffer(&buf)
}
/// Sends a message but doesn't wait for a response
pub async fn send_message_no_response(&mut self, message: Message) {
self.connection_stream.writable().await.unwrap();
self.connection_stream.write_all(&message.to_buffer()).await.unwrap();
}
/// reads a message from the peer
pub async fn read_message(&mut self) -> Message {
let mut buf = vec![0; 16_397];
self.connection_stream.readable().await.unwrap();
let _ = self.connection_stream.read(&mut buf).await.unwrap();
Message::from_buffer(&buf)
}
/// Shutsdown the connection stream
pub async fn disconnect(&mut self) {
match self.connection_stream.shutdown().await {
Err(err) => {
error!("Error disconnecting from {}: {}", self.socket_addr, err);
panic!("Error disconnecting from {}: {}", self.socket_addr, err);
},
Ok(_) => {
debug!("Successfully disconnected from {}", self.socket_addr)
}
}
}
} }
impl Peer { impl Peer {
// Sends the requests and reads responses to put a piece together // Sends the requests and reads responses to put a piece together
pub async fn request_piece(&mut self, index: u32, piece_length: u32, len: &mut u32, total_len: u32) -> Vec<u8> { pub async fn request_piece(&mut self, index: u32, piece_length: u32, len: &mut u32, total_len: u32) -> Vec<u8> {
let mut buf = vec![]; let mut buf = vec![];
// Sequentially requests piece from the peer // Sequentially requests piece from the peer
for offset in (0..piece_length).step_by(16_384) { for offset in (0..piece_length).step_by(16_384) {
let mut length = 16_384; let mut length = 16_384;
let response: Message; let response: Message;
if *len + 16_384 >= total_len { if *len + 16_384 >= total_len {
debug!("Final Request {}", total_len - *len); debug!("Final Request {}", total_len - *len);
length = total_len - *len; length = total_len - *len;
response = self.send_message_exact_size_response( response = self.send_message_exact_size_response(
Message::create_request(index, offset, length), Message::create_request(index, offset, length),
length as usize + 13 length as usize + 13
).await; ).await;
} else { } else {
response = self.send_message(Message::create_request(index, offset, length)).await; response = self.send_message(Message::create_request(index, offset, length)).await;
}; };
match response.message_type { match response.message_type {
MessageType::Piece => { MessageType::Piece => {
let mut data = response.payload.unwrap(); let mut data = response.payload.unwrap();
*len += data.len() as u32; *len += data.len() as u32;
*len -= 8; *len -= 8;
for byte in data.drain(..).skip(8) { for byte in data.drain(..).skip(8) {
buf.push(byte) buf.push(byte)
} }
}, },
_ => { debug!("didn't recieve expected piece request | Recieved: {:?}", response.message_type); } _ => { debug!("didn't recieve expected piece request | Recieved: {:?}", response.message_type); }
}; };
if *len >= total_len - 1 { if *len >= total_len - 1 {
return buf; return buf;
} }
}
buf
} }
buf
}
} }

View File

@ -11,203 +11,203 @@ struct Node(String, i64);
/// Represents a file described in a torrent. /// Represents a file described in a torrent.
#[derive(Clone, Debug, Deserialize, Serialize)] #[derive(Clone, Debug, Deserialize, Serialize)]
pub struct File { pub struct File {
pub path: Vec<String>, pub path: Vec<String>,
pub length: u64, pub length: u64,
#[serde(default)] #[serde(default)]
md5sum: Option<String>, md5sum: Option<String>,
} }
/// Represents the metadata of a torrent. /// Represents the metadata of a torrent.
#[derive(Clone, Debug, Deserialize, Serialize)] #[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Info { pub struct Info {
pub name: String, pub name: String,
#[serde(with = "serde_bytes")] #[serde(with = "serde_bytes")]
pub pieces: Vec<u8>, pub pieces: Vec<u8>,
#[serde(rename = "piece length")] #[serde(rename = "piece length")]
pub piece_length: u64, pub piece_length: u64,
#[serde(default)] #[serde(default)]
md5sum: Option<String>, md5sum: Option<String>,
#[serde(default)] #[serde(default)]
pub length: Option<i64>, pub length: Option<i64>,
#[serde(default)] #[serde(default)]
pub files: Option<Vec<File>>, pub files: Option<Vec<File>>,
#[serde(default)] #[serde(default)]
private: Option<u8>, private: Option<u8>,
#[serde(default)] #[serde(default)]
path: Option<Vec<String>>, path: Option<Vec<String>>,
#[serde(default)] #[serde(default)]
#[serde(rename = "root hash")] #[serde(rename = "root hash")]
root_hash: Option<String>, root_hash: Option<String>,
} }
/// Represents a torrent. /// Represents a torrent.
#[derive(Clone, Debug, Deserialize, Serialize)] #[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Torrent { pub struct Torrent {
pub info: Info, pub info: Info,
#[serde(default)] #[serde(default)]
pub announce: Option<String>, pub announce: Option<String>,
#[serde(default)] #[serde(default)]
nodes: Option<Vec<Node>>, nodes: Option<Vec<Node>>,
#[serde(default)] #[serde(default)]
encoding: Option<String>, encoding: Option<String>,
#[serde(default)] #[serde(default)]
httpseeds: Option<Vec<String>>, httpseeds: Option<Vec<String>>,
#[serde(default)] #[serde(default)]
#[serde(rename = "announce-list")] #[serde(rename = "announce-list")]
announce_list: Option<Vec<Vec<String>>>, announce_list: Option<Vec<Vec<String>>>,
#[serde(default)] #[serde(default)]
#[serde(rename = "creation date")] #[serde(rename = "creation date")]
creation_date: Option<i64>, creation_date: Option<i64>,
#[serde(rename = "comment")] #[serde(rename = "comment")]
comment: Option<String>, comment: Option<String>,
#[serde(default)] #[serde(default)]
#[serde(rename = "created by")] #[serde(rename = "created by")]
created_by: Option<String> created_by: Option<String>
} }
impl Torrent { impl Torrent {
/// Reads a `.torrent` file and converts it into a `Torrent` struct. /// Reads a `.torrent` file and converts it into a `Torrent` struct.
/// ///
/// # Arguments /// # Arguments
/// ///
/// * `path` - The path to the `.torrent` file. /// * `path` - The path to the `.torrent` file.
pub async fn from_torrent_file(path: &str) -> Self { pub async fn from_torrent_file(path: &str) -> Self {
// Read .torrent File // Read .torrent File
let mut file = TokioFile::open(path).await.unwrap(); let mut file = TokioFile::open(path).await.unwrap();
let mut buf: Vec<u8> = Vec::new(); let mut buf: Vec<u8> = Vec::new();
match file.read_to_end(&mut buf).await { match file.read_to_end(&mut buf).await {
Err(err) => { Err(err) => {
error!("Error reading file till end: {err}"); error!("Error reading file till end: {err}");
panic!("Error reading file till end: {err}"); panic!("Error reading file till end: {err}");
} }
Ok(_) => { info!("Succesfully read {path}") } Ok(_) => { info!("Succesfully read {path}") }
}
match serde_bencode::from_bytes(&buf) {
Err(err) => {
error!("Error deserializing file: {err}");
panic!("Error deserializing file: {err}");
}
Ok(torrent) => { torrent }
}
} }
/// Logs info about the *.torrent file match serde_bencode::from_bytes(&buf) {
pub fn log_useful_information(&self) { Err(err) => {
info!(" -> Torrent Information <- "); error!("Error deserializing file: {err}");
info!("Name: {}", self.info.name); panic!("Error deserializing file: {err}");
info!("Tracker: {:?}", self.announce); }
info!("Tracker List: {:?}", self.announce_list); Ok(torrent) => { torrent }
info!("Info Hash: {:X?}", self.get_info_hash()); }
info!("Length: {:?}", self.info.length); }
/// Logs info about the *.torrent file
pub fn log_useful_information(&self) {
info!(" -> Torrent Information <- ");
info!("Name: {}", self.info.name);
info!("Tracker: {:?}", self.announce);
info!("Tracker List: {:?}", self.announce_list);
info!("Info Hash: {:X?}", self.get_info_hash());
info!("Length: {:?}", self.info.length);
info!("Files:");
match &self.info.files {
None => {
info!("> {}", self.info.name);
}
Some(files) => {
for file in files {
let mut path = String::new();
for section in &file.path {
path.push_str(&format!("{section}/"));
}
path.pop();
info!("> {}: {}B", path, file.length)
}
}
}
}
/// Calculates the info hash of the torrent.
pub fn get_info_hash(&self) -> Vec<u8> {
let buf = serde_bencode::to_bytes(&self.info).unwrap();
let mut hasher = Sha1::new();
hasher.update(buf);
let res = hasher.finalize();
res[..].to_vec()
}
/// Checks if a downloaded piece matches its hash.
///
/// # Arguments
///
/// * `piece` - The downloaded piece.
/// * `index` - The index of the piece.
///
/// # Returns
///
/// * `true` if the piece is correct, `false` otherwise.
pub fn check_piece(&self, piece: &[u8], index: u32) -> bool {
let mut hasher = Sha1::new();
hasher.update(piece);
let result = hasher.finalize();
let piece_hash = &self.info.pieces[(index * 20) as usize..(index * 20 + 20) as usize];
if &result[..] == piece_hash {
info!("Downloaded Piece {}/{} Correct!, Piece Was: {}B long", index + 1, self.info.pieces.len() / 20, piece.len(),);
true
} else {
debug!("{:?}", &result[..]);
debug!("{:?}", piece_hash);
debug!("{:?}", &result[..].len());
debug!("{:?}", piece_hash.len());
debug!("{}", piece.len());
error!("Piece downloaded incorrectly");
false
}
}
pub fn get_total_length(&self) -> u64 {
match self.info.length {
None => {},
Some(n) => { return n as u64 }
};
match &self.info.files {
None => { 0 },
Some(files) => {
let mut n = 0;
info!("Files:"); for file in files {
n += file.length;
match &self.info.files {
None => {
info!("> {}", self.info.name);
}
Some(files) => {
for file in files {
let mut path = String::new();
for section in &file.path {
path.push_str(&format!("{section}/"));
}
path.pop();
info!("> {}: {}B", path, file.length)
}
}
}
}
/// Calculates the info hash of the torrent.
pub fn get_info_hash(&self) -> Vec<u8> {
let buf = serde_bencode::to_bytes(&self.info).unwrap();
let mut hasher = Sha1::new();
hasher.update(buf);
let res = hasher.finalize();
res[..].to_vec()
}
/// Checks if a downloaded piece matches its hash.
///
/// # Arguments
///
/// * `piece` - The downloaded piece.
/// * `index` - The index of the piece.
///
/// # Returns
///
/// * `true` if the piece is correct, `false` otherwise.
pub fn check_piece(&self, piece: &[u8], index: u32) -> bool {
let mut hasher = Sha1::new();
hasher.update(piece);
let result = hasher.finalize();
let piece_hash = &self.info.pieces[(index * 20) as usize..(index * 20 + 20) as usize];
if &result[..] == piece_hash {
info!("Downloaded Piece {}/{} Correct!, Piece Was: {}B long", index + 1, self.info.pieces.len() / 20, piece.len(),);
true
} else {
debug!("{:?}", &result[..]);
debug!("{:?}", piece_hash);
debug!("{:?}", &result[..].len());
debug!("{:?}", piece_hash.len());
debug!("{}", piece.len());
error!("Piece downloaded incorrectly");
false
}
}
pub fn get_total_length(&self) -> u64 {
match self.info.length {
None => {},
Some(n) => { return n as u64 }
}; };
match &self.info.files { n
None => { 0 }, }
Some(files) => {
let mut n = 0;
for file in files {
n += file.length;
};
n
}
}
} }
}
pub fn get_tracker(&self) -> (&str, u16) {
let re = Regex::new(r"^udp://([^:/]+):(\d+)/announce$").unwrap(); pub fn get_tracker(&self) -> (&str, u16) {
let re = Regex::new(r"^udp://([^:/]+):(\d+)/announce$").unwrap();
if let Some(url) = &self.announce {
if let Some(captures) = re.captures(url) {
let hostname = captures.get(1).unwrap().as_str();
let port = captures.get(2).unwrap().as_str();
return (hostname, port.parse().unwrap()); if let Some(url) = &self.announce {
} else { if let Some(captures) = re.captures(url) {
println!("URL does not match the expected pattern | {}", url); let hostname = captures.get(1).unwrap().as_str();
} let port = captures.get(2).unwrap().as_str();
}
return (hostname, port.parse().unwrap());
for (i, url) in self.announce_list.as_ref().unwrap().iter().enumerate() { } else {
debug!("{:?}", url); println!("URL does not match the expected pattern | {}", url);
if let Some(captures) = re.captures(&url[i]) { }
let hostname = captures.get(1).unwrap().as_str();
let port = captures.get(2).unwrap().as_str();
return (hostname, port.parse().unwrap());
} else {
println!("URL does not match the expected pattern | {}", url[i]);
}
}
("", 0)
} }
for (i, url) in self.announce_list.as_ref().unwrap().iter().enumerate() {
debug!("{:?}", url);
if let Some(captures) = re.captures(&url[i]) {
let hostname = captures.get(1).unwrap().as_str();
let port = captures.get(2).unwrap().as_str();
return (hostname, port.parse().unwrap());
} else {
println!("URL does not match the expected pattern | {}", url[i]);
}
}
("", 0)
}
} }

View File

@ -4,276 +4,289 @@ use tokio::net::UdpSocket;
use log::{debug, error}; use log::{debug, error};
pub struct Tracker { pub struct Tracker {
/// A UdpSocket used for communication. /// A UdpSocket used for communication.
connection_stream: UdpSocket, connection_stream: UdpSocket,
/// The local socket address requests are made from /// The local socket address requests are made from
pub socket_addr: SocketAddr, pub socket_addr: SocketAddr,
/// The remote socket address of the tracker. /// The remote socket address of the tracker.
pub remote_addr: SocketAddr pub remote_addr: SocketAddr
} }
impl Tracker { impl Tracker {
/// Creates a new `Tracker` instance asynchronously. /// Creates a new `Tracker` instance asynchronously.
/// ///
/// # Arguments /// # Arguments
/// ///
/// * `socket_address` - Local socket address for binding. /// * `socket_address` - Local socket address for binding.
/// * `remote_hostname` - Remote host for connection. /// * `remote_hostname` - Remote host for connection.
/// * `remote_port` - Remote port for connection. /// * `remote_port` - Remote port for connection.
/// ///
/// # Panics /// # Panics
/// ///
/// Panics if there is an error parsing the given address or creating the UDP socket. /// Panics if there is an error parsing the given address or creating the UDP socket.
pub async fn new(socket_address: &str, remote_hostname: &str, remote_port: u16) -> Self { pub async fn new(socket_address: &str, remote_hostname: &str, remote_port: u16) -> Self {
let socket_addr = match socket_address.parse::<SocketAddr>() { let socket_addr = match socket_address.parse::<SocketAddr>() {
Err(err) => { Err(err) => {
error!("error parsing address {}, err: {}", socket_address, err); error!("error parsing address {}, err: {}", socket_address, err);
panic!("error parsing given address, {}", err); panic!("error parsing given address, {}", err);
} }
Ok(addr) => { addr } Ok(addr) => { addr }
}; };
let remote_address = dns_lookup::lookup_host(remote_hostname).unwrap()[0]; let remote_address = dns_lookup::lookup_host(remote_hostname).unwrap()[0];
let remote_addr = SocketAddr::new(remote_address, remote_port); let remote_addr = SocketAddr::new(remote_address, remote_port);
let connection_stream = match UdpSocket::bind(socket_addr).await { let connection_stream = match UdpSocket::bind(socket_addr).await {
Err(err) => { Err(err) => {
error!("unable to bind to {}, err: {}", socket_address, err); error!("unable to bind to {}, err: {}", socket_address, err);
panic!("error creating udpsocket, {}", err); panic!("error creating udpsocket, {}", err);
}, },
Ok(stream) => { Ok(stream) => {
debug!("bound udpsocket successfully to: {socket_addr}"); debug!("bound udpsocket successfully to: {socket_addr}");
stream stream
} }
}; };
match connection_stream.connect(remote_addr).await { match connection_stream.connect(remote_addr).await {
Err(err) => { Err(err) => {
error!("unable to connect to {}, err: {}", remote_addr, err); error!("unable to connect to {}, err: {}", remote_addr, err);
panic!("error creating udpsocket, {}", err); panic!("error creating udpsocket, {}", err);
}, },
Ok(()) => { Ok(()) => {
debug!("successfully connected to: {remote_addr}"); debug!("successfully connected to: {remote_addr}");
} }
}; };
Self { Self {
connection_stream, connection_stream,
socket_addr, socket_addr,
remote_addr remote_addr
}
}
/// Sends a message to the tracker and receives a response asynchronously.
///
/// # Arguments
///
/// * `message` - A type that implements the `ToBuffer` trait, representing the message to send.
///
/// # Returns
///
/// A byte vector containing the received response.
pub async fn send_message<T: ToBuffer>(&mut self, message: &T) -> Vec<u8> {
let mut buf: Vec<u8> = vec![ 0; 16_384 ];
self.connection_stream.send(&message.to_buffer()).await.unwrap();
self.connection_stream.recv(&mut buf).await.unwrap();
buf
} }
}
/// Sends a message to the tracker and receives a response asynchronously.
///
/// # Arguments
///
/// * `message` - A type that implements the `ToBuffer` trait, representing the message to send.
///
/// # Returns
///
/// A byte vector containing the received response.
pub async fn send_message<T: ToBuffer>(&mut self, message: &T) -> Vec<u8> {
let mut buf: Vec<u8> = vec![ 0; 16_384 ];
self.connection_stream.send(&message.to_buffer()).await.unwrap();
self.connection_stream.recv(&mut buf).await.unwrap();
buf
}
} }
/// A trait for converting a type into a byte buffer. /// A trait for converting a type into a byte buffer.
pub trait ToBuffer { pub trait ToBuffer {
/// Converts the implementing type into a byte buffer. /// Converts the implementing type into a byte buffer.
fn to_buffer(&self) -> Vec<u8>; fn to_buffer(&self) -> Vec<u8>;
} }
/// A trait for converting a type from a byte buffer. /// A trait for converting a type from a byte buffer.
pub trait FromBuffer { pub trait FromBuffer {
/// Converts a byte buffer into the implementing type. /// Converts a byte buffer into the implementing type.
fn from_buffer(buf: &[u8]) -> Self; fn from_buffer(buf: &[u8]) -> Self;
} }
#[derive(Debug)] #[derive(Debug)]
/// Represents a basic connection message. /// Represents a basic connection message.
pub struct ConnectionMessage { pub struct ConnectionMessage {
pub connection_id: i64, pub connection_id: i64,
action: i32, action: i32,
transaction_id: i32, transaction_id: i32,
} }
impl ConnectionMessage { impl ConnectionMessage {
/// Creates a new basic connection message /// Creates a new basic connection message
pub fn create_basic_connection() -> Self { pub fn create_basic_connection() -> Self {
Self { Self {
connection_id: 4497486125440, connection_id: 4497486125440,
action: 0, action: 0,
transaction_id: 123 transaction_id: 123
}
} }
}
} }
impl ToBuffer for ConnectionMessage { impl ToBuffer for ConnectionMessage {
fn to_buffer(&self) -> Vec<u8> { fn to_buffer(&self) -> Vec<u8> {
let mut buf: Vec<u8> = vec![]; let mut buf: Vec<u8> = vec![];
buf.extend(self.connection_id.to_be_bytes()); buf.extend(self.connection_id.to_be_bytes());
buf.extend(self.action.to_be_bytes()); buf.extend(self.action.to_be_bytes());
buf.extend(self.transaction_id.to_be_bytes()); buf.extend(self.transaction_id.to_be_bytes());
buf buf
} }
} }
impl FromBuffer for ConnectionMessage { impl FromBuffer for ConnectionMessage {
fn from_buffer(buf: &[u8]) -> Self { fn from_buffer(buf: &[u8]) -> Self {
let mut action: [u8; 4] = [0; 4]; let mut action: [u8; 4] = [0; 4];
action[..4].copy_from_slice(&buf[..4]); action[..4].copy_from_slice(&buf[..4]);
let action = i32::from_be_bytes(action); let action = i32::from_be_bytes(action);
let mut transaction_id: [u8; 4] = [0; 4]; let mut transaction_id: [u8; 4] = [0; 4];
transaction_id[..4].copy_from_slice(&buf[4..8]); transaction_id[..4].copy_from_slice(&buf[4..8]);
let transaction_id = i32::from_be_bytes(transaction_id); let transaction_id = i32::from_be_bytes(transaction_id);
let mut connection_id: [u8; 8] = [0; 8]; let mut connection_id: [u8; 8] = [0; 8];
connection_id[..4].copy_from_slice(&buf[8..16]); connection_id[..8].copy_from_slice(&buf[8..16]);
let connection_id = i64::from_be_bytes(connection_id); let connection_id = i64::from_be_bytes(connection_id);
Self {
connection_id,
Self { action,
connection_id, transaction_id
action,
transaction_id
}
} }
}
} }
#[derive(Debug)] #[derive(Debug)]
/// Represents an announcement message in the bittorrent udp tracker protocol. /// Represents an announcement message in the BitTorrent UDP tracker protocol.
pub struct AnnounceMessage { pub struct AnnounceMessage {
connection_id: i64, /// The connection ID used for this tracker communication session.
action: i32, connection_id: i64,
transaction_id: i32, /// The action code representing the type of message (e.g., connect, announce, scrape).
info_hash: [u8; 20], action: i32,
peer_id: [u8; 20], /// A unique identifier for this transaction, allowing matching responses to requests.
downloaded: i64, transaction_id: i32,
left: i64, /// The 20-byte SHA-1 hash of the info dictionary in the torrent metainfo.
uploaded: i64, info_hash: [u8; 20],
event: i32, /// The unique ID identifying the peer/client sending the announce message.
ip: u32, peer_id: [u8; 20],
key: u32, /// The total amount of data downloaded by the client in this torrent, in bytes.
num_want: i32, downloaded: i64,
port: u16, /// The amount of data left to download for the client in this torrent, in bytes.
extensions: u16 left: i64,
/// The total amount of data uploaded by the client in this torrent, in bytes.
uploaded: i64,
/// An event code indicating the purpose of the announce (e.g., started, completed, stopped).
event: i32,
/// The IP address of the client, expressed as a 32-bit unsigned integer.
ip: u32,
/// A unique key generated by the client for the tracker to identify the peer.
key: u32,
/// The maximum number of peers that the client wants to receive from the tracker.
num_want: i32,
/// The port on which the client is listening for incoming peer connections.
port: u16,
/// Additional extension flags or data included in the announce message.
extensions: u16,
} }
impl AnnounceMessage { impl AnnounceMessage {
/// Creates a new announce message. /// Creates a new announce message.
pub fn new(connection_id: i64, infohash: &[u8], peerid: &str, total_length: i64) -> Self { pub fn new(connection_id: i64, infohash: &[u8], peerid: &str, total_length: i64) -> Self {
let mut info_hash: [u8; 20] = [ 0; 20 ]; let mut info_hash: [u8; 20] = [ 0; 20 ];
info_hash[..20].copy_from_slice(&infohash[..20]); info_hash[..20].copy_from_slice(&infohash[..20]);
let mut peer_id: [u8; 20] = [0; 20]; let mut peer_id: [u8; 20] = [0; 20];
for (i, character) in peerid.chars().enumerate() { for (i, character) in peerid.chars().enumerate() {
peer_id[i] = character as u8; peer_id[i] = character as u8;
}
Self {
connection_id,
action: 1,
transaction_id: 132,
info_hash,
peer_id,
downloaded: 0,
left: total_length,
uploaded: 0,
event: 1,
ip: 0,
key: 234,
num_want: -1,
port: 61389,
extensions: 0
}
} }
Self {
connection_id,
action: 1,
transaction_id: 132,
info_hash,
peer_id,
downloaded: 0,
left: total_length,
uploaded: 0,
event: 1,
ip: 0,
key: 234,
num_want: -1,
port: 61389,
extensions: 0
}
}
} }
impl ToBuffer for AnnounceMessage { impl ToBuffer for AnnounceMessage {
fn to_buffer(&self) -> Vec<u8> { fn to_buffer(&self) -> Vec<u8> {
let mut buf: Vec<u8> = vec![]; let mut buf: Vec<u8> = vec![];
buf.extend(self.connection_id.to_be_bytes()); buf.extend(self.connection_id.to_be_bytes());
buf.extend(self.action.to_be_bytes()); buf.extend(self.action.to_be_bytes());
buf.extend(self.transaction_id.to_be_bytes()); buf.extend(self.transaction_id.to_be_bytes());
buf.extend(self.info_hash); buf.extend(self.info_hash);
buf.extend(self.peer_id); buf.extend(self.peer_id);
buf.extend(self.downloaded.to_be_bytes()); buf.extend(self.downloaded.to_be_bytes());
buf.extend(self.left.to_be_bytes()); buf.extend(self.left.to_be_bytes());
buf.extend(self.uploaded.to_be_bytes()); buf.extend(self.uploaded.to_be_bytes());
buf.extend(self.event.to_be_bytes()); buf.extend(self.event.to_be_bytes());
buf.extend(self.ip.to_be_bytes()); buf.extend(self.ip.to_be_bytes());
buf.extend(self.key.to_be_bytes()); buf.extend(self.key.to_be_bytes());
buf.extend(self.num_want.to_be_bytes()); buf.extend(self.num_want.to_be_bytes());
buf.extend(self.port.to_be_bytes()); buf.extend(self.port.to_be_bytes());
buf.extend(self.extensions.to_be_bytes()); buf.extend(self.extensions.to_be_bytes());
buf buf
} }
} }
#[derive(Debug)] #[derive(Debug)]
/// Represents a response to an announcement message. /// Represents a response to an announcement message.
pub struct AnnounceMessageResponse { pub struct AnnounceMessageResponse {
pub action: i32, pub action: i32,
pub transaction_id: i32, pub transaction_id: i32,
pub interval: i32, pub interval: i32,
pub leechers: i32, pub leechers: i32,
pub seeders: i32, pub seeders: i32,
pub ips: Vec<Ipv4Addr>, pub ips: Vec<Ipv4Addr>,
pub ports: Vec<u16> pub ports: Vec<u16>
} }
impl FromBuffer for AnnounceMessageResponse { impl FromBuffer for AnnounceMessageResponse {
/// Converts a byte buffer into an `AnnounceMessageResponse` instance. /// Converts a byte buffer into an `AnnounceMessageResponse` instance.
fn from_buffer(buf: &[u8]) -> Self { fn from_buffer(buf: &[u8]) -> Self {
let mut action: [u8; 4] = [0; 4]; let mut action: [u8; 4] = [0; 4];
action[..4].copy_from_slice(&buf[0..4]); action[..4].copy_from_slice(&buf[0..4]);
let action = i32::from_be_bytes(action); let action = i32::from_be_bytes(action);
let mut transaction_id: [u8; 4] = [ 0; 4 ]; let mut transaction_id: [u8; 4] = [ 0; 4 ];
transaction_id[..4].copy_from_slice(&buf[4..8]); transaction_id[..4].copy_from_slice(&buf[4..8]);
let transaction_id = i32::from_be_bytes(transaction_id); let transaction_id = i32::from_be_bytes(transaction_id);
let mut interval: [u8; 4] = [0; 4]; let mut interval: [u8; 4] = [0; 4];
interval[..4].copy_from_slice(&buf[8..12]); interval[..4].copy_from_slice(&buf[8..12]);
let interval = i32::from_be_bytes(interval); let interval = i32::from_be_bytes(interval);
let mut leechers: [u8; 4] = [0; 4]; let mut leechers: [u8; 4] = [0; 4];
leechers[..4].copy_from_slice(&buf[12..16]); leechers[..4].copy_from_slice(&buf[12..16]);
let leechers = i32::from_be_bytes(leechers); let leechers = i32::from_be_bytes(leechers);
let mut seeders: [u8; 4] = [0; 4]; let mut seeders: [u8; 4] = [0; 4];
seeders[..4].copy_from_slice(&buf[16..20]); seeders[..4].copy_from_slice(&buf[16..20]);
let seeders = i32::from_be_bytes(seeders); let seeders = i32::from_be_bytes(seeders);
let mut ips: Vec<Ipv4Addr> = vec![]; let mut ips: Vec<Ipv4Addr> = vec![];
let mut ports: Vec<u16> = vec![]; let mut ports: Vec<u16> = vec![];
for i in (20..buf.len()-6).step_by(6) { for i in (20..buf.len()-6).step_by(6) {
let ip = Ipv4Addr::new(buf[i], buf[i+1], buf[i+2], buf[i+3]); let ip = Ipv4Addr::new(buf[i], buf[i+1], buf[i+2], buf[i+3]);
let port = u16::from_be_bytes([buf[i+4], buf[i+5]]); let port = u16::from_be_bytes([buf[i+4], buf[i+5]]);
if ip.to_string() == "0.0.0.0" && port == 0 { if ip.to_string() == "0.0.0.0" && port == 0 {
break; break;
} }
ips.push(ip); ips.push(ip);
ports.push(port) ports.push(port)
}
Self { action, transaction_id, interval, leechers, seeders, ips: ips[1..].to_vec(), ports: ports[1..].to_vec() }
} }
Self { action, transaction_id, interval, leechers, seeders, ips: ips[1..].to_vec(), ports: ports[1..].to_vec() }
}
} }