Better error and tracker handling

This commit is contained in:
Arlo Filley 2023-10-06 16:40:43 +01:00
parent 5eb73b29ad
commit 930f50042d
11 changed files with 377 additions and 319 deletions

16
.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,16 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "lldb",
"request": "launch",
"name": "Debug",
"program": "${workspaceFolder}/<executable file>",
"args": [],
"cwd": "${workspaceFolder}"
}
]
}

View File

@ -1,5 +0,0 @@
{
"rust-analyzer.linkedProjects": [
"./Cargo.toml"
]
}

1
downloads/README Normal file
View File

@ -0,0 +1 @@
This is a test file

Binary file not shown.

After

Width:  |  Height:  |  Size: 17 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.6 MiB

View File

@ -17,22 +17,22 @@ mod message;
mod torrent; mod torrent;
mod tracker; mod tracker;
use core::panic;
use std::net::SocketAddr;
// Crate Imports // Crate Imports
use crate::{ use crate::{
files::Files, files::Files,
peer::Peer, peer::Peer,
torrent::Torrent, torrent::Torrent,
tracker::{ tracker::tracker::Tracker
AnnounceMessage, AnnounceMessageResponse,
ConnectionMessage,
FromBuffer,
Tracker
}
}; };
use tokio::sync::mpsc;
// External Ipmorts // External Ipmorts
use clap::Parser; use clap::Parser;
use log::{ debug, info, LevelFilter }; use log::{ debug, info, LevelFilter, error };
use tokio::spawn;
/// Struct Respresenting needed arguments /// Struct Respresenting needed arguments
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
@ -46,6 +46,9 @@ struct Args {
#[arg(short, long)] #[arg(short, long)]
download_path: String, download_path: String,
#[arg(short, long)]
peer_id: String,
} }
/// The root function /// The root function
@ -55,11 +58,13 @@ async fn main() {
// Creates a log file to handle large amounts of data // Creates a log file to handle large amounts of data
let log_path = args.log_file_path.unwrap_or(String::from("./log/rustytorrent.log")); let log_path = args.log_file_path.unwrap_or(String::from("./log/rustytorrent.log"));
simple_logging::log_to_file(&log_path, LevelFilter::Info).unwrap(); //simple_logging::log_to_file(&log_path, LevelFilter::Debug).unwrap();
simple_logging::log_to_stderr(LevelFilter::Debug);
info!("==> WELCOME TO RUSTY-TORRENT <==");
// Read the Torrent File // Read the Torrent File
let torrent = Torrent::from_torrent_file(&args.torrent_file_path).await; let torrent = Torrent::from_torrent_file(&args.torrent_file_path).await;
info!("Sucessfully read torrent file");
torrent.log_useful_information(); torrent.log_useful_information();
// Create the files that will be written to // Create the files that will be written to
@ -67,42 +72,39 @@ async fn main() {
files.create_files(&torrent, &args.download_path).await; files.create_files(&torrent, &args.download_path).await;
// Gets peers from the given tracker // Gets peers from the given tracker
let (_remote_hostname, _remote_port) = torrent.get_tracker();
let Some(socketaddrs) = torrent.get_trackers() else {
error!("couldn't find trackers");
panic!("couldn't find trackers")
};
let (remote_hostname, remote_port) = ("tracker.opentrackr.org", 1337); let (remote_hostname, remote_port) = ("tracker.opentrackr.org", 1337);
debug!("{}:{}", remote_hostname, remote_port); debug!("{}:{}", remote_hostname, remote_port);
let mut tracker = Tracker::new("0.0.0.0:61389", remote_hostname, remote_port).await; info!("");
info!("--> Finding Peers <--");
let listen_address = "0.0.0.0:61389".parse::<SocketAddr>().unwrap();
let Ok(mut tracker) = Tracker::new(listen_address, std::net::SocketAddr::V4(socketaddrs[0])).await else {
panic!("tracker couldn't be created")
};
info!("Successfully connected to tracker {}:{}", remote_hostname, remote_port); info!("Successfully connected to tracker {}:{}", remote_hostname, remote_port);
let connection_message = ConnectionMessage::from_buffer(
&tracker.send_message(&ConnectionMessage::create_basic_connection()).await
);
debug!("{:?}", connection_message); let peers = tracker.find_peers(&torrent, &args.peer_id).await;
let announce_message_response = AnnounceMessageResponse::from_buffer(
&tracker.send_message(&AnnounceMessage::new(
connection_message.connection_id,
&torrent.get_info_hash(),
"-MY0001-123456654321",
torrent.get_total_length() as i64
)).await
);
debug!("{:?}", announce_message_response);
info!("Found Peers"); info!("Found Peers");
// Creates an assumed peer connection to the `SocketAddr` given let num_pieces = torrent.info.pieces.len() / 20;
let mut peer = match Peer::create_connection(&format!("{}:{}", announce_message_response.ips[0], announce_message_response.ports[0])).await {
let mut peer = match Peer::create_connection(peers[0]).await {
None => { return }, None => { return },
Some(peer) => peer Some(peer) => peer
}; };
let num_pieces = torrent.info.pieces.len() / 20;
peer.handshake(&torrent).await; peer.handshake(&torrent).await;
peer.keep_alive_until_unchoke().await; peer.keep_alive_until_unchoke().await;
info!("Successfully Created Connection with peer: {}", peer.peer_id); info!("Successfully Created Connection with peer: {}", peer.peer_id);
println!("{}", peers.len());
let mut len = 0; let mut len = 0;
for index in 0..num_pieces { for index in 0..num_pieces {
@ -119,5 +121,7 @@ async fn main() {
} }
peer.disconnect().await; peer.disconnect().await;
info!("Successfully completed download"); info!("Successfully completed download");
} }

View File

@ -49,7 +49,7 @@ impl FromBuffer for Message {
let message_length = u32::from_be_bytes(message_length); let message_length = u32::from_be_bytes(message_length);
let payload: Option<Vec<u8>>; let mut payload: Option<Vec<u8>> = None;
let message_type: MessageType; let message_type: MessageType;
if message_length == 0 { if message_length == 0 {
@ -66,10 +66,10 @@ impl FromBuffer for Message {
if end_of_message > buf.len() { if end_of_message > buf.len() {
error!("index too long"); error!("index too long");
debug!("{buf:?}"); debug!("{buf:?}");
} } else {
payload = Some(buf[5..end_of_message].to_vec()); payload = Some(buf[5..end_of_message].to_vec());
} }
}
Self { Self {
message_length, message_length,

View File

@ -9,7 +9,7 @@ use crate::{
// External imports // External imports
use log::{ debug, error }; use log::{ debug, error };
use std::net::SocketAddr; use std::net::{SocketAddr, SocketAddrV4};
use tokio::{ use tokio::{
io::{ AsyncReadExt, AsyncWriteExt }, io::{ AsyncReadExt, AsyncWriteExt },
net::TcpStream net::TcpStream
@ -20,7 +20,7 @@ pub struct Peer {
/// The `TcpStream` that is used to communicate with the peeer /// The `TcpStream` that is used to communicate with the peeer
connection_stream: TcpStream, connection_stream: TcpStream,
/// The `SocketAddr` of the peer /// The `SocketAddr` of the peer
pub socket_addr: SocketAddr, pub socket_addr: SocketAddrV4,
/// The id of the peer /// The id of the peer
pub peer_id: String, pub peer_id: String,
/// Whether the peer is choking the client /// Whether the peer is choking the client
@ -33,29 +33,21 @@ impl Peer {
/// # Arguments /// # Arguments
/// ///
/// * `socket_address` - The socket address of the peer. /// * `socket_address` - The socket address of the peer.
pub async fn create_connection(socket_address: &str) -> Option<Self> { pub async fn create_connection(socket_address: SocketAddrV4) -> Option<Self> {
let socket_addr = match socket_address.parse::<SocketAddr>() { let connection_stream = match TcpStream::connect(socket_address).await {
Err(err) => {
error!("error parsing address {}, err: {}", socket_address, err);
return None;
}
Ok(addr) => { addr }
};
let connection_stream = match TcpStream::connect(socket_addr).await {
Err(err) => { Err(err) => {
error!("unable to connect to {}, err: {}", socket_address, err); error!("unable to connect to {}, err: {}", socket_address, err);
return None return None
}, },
Ok(stream) => { Ok(stream) => {
debug!("created tcpstream successfully to: {socket_addr}"); debug!("created tcpstream successfully to: {socket_address}");
stream stream
} }
}; };
Some(Self { Some(Self {
connection_stream, connection_stream,
socket_addr, socket_addr: socket_address,
peer_id: String::new(), peer_id: String::new(),
choking: true, choking: true,
}) })

View File

@ -1,8 +1,9 @@
use log::{debug, info, error}; use log::{debug, info, error, warn, trace};
use regex::Regex; use regex::Regex;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sha1::{Digest, Sha1}; use sha1::{Digest, Sha1};
use tokio::{fs::File as TokioFile, io::AsyncReadExt}; use tokio::{fs::File as TokioFile, io::AsyncReadExt};
use std::net::{IpAddr, SocketAddrV4};
/// Represents a node in a DHT network. /// Represents a node in a DHT network.
#[derive(Clone, Debug, Deserialize, Serialize)] #[derive(Clone, Debug, Deserialize, Serialize)]
@ -72,51 +73,64 @@ impl Torrent {
/// ///
/// * `path` - The path to the `.torrent` file. /// * `path` - The path to the `.torrent` file.
pub async fn from_torrent_file(path: &str) -> Self { pub async fn from_torrent_file(path: &str) -> Self {
// Read .torrent File info!("");
let mut file = TokioFile::open(path).await.unwrap(); info!("--> Reading File <--");
let Ok(mut file) = TokioFile::open(path).await else {
error!("Unable to read file at {path}");
panic!("Unable to read file at {path}");
};
info!("Found\t\t > {path}");
let mut buf: Vec<u8> = Vec::new(); let mut buf: Vec<u8> = Vec::new();
let Ok(_) = file.read_to_end(&mut buf).await else {
error!("Error reading file > {path}");
panic!("Error reading file > {path}");
};
info!("Read\t\t > {path}");
match file.read_to_end(&mut buf).await { let Ok(torrent) = serde_bencode::from_bytes(&buf) else {
Err(err) => { error!("Error deserializing file > {path}");
error!("Error reading file till end: {err}"); panic!("Error deserializing file > {path}");
panic!("Error reading file till end: {err}"); };
} info!("Parsed\t > {path}");
Ok(_) => { info!("Succesfully read {path}") }
}
match serde_bencode::from_bytes(&buf) { torrent
Err(err) => {
error!("Error deserializing file: {err}");
panic!("Error deserializing file: {err}");
}
Ok(torrent) => { torrent }
}
} }
}
impl Torrent {
/// Logs info about the *.torrent file /// Logs info about the *.torrent file
pub fn log_useful_information(&self) { pub fn log_useful_information(&self) {
info!(" -> Torrent Information <- "); info!("");
info!("Name: {}", self.info.name); info!("--> Torrent Information <--");
info!("Tracker: {:?}", self.announce); info!("Name:\t\t{}", self.info.name);
info!("Tracker List: {:?}", self.announce_list); info!("Trackers");
info!("Info Hash: {:X?}", self.get_info_hash()); if let Some(trackers) = &self.announce_list {
info!("Length: {:?}", self.info.length); for tracker in trackers {
info!(" |> {}", tracker[0])
}
}
info!("InfoHash:\t{:X?}", self.get_info_hash());
info!("Length:\t{:?}", self.info.length);
info!("Files:"); info!("Files:");
let Some(mut files) = self.info.files.clone() else {
info!("./{}", self.info.name);
return
};
match &self.info.files { files.sort_by(|a, b| a.path.len().cmp(&b.path.len()) );
None => { info!("./");
info!("> {}", self.info.name);
}
Some(files) => {
for file in files { for file in files {
if file.path.len() == 1 {
info!(" |--> {:?}", file.path);
} else {
let mut path = String::new(); let mut path = String::new();
for section in &file.path { file.path.iter().for_each(|s| { path.push_str(s); path.push('/') } );
path.push_str(&format!("{section}/"));
}
path.pop(); path.pop();
info!("> {}: {}B", path, file.length)
} info!(" |--> {}: {}B", path, file.length)
} }
} }
} }
@ -149,40 +163,44 @@ impl Torrent {
let piece_hash = &self.info.pieces[(index * 20) as usize..(index * 20 + 20) as usize]; let piece_hash = &self.info.pieces[(index * 20) as usize..(index * 20 + 20) as usize];
if &result[..] == piece_hash { if &result[..] == piece_hash {
info!("Downloaded Piece {}/{} Correct!, Piece Was: {}B long", index + 1, self.info.pieces.len() / 20, piece.len(),); info!("Piece {}/{} Correct!", index + 1, self.info.pieces.len() / 20);
true true
} else { } else {
error!("Piece {}/{} incorrect :(", index + 1, self.info.pieces.len() / 20);
debug!("{:?}", &result[..]); debug!("{:?}", &result[..]);
debug!("{:?}", piece_hash); debug!("{:?}", piece_hash);
debug!("{:?}", &result[..].len()); debug!("{:?}", &result[..].len());
debug!("{:?}", piece_hash.len()); debug!("{:?}", piece_hash.len());
debug!("{}", piece.len()); debug!("{}", piece.len());
error!("Piece downloaded incorrectly");
false false
} }
} }
pub fn get_total_length(&self) -> u64 { pub fn get_total_length(&self) -> u64 {
match self.info.length { if let Some(n) = self.info.length {
None => {}, return n as u64
Some(n) => { return n as u64 }
}; };
match &self.info.files { if let Some(files) = &self.info.files {
None => { 0 },
Some(files) => {
let mut n = 0; let mut n = 0;
for file in files { for file in files {
n += file.length; n += file.length;
}; };
n return n
} };
}
0
} }
pub fn get_tracker(&self) -> (&str, u16) { pub fn get_trackers(&self) -> Option<Vec<SocketAddrV4>> {
info!("");
info!("--> Locating Trackers <--");
let mut addresses = vec![];
// This is the current regex as I haven't implemented support for http trackers yet
let re = Regex::new(r"^udp://([^:/]+):(\d+)/announce$").unwrap(); let re = Regex::new(r"^udp://([^:/]+):(\d+)/announce$").unwrap();
if let Some(url) = &self.announce { if let Some(url) = &self.announce {
@ -190,24 +208,42 @@ impl Torrent {
let hostname = captures.get(1).unwrap().as_str(); let hostname = captures.get(1).unwrap().as_str();
let port = captures.get(2).unwrap().as_str(); let port = captures.get(2).unwrap().as_str();
return (hostname, port.parse().unwrap()); if let Ok(ip) = dns_lookup::lookup_host(hostname) {
for i in ip {
if let IpAddr::V4(j) = i {
addresses.push(SocketAddrV4::new(j, port.parse().unwrap()))
}
}
}
} else { } else {
println!("URL does not match the expected pattern | {}", url); warn!("{url} does not match the expected url pattern");
} }
} }
for (i, url) in self.announce_list.as_ref().unwrap().iter().enumerate() { if let Some(urls) = &self.announce_list {
debug!("{:?}", url); for url in urls.iter() {
if let Some(captures) = re.captures(&url[i]) { if let Some(captures) = re.captures(&url[0]) {
let hostname = captures.get(1).unwrap().as_str(); let hostname = captures.get(1).unwrap().as_str();
let port = captures.get(2).unwrap().as_str(); let port = captures.get(2).unwrap().as_str();
return (hostname, port.parse().unwrap()); if let Ok(ip) = dns_lookup::lookup_host(hostname) {
for i in ip {
if let IpAddr::V4(j) = i {
addresses.push(SocketAddrV4::new(j, port.parse().unwrap()));
}
}
info!("Sucessfully found tracker {}", url[0]);
}
} else { } else {
println!("URL does not match the expected pattern | {}", url[i]); warn!("{} does not match the expected url pattern", url[0]);
}
} }
} }
("", 0) if addresses.len() > 0 {
Some(addresses)
} else {
None
}
} }
} }

1
src/tracker/mod.rs Normal file
View File

@ -0,0 +1 @@
pub mod tracker;

View File

@ -1,15 +1,17 @@
use std::{net::{SocketAddr, Ipv4Addr}, vec}; use std::net::{SocketAddr, Ipv4Addr, SocketAddrV4};
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
use log::{debug, error}; use log::{debug, error};
use crate::torrent::Torrent;
pub struct Tracker { pub struct Tracker {
/// A UdpSocket used for communication. /// A UdpSocket used for communication.
connection_stream: UdpSocket, connection_stream: UdpSocket,
/// The local socket address requests are made from /// The local socket address requests are made from
pub socket_addr: SocketAddr, listen_address: SocketAddr,
/// The remote socket address of the tracker. /// The remote socket address of the tracker.
pub remote_addr: SocketAddr remote_address: SocketAddr
} }
impl Tracker { impl Tracker {
@ -24,46 +26,30 @@ impl Tracker {
/// # Panics /// # Panics
/// ///
/// Panics if there is an error parsing the given address or creating the UDP socket. /// Panics if there is an error parsing the given address or creating the UDP socket.
pub async fn new(socket_address: &str, remote_hostname: &str, remote_port: u16) -> Self { pub async fn new(listen_address: SocketAddr, remote_address: SocketAddr) -> Result<Self, ()> {
let socket_addr = match socket_address.parse::<SocketAddr>() { let Ok(connection_stream) = UdpSocket::bind(listen_address).await else {
Err(err) => { error!("error binding to udpsocket {listen_address}");
error!("error parsing address {}, err: {}", socket_address, err); return Err(())
panic!("error parsing given address, {}", err);
}
Ok(addr) => { addr }
}; };
let remote_address = dns_lookup::lookup_host(remote_hostname).unwrap()[0]; debug!("bound udpsocket successfully to: {listen_address}");
let remote_addr = SocketAddr::new(remote_address, remote_port); match connection_stream.connect(remote_address).await {
let connection_stream = match UdpSocket::bind(socket_addr).await {
Err(err) => { Err(err) => {
error!("unable to bind to {}, err: {}", socket_address, err); error!("unable to connect to {}, err: {}", remote_address, err);
panic!("error creating udpsocket, {}", err);
},
Ok(stream) => {
debug!("bound udpsocket successfully to: {socket_addr}");
stream
}
};
match connection_stream.connect(remote_addr).await {
Err(err) => {
error!("unable to connect to {}, err: {}", remote_addr, err);
panic!("error creating udpsocket, {}", err); panic!("error creating udpsocket, {}", err);
}, },
Ok(()) => { Ok(()) => {
debug!("successfully connected to: {remote_addr}"); debug!("successfully connected to: {remote_address}");
} }
}; };
Self { Ok(Self {
connection_stream, connection_stream,
socket_addr, listen_address,
remote_addr remote_address
} })
} }
/// Sends a message to the tracker and receives a response asynchronously. /// Sends a message to the tracker and receives a response asynchronously.
@ -75,7 +61,7 @@ impl Tracker {
/// # Returns /// # Returns
/// ///
/// A byte vector containing the received response. /// A byte vector containing the received response.
pub async fn send_message<T: ToBuffer>(&mut self, message: &T) -> Vec<u8> { async fn send_message<T: ToBuffer>(&mut self, message: &T) -> Vec<u8> {
let mut buf: Vec<u8> = vec![ 0; 16_384 ]; let mut buf: Vec<u8> = vec![ 0; 16_384 ];
self.connection_stream.send(&message.to_buffer()).await.unwrap(); self.connection_stream.send(&message.to_buffer()).await.unwrap();
@ -83,6 +69,33 @@ impl Tracker {
buf buf
} }
async fn send_handshake(&mut self) -> i64 {
ConnectionMessage::from_buffer(
&self.send_message(&ConnectionMessage::create_basic_connection()).await
).connection_id
}
pub async fn find_peers(&mut self, torrent: &Torrent, peer_id: &str) -> Vec<SocketAddrV4> {
let id = self.send_handshake().await;
let message = AnnounceMessage::new(
id,
&torrent.get_info_hash(),
peer_id,
torrent.get_total_length() as i64
);
let announce_message_response = AnnounceMessageResponse::from_buffer(&self.send_message(&message).await);
let mut peer_addresses = vec![];
for i in 0..announce_message_response.ips.len() {
peer_addresses.push(SocketAddrV4::new(announce_message_response.ips[i], announce_message_response.ports[i]))
}
peer_addresses
}
} }
/// A trait for converting a type into a byte buffer. /// A trait for converting a type into a byte buffer.