Initial commit

This commit is contained in:
Arlo Filley 2023-08-27 13:21:53 +01:00
commit 4a43048d3b
12 changed files with 1461 additions and 0 deletions

BIN
.DS_Store vendored Normal file

Binary file not shown.

6
.gitignore vendored Normal file
View File

@ -0,0 +1,6 @@
/target
Cargo.lock
/log/rusty_torrent.log
/testing
/process
/log

5
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,5 @@
{
"rust-analyzer.linkedProjects": [
"./Cargo.toml"
]
}

20
Cargo.toml Normal file
View File

@ -0,0 +1,20 @@
[package]
name = "rusty_torrent"
version = "0.4.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
dns-lookup = "2.0.2"
encoding_rs = "0.8.33"
log = "0.4.20"
log4rs = "1.2.0"
regex = "1.9.4"
reqwest = "0.11.20"
serde = { version = "1.0.183", features = ["derive"] }
serde_bencode = "0.2.3"
serde_bytes = "0.11.12"
sha1 = "0.10.5"
simple-logging = "2.0.2"
tokio = { version = "1.30.0", features = ["full"] }

93
readme.md Normal file
View File

@ -0,0 +1,93 @@
# Rusty_Torrent BitTorrent Client
![GitHub](https://img.shields.io/github/license/ArloFilley/rusty_torrent)
![GitHub last commit](https://img.shields.io/github/last-commit/ArloFilley/rusty_torrent)
![GitHub stars](https://img.shields.io/github/stars/ArloFilley/rusty_torrent?style=social)
A BitTorrent client implemented in Rust that allows you to interact with the BitTorrent protocol and download torrents.
## Table of Contents
- [Introduction](#introduction)
- [Features](#features)
- [Getting Started](#getting-started)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Usage](#usage)
- [How It Works](#how-it-works)
- [Contributing](#contributing)
- [License](#license)
## Introduction
This BitTorrent client is designed to provide a simple and functional implementation of the BitTorrent protocol. It supports downloading torrents and interacting with peers to exchange pieces of files.
## Features
- Handshake and communication with peers using the BitTorrent protocol.
- Support for downloading torrents in both single-file and multi-file mode.
- Ability to request and download individual pieces from peers.
- Piece verification using SHA-1 hashes to ensure data integrity.
- Logging using the `log` crate for better debugging and tracing.
## Getting Started
### Prerequisites
- Rust programming language: [Install Rust](https://www.rust-lang.org/tools/install)
- Cargo: The Rust package manager, usually installed with Rust.
### Installation
1. Clone the repository:
```bash
git clone https://github.com/ArloFilley/rusty_torrent.git
```
2. Navigate to the project directory:
```bash
cd rusty_torrent
```
3. Build the project
```bash
cargo build --release
```
### Usage
To use the BitTorrent client, follow these steps:
1. Run the compiled binary:
```bash
cargo run --release
```
2. Provide the path to a .torrent file:
```bash
cargo run --release /path/to/your.torrent
```
3. Provide the path to download
```bash
cargo run --release /path/to/your.torrent /path/to/downloads
```
The client will start downloading the torrent files and interacting with peers.
## How It Works
This BitTorrent client uses Rust's asynchronous programming features to manage connections with peers and perform file downloads. It employs the BitTorrent protocol's handshake and communication mechanisms to exchange pieces of data with other peers in the network. The client also verifies downloaded pieces using SHA-1 hashes provided by the torrent file.
## Contributing
Contributions are welcome! If you find any bugs or want to add new features, please feel free to open issues and pull requests on the GitHub repository.
## License
This project is licensed under the MIT License.

108
src/files.rs Normal file
View File

@ -0,0 +1,108 @@
use log::debug;
use tokio::{
fs::try_exists as dir_exists,
fs::create_dir as create_dir,
fs::File,
io::AsyncWriteExt
};
use crate::torrent::Torrent;
/// Represents information about a file being downloaded.
#[derive(Debug)]
struct FileInfo {
file: File,
length: u64,
current_length: u64,
name: String,
complete: bool
}
/// Represents a collection of files being downloaded.
#[derive(Debug)]
pub struct Files(Vec<FileInfo>);
impl Files {
/// Creates a new `Files` instance.
pub fn new() -> Self {
Self(vec![])
}
/// Creates the files in the local system for downloading.
///
/// # Arguments
///
/// * `torrent` - The `Torrent` instance describing the torrent.
/// * `download_path` - The path where the files will be downloaded.
pub async fn create_files(&mut self, torrent: &Torrent, download_path: &str) {
match &torrent.info.files {
// Single File Mode
None => {
let path = &format!("{download_path}/{}", torrent.info.name);
let file = File::create(&path).await.unwrap();
let length = torrent.info.length.unwrap_or(0) as u64;
self.0.push(FileInfo { file, length, current_length: 0, name: path.to_string(), complete: false })
}
// Multi File Mode
Some(files) => {
for t_file in files {
let mut path = download_path.to_string();
for dir in &t_file.path[..t_file.path.len() - 1] {
path.push('/');
path.push_str(&dir);
if !dir_exists(&path).await.unwrap() {
debug!("Creating: {path}");
create_dir(&path).await.unwrap();
}
}
path.push('/');
path.push_str(&t_file.path[t_file.path.len() - 1]);
debug!("Creating: {path}");
let file = File::create(&path).await.unwrap();
let length = t_file.length;
self.0.push(FileInfo { file, length, current_length: 0, name: path.to_string(), complete: false });
}
}
}
}
/// Writes a piece of data to the appropriate files.
///
/// # Arguments
///
/// * `piece` - The piece of data to write.
pub async fn write_piece(&mut self, piece: Vec<u8>) {
let mut j = 0;
let mut piece_len = piece.len() as u64;
let file_iterator = self.0.iter_mut();
for file in file_iterator {
if file.complete { continue }
if file.current_length + piece_len > file.length {
let n = file.file.write(&piece[j..(file.length - file.current_length) as usize]).await.unwrap();
debug!("Wrote {n}B > {}", file.name);
j = (file.length - file.current_length) as usize;
file.current_length += j as u64;
piece_len -= j as u64;
file.complete = true;
} else {
let n = file.file.write(&piece[j..]).await.unwrap();
debug!("Wrote {n}B > {}", file.name);
file.current_length += piece_len;
return
}
}
}
}

119
src/handshake.rs Normal file
View File

@ -0,0 +1,119 @@
use log::{debug, error};
/// Represents the handshake message that will be sent to a client.
#[derive(Debug)]
pub struct Handshake {
/// The length of the protocol name, must be 19 for "BitTorrent protocol".
p_str_len: u8,
/// The protocol name, should always be "BitTorrent protocol".
p_str: String,
/// Reserved for extensions, currently unused.
reserved: [u8; 8],
/// The infohash for the torrent.
info_hash: Vec<u8>,
/// The identifier for the client.
pub peer_id: String,
}
impl Handshake {
/// Creates a new handshake.
///
/// # Arguments
///
/// * `info_hash` - The infohash for the torrent.
///
/// # Returns
///
/// A new `Handshake` instance on success, or an empty `Result` indicating an error.
pub fn new(info_hash: Vec<u8>) -> Result<Self, ()> {
if info_hash.len() != 20 {
error!("Incorrect infohash length, consider using the helper function in torrent");
return Err(());
}
Ok(Self {
p_str_len: 19,
p_str: String::from("BitTorrent protocol"),
reserved: [0; 8],
info_hash,
peer_id: String::from("-MY0001-123456654322")
})
}
/// Converts the `Handshake` instance to a byte buffer for sending to a peer.
///
/// # Returns
///
/// A byte vector containing the serialized handshake.
pub fn to_buffer(&self) -> Vec<u8> {
let mut buf: Vec<u8> = vec![0; 68];
buf[0] = self.p_str_len;
for i in 1..20 {
buf[i] = self.p_str.as_bytes()[i - 1];
}
for i in 21..28 {
buf[i] = self.reserved[i - 21]
}
for i in 28..48 {
buf[i] = self.info_hash[i - 28]
}
for i in 48..68 {
buf[i] = self.peer_id.as_bytes()[i - 48]
}
buf
}
/// Converts a byte buffer to a `Handshake` instance.
///
/// # Arguments
///
/// * `buf` - A byte vector containing the serialized handshake.
///
/// # Returns
///
/// A new `Handshake` instance on success, or an empty `Result` indicating an error.
///
/// # Errors
///
/// Returns an error if the provided buffer is not long enough (at least 68 bytes).
pub fn from_buffer(buf: &Vec<u8>) -> Result<Self, ()> {
// Verify that buffer is at least the correct size, if not error
if buf.len() < 68 {
error!("buffer provided to handshake was too short");
return Err(());
}
let mut p_str = String::new();
for i in 1..20 {
p_str.push(buf[i] as char)
}
let mut info_hash: Vec<u8> = vec![0; 20];
for i in 28..48 {
info_hash[i - 28] = buf[i];
}
let mut peer_id = String::new();
for i in 48..68 {
peer_id.push(buf[i] as char)
}
Ok(Self {
p_str_len: buf[0],
p_str,
reserved: [0; 8],
info_hash,
peer_id
})
}
pub fn log_useful_information(&self) {
debug!("Connected - PeerId: {:?}", self.peer_id);
}
}

124
src/main.rs Normal file
View File

@ -0,0 +1,124 @@
//! The root of the crate
//!
//! Currently:
//! Creates the logger
//! Handles peer connection
//! Handles torrent information
//! Creates Files
//! Requests pieces
//! Checks piece hashes
//! Writes to torrent file
// Modules
mod files;
mod handshake;
mod peer;
mod message;
mod torrent;
mod tracker;
// Crate Imports
use crate::{
files::Files,
peer::Peer,
torrent::Torrent,
tracker::{
AnnounceMessage, AnnounceMessageResponse,
ConnectionMessage,
FromBuffer,
Tracker
}
};
// External Ipmorts
use log::{ debug, info, LevelFilter, error };
/// The root function
#[tokio::main]
async fn main() {
// Creates a log file to handle large amounts of data
let log_path = "/Users/arlo/code/RustyTorrent/log/rustytorrent.log";
simple_logging::log_to_file(log_path, LevelFilter::Info).unwrap();
let torrent_file_path = match std::env::args().nth(1) {
None => {
error!("Please Provide A Torrent File");
eprintln!("Critical error, please read log for more info: {}", log_path);
return
}
Some(path) => { path }
};
let download_path = match std::env::args().nth(2) {
None => {
error!("Please Provide A Download Path");
eprintln!("Critical error, please read log for more info: {}", log_path);
return
}
Some(path) => { path }
};
// Read the Torrent File
let torrent = Torrent::from_torrent_file(&torrent_file_path).await;
info!("Sucessfully read torrent file");
torrent.log_useful_information();
// Create the files that will be written to
let mut files = Files::new();
files.create_files(&torrent, &download_path).await;
// Gets peers from the given tracker
let (_remote_hostname, _remote_port) = torrent.get_tracker();
let (remote_hostname, remote_port) = ("tracker.opentrackr.org", 1337);
debug!("{}:{}", remote_hostname, remote_port);
let mut tracker = Tracker::new("0.0.0.0:61389", remote_hostname, remote_port).await;
info!("Successfully connected to tracker {}:{}", remote_hostname, remote_port);
let connection_message = ConnectionMessage::from_buffer(
&tracker.send_message(&ConnectionMessage::create_basic_connection()).await
);
debug!("{:?}", connection_message);
let announce_message_response = AnnounceMessageResponse::from_buffer(
&tracker.send_message(&AnnounceMessage::new(
connection_message.connection_id,
&torrent.get_info_hash(),
"-MY0001-123456654321",
torrent.get_total_length() as i64
)).await
);
debug!("{:?}", announce_message_response);
info!("Found Peers");
// Creates an assumed peer connection to the `SocketAddr` given
let mut peer = match Peer::create_connection(&format!("{}:{}", announce_message_response.ips[4], announce_message_response.ports[4])).await {
None => { return },
Some(peer) => peer
};
let num_pieces = torrent.info.pieces.len() / 20;
peer.handshake(&torrent).await;
peer.keep_alive_until_unchoke().await;
info!("Successfully Created Connection with peer: {}", peer.peer_id);
let mut len = 0;
for index in 0..num_pieces {
let piece= peer.request_piece(
index as u32, torrent.info.piece_length as u32,
&mut len, torrent.get_total_length() as u32
).await;
if torrent.check_piece(&piece, index as u32) {
files.write_piece(piece).await;
} else {
break
}
}
peer.disconnect().await;
info!("Successfully completed download");
}

237
src/message.rs Normal file
View File

@ -0,0 +1,237 @@
use log::error;
use std::vec;
/// Represents a message in the BitTorrent protocol.
#[derive(Debug, PartialEq)]
pub struct Message {
/// The length of the message, including the type and payload.
pub message_length: u32,
/// The type of message.
pub message_type: MessageType,
/// The payload of the message, if any.
pub payload: Option<Vec<u8>>,
}
impl Message {
/// Creates a new message.
///
/// # Arguments
///
/// * `message_length` - The length of the message.
/// * `message_type` - The type of message.
/// * `payload` - The payload of the message, if any.
pub fn new(message_length: u32, message_type: MessageType, payload: Option<Vec<u8>>) -> Self {
Self { message_length, message_type, payload }
}
/// Decodes a message from a given buffer.
///
/// # Arguments
///
/// * `buf` - The byte buffer containing the serialized message.
///
/// # Returns
///
/// A new `Message` instance on success, or an empty `Result` indicating an error.
pub fn from_buffer(buf: &Vec<u8>) -> Result<Self, ()> {
let mut message_length: [u8; 4] = [0; 4];
for i in 0..4 {
message_length[i] = buf[i];
};
let message_length = u32::from_be_bytes(message_length);
let payload: Option<Vec<u8>>;
let message_type: MessageType;
if message_length == 0 {
message_type = MessageType::KeepAlive;
payload = None;
} else {
message_type = match buf[4] {
0 => MessageType::Choke,
1 => MessageType::Unchoke,
2 => MessageType::Interested,
3 => MessageType::NotInterested,
4 => MessageType::Have,
5 => MessageType::Bitfield,
6 => MessageType::Request,
7 => MessageType::Piece,
8 => MessageType::Cancel,
9 => MessageType::Port,
_ => {
error!("Invalid Message Type: {} | Message: {:?}", buf[4], buf);
return Err(())
}
};
// if message_type == MessageType::Piece && 5 + message_length - 1 != 16397 {
// error!("{:?}", 5..5 + message_length as usize - 1);
// }
payload = Some(buf[5..5 + message_length as usize - 1].to_vec());
}
Ok(Self {
message_length,
message_type,
payload
})
}
/// Converts the `Message` instance to a byte buffer for sending.
///
/// # Returns
///
/// A byte vector containing the serialized message.
pub fn to_buffer(&mut self) -> Vec<u8> {
let mut buf: Vec<u8> = vec![];
for byte in self.message_length.to_be_bytes() {
buf.push(byte);
}
match self.message_type {
MessageType::KeepAlive => {
return buf
},
MessageType::Choke => {
buf.push(0);
return buf;
},
MessageType::Unchoke => {
buf.push(1);
return buf;
},
MessageType::Interested => {
buf.push(2);
return buf;
},
MessageType::NotInterested => {
buf.push(3);
return buf;
},
MessageType::Have => {
buf.push(4);
},
MessageType::Bitfield => {
buf.push(5);
},
MessageType::Request => {
buf.push(6);
},
MessageType::Piece => {
buf.push(7);
},
MessageType::Cancel => {
buf.push(8);
},
MessageType::Port => {
buf.push(9);
},
}
match &self.payload {
None => { panic!("Error you are trying to create a message that needs a payload with no payload") }
Some(payload) => {
for byte in payload {
buf.push(*byte)
}
buf
}
}
}
/// Create a request message from a given piece_index, offset, and length
///
/// # Arguments
///
/// * `piece_index` - The index of the piece in the torrent
/// * `offset` - The offset within the piece, because requests should be no more than 16KiB
/// * `length` - The length of the piece request, should be 16KiB
///
/// # Returns
///
/// A piece request message
pub fn create_request(piece_index: u32, offset: u32, length: u32) -> Self {
let mut payload: Vec<u8> = vec![];
for byte in piece_index.to_be_bytes() {
payload.push(byte);
}
for byte in offset.to_be_bytes() {
payload.push(byte)
}
for byte in length.to_be_bytes() {
payload.push(byte)
}
Self { message_length: 13, message_type: MessageType::Request, payload: Some(payload) }
}
/// Returns the number of messages in the given buffer and their contents.
///
/// # Arguments
///
/// * `buf` - The byte buffer containing multiple serialized messages.
///
/// # Returns
///
/// A tuple containing a vector of message byte buffers and the number of messages.
pub fn number_of_messages(buf: &Vec<u8>) -> (Vec<Vec<u8>>, u32) {
let mut message_num = 0;
let mut messages: Vec<Vec<u8>> = vec![];
// Find the length of message one
// put that into an array and increment counter by one
let mut i = 0; // points to the front
let mut j; // points to the back
loop {
j = u32::from_be_bytes([buf[i], buf[i + 1], buf[i + 2], buf[i + 3]]) as usize + 4;
messages.push(buf[i..i+j].to_vec());
i = i+j;
message_num += 1;
if buf[i] == 0 && buf[i + 1] == 0 && buf[i + 2] == 0 && buf[i + 3] == 0 {
break;
}
}
(messages, message_num)
}
}
/// An enum representing all possible message types in the BitTorrent peer wire protocol.
#[derive(Clone, Debug, PartialEq)]
#[repr(u8)]
pub enum MessageType {
/// Keepalive message, 0 length.
/// Potential Errors if trying to handle a keepalive message like another message.
/// Due to length being 0, should always be explicitly handled.
KeepAlive = u8::MAX,
/// Message telling the client not to send any requests until the peer has unchoked, 1 length.
Choke = 0,
/// Message telling the client that it can send requests, 1 length.
Unchoke = 1,
/// Message indicating that the peer is still interested in downloading, 1 length.
Interested = 2,
/// Message indicating that the peer is not interested in downloading, 1 length.
NotInterested = 3,
/// Message indicating that the peer has a given piece, fixed length.
Have = 4,
/// Message sent after a handshake, represents the pieces that the peer has.
Bitfield = 5,
/// Request a given part of a piece based on index, offset, and length, 13 length.
Request = 6,
/// A response to a request with the accompanying data, varying length.
Piece = 7,
/// Cancels a request, 13 length.
Cancel = 8,
/// Placeholder for unimplemented message type.
Port = 9,
}

238
src/peer.rs Normal file
View File

@ -0,0 +1,238 @@
//! Contains Structures and associated methods to abstract interaction with the peer
// Crate Imports
use crate::{
handshake::Handshake,
message::{ Message, MessageType },
torrent::Torrent
};
// External imports
use log::{ debug, error };
use std::net::SocketAddr;
use tokio::{
io::{ AsyncReadExt, AsyncWriteExt },
net::TcpStream
};
/// Structure to abstract interaction with a peer.
pub struct Peer {
/// The `TcpStream` that is used to communicate with the peeer
connection_stream: TcpStream,
/// The `SocketAddr` of the peer
pub socket_addr: SocketAddr,
/// The id of the peer
pub peer_id: String,
/// Whether the peer is choking the client
pub choking: bool,
}
impl Peer {
/// Creates a connection to the peer.
///
/// # Arguments
///
/// * `socket_address` - The socket address of the peer.
pub async fn create_connection(socket_address: &str) -> Option<Self> {
let socket_addr = match socket_address.parse::<SocketAddr>() {
Err(err) => {
error!("error parsing address {}, err: {}", socket_address, err);
return None;
}
Ok(addr) => { addr }
};
let connection_stream = match TcpStream::connect(socket_addr).await {
Err(err) => {
error!("unable to connect to {}, err: {}", socket_address, err);
return None
},
Ok(stream) => {
debug!("created tcpstream successfully to: {socket_addr}");
stream
}
};
Some(Self {
connection_stream,
socket_addr,
peer_id: String::new(),
choking: true,
})
}
/// Sends a handshake message to the peer, the first step in the peer wire messaging protocol.
///
/// # Arguments
///
/// * `torrent` - The `Torrent` instance associated with the peer.
pub async fn handshake(&mut self, torrent: &Torrent) {
let mut buf = vec![0; 1024];
let handshake_message = Handshake::new(torrent.get_info_hash()).unwrap();
self.connection_stream.writable().await.unwrap();
self.connection_stream.write_all(&handshake_message.to_buffer()).await.unwrap();
self.connection_stream.readable().await.unwrap();
let _ = self.connection_stream.read(&mut buf).await.unwrap();
let handshake = Handshake::from_buffer(&buf[..68].to_vec()).unwrap();
handshake.log_useful_information();
for message_buf in Message::number_of_messages(&buf[68..].to_vec()).0 {
let message = match Message::from_buffer(&message_buf) {
Err(()) => {
error!("error decoding message");
self.disconnect().await;
return;
},
Ok(message) => { message }
};
match message.message_type {
MessageType::Unchoke => {
self.choking = false;
}
_ => {}
}
}
self.peer_id = handshake.peer_id;
}
/// Keeps the connection alive and sends interested messages until the peer unchokes
pub async fn keep_alive_until_unchoke(&mut self) {
loop {
let message = match self.read_message().await {
None => return,
Some(message) => message
};
debug!("{message:?}");
match message.message_type {
MessageType::Unchoke => {
self.choking = false;
break
}
MessageType::KeepAlive => {
self.send_message_no_response(Message::new(0, MessageType::KeepAlive, None)).await;
self.send_message_no_response(Message::new(1, MessageType::Interested, None)).await;
}
MessageType::Choke => {
self.choking = true;
}
_ => { continue }
}
}
}
/// Sends a message to the peer and waits for a response, which it returns
pub async fn send_message(&mut self, mut message: Message) -> Message {
let mut buf = vec![0; 16_397];
self.connection_stream.writable().await.unwrap();
self.connection_stream.write_all(&message.to_buffer()).await.unwrap();
self.connection_stream.readable().await.unwrap();
let _ = self.connection_stream.read_exact(&mut buf).await.unwrap();
Message::from_buffer(&buf).unwrap()
}
/// Sends a message to the peer and waits for a response, which it returns
pub async fn send_message_exact_size_response(&mut self, mut message: Message, size: usize) -> Message {
let mut buf = vec![0; size];
self.connection_stream.writable().await.unwrap();
self.connection_stream.write_all(&message.to_buffer()).await.unwrap();
self.connection_stream.readable().await.unwrap();
let _ = self.connection_stream.read_exact(&mut buf).await.unwrap();
Message::from_buffer(&buf).unwrap()
}
/// Sends a message but doesn't wait for a response
pub async fn send_message_no_response(&mut self, mut message: Message) {
self.connection_stream.writable().await.unwrap();
self.connection_stream.write_all(&message.to_buffer()).await.unwrap();
}
/// reads a message from the peer
pub async fn read_message(&mut self) -> Option<Message> {
let mut buf = vec![0; 16_397];
self.connection_stream.readable().await.unwrap();
let _ = self.connection_stream.read(&mut buf).await.unwrap();
match Message::from_buffer(&buf) {
Err(()) => {
error!("Unable to decode message");
self.disconnect().await;
return None;
},
Ok(message) => {
Some(message)
}
}
}
/// Shutsdown the connection stream
pub async fn disconnect(&mut self) {
match self.connection_stream.shutdown().await {
Err(err) => {
error!("Error disconnecting from {}: {}", self.socket_addr, err);
panic!("Error disconnecting from {}: {}", self.socket_addr, err);
},
Ok(_) => {
debug!("Successfully disconnected from {}", self.socket_addr)
}
}
}
}
impl Peer {
// Sends the requests and reads responses to put a piece together
pub async fn request_piece(&mut self, index: u32, piece_length: u32, len: &mut u32, total_len: u32) -> Vec<u8> {
let mut buf = vec![];
// Sequentially requests piece from the peer
for offset in (0..piece_length).step_by(16_384 as usize) {
let mut length = 16_384;
let response: Message;
if *len + 16_384 >= total_len {
debug!("Final Request {}", total_len - *len);
length = total_len - *len;
response = self.send_message_exact_size_response(
Message::create_request(index, offset, length),
length as usize + 13
).await;
} else {
response = self.send_message(Message::create_request(index, offset, length)).await;
};
match response.message_type {
MessageType::Piece => {
let data = response.payload.unwrap();
*len += data.len() as u32;
*len -= 8;
for i in 8..data.len() {
buf.push(data[i])
}
},
_ => { debug!("didn't recieve expected piece request | Recieved: {:?}", response.message_type); }
};
if *len >= total_len - 1 {
return buf;
}
}
buf
}
}

214
src/torrent.rs Normal file
View File

@ -0,0 +1,214 @@
use log::{debug, info, error};
use regex::Regex;
use serde::{Deserialize, Serialize};
use sha1::{Digest, Sha1};
use tokio::{fs::File as TokioFile, io::AsyncReadExt};
/// Represents a node in a DHT network.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct Node(String, i64);
/// Represents a file described in a torrent.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct File {
pub path: Vec<String>,
pub length: u64,
#[serde(default)]
md5sum: Option<String>,
}
/// Represents the metadata of a torrent.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Info {
pub name: String,
#[serde(with = "serde_bytes")]
pub pieces: Vec<u8>,
#[serde(rename = "piece length")]
pub piece_length: u64,
#[serde(default)]
md5sum: Option<String>,
#[serde(default)]
pub length: Option<i64>,
#[serde(default)]
pub files: Option<Vec<File>>,
#[serde(default)]
private: Option<u8>,
#[serde(default)]
path: Option<Vec<String>>,
#[serde(default)]
#[serde(rename = "root hash")]
root_hash: Option<String>,
}
/// Represents a torrent.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Torrent {
pub info: Info,
#[serde(default)]
pub announce: Option<String>,
#[serde(default)]
nodes: Option<Vec<Node>>,
#[serde(default)]
encoding: Option<String>,
#[serde(default)]
httpseeds: Option<Vec<String>>,
#[serde(default)]
#[serde(rename = "announce-list")]
announce_list: Option<Vec<Vec<String>>>,
#[serde(default)]
#[serde(rename = "creation date")]
creation_date: Option<i64>,
#[serde(rename = "comment")]
comment: Option<String>,
#[serde(default)]
#[serde(rename = "created by")]
created_by: Option<String>
}
impl Torrent {
/// Reads a `.torrent` file and converts it into a `Torrent` struct.
///
/// # Arguments
///
/// * `path` - The path to the `.torrent` file.
pub async fn from_torrent_file(path: &str) -> Self {
// Read .torrent File
let mut file = TokioFile::open(path).await.unwrap();
let mut buf: Vec<u8> = Vec::new();
match file.read_to_end(&mut buf).await {
Err(err) => {
error!("Error reading file till end: {err}");
panic!("Error reading file till end: {err}");
}
Ok(_) => { info!("Succesfully read {path}") }
}
match serde_bencode::from_bytes(&buf) {
Err(err) => {
error!("Error deserializing file: {err}");
panic!("Error deserializing file: {err}");
}
Ok(torrent) => { torrent }
}
}
/// Logs info about the *.torrent file
pub fn log_useful_information(&self) {
info!(" -> Torrent Information <- ");
info!("Name: {}", self.info.name);
info!("Tracker: {:?}", self.announce);
info!("Tracker List: {:?}", self.announce_list);
info!("Info Hash: {:X?}", self.get_info_hash());
info!("Length: {:?}", self.info.length);
info!("Files:");
match &self.info.files {
None => {
info!("> {}", self.info.name);
return
}
Some(files) => {
for file in files {
let mut path = String::new();
for section in &file.path {
path.push_str(&format!("{section}/"));
}
path.pop();
info!("> {}: {}B", path, file.length)
}
}
}
}
/// Calculates the info hash of the torrent.
pub fn get_info_hash(&self) -> Vec<u8> {
let buf = serde_bencode::to_bytes(&self.info).unwrap();
let mut hasher = Sha1::new();
hasher.update(buf);
let res = hasher.finalize();
res[..].to_vec()
}
/// Checks if a downloaded piece matches its hash.
///
/// # Arguments
///
/// * `piece` - The downloaded piece.
/// * `index` - The index of the piece.
///
/// # Returns
///
/// * `true` if the piece is correct, `false` otherwise.
pub fn check_piece(&self, piece: &Vec<u8>, index: u32) -> bool {
let mut hasher = Sha1::new();
hasher.update(&piece);
let result = hasher.finalize();
let piece_hash = &self.info.pieces[(index * 20) as usize..(index * 20 + 20) as usize];
if &result[..] == piece_hash {
info!("Downloaded Piece {}/{} Correct!, Piece Was: {}B long", index + 1, self.info.pieces.len() / 20, piece.len(),);
true
} else {
debug!("{:?}", &result[..]);
debug!("{:?}", piece_hash);
debug!("{:?}", &result[..].len());
debug!("{:?}", piece_hash.len());
debug!("{}", piece.len());
error!("Piece downloaded incorrectly");
false
}
}
pub fn get_total_length(&self) -> u64 {
match self.info.length {
None => {},
Some(n) => { return n as u64 }
};
match &self.info.files {
None => { 0 },
Some(files) => {
let mut n = 0;
for file in files {
n += file.length;
};
n
}
}
}
pub fn get_tracker(&self) -> (&str, u16) {
let re = Regex::new(r"^udp://([^:/]+):(\d+)/announce$").unwrap();
if let Some(url) = &self.announce {
if let Some(captures) = re.captures(url) {
let hostname = captures.get(1).unwrap().as_str();
let port = captures.get(2).unwrap().as_str();
return (hostname, port.parse().unwrap());
} else {
println!("URL does not match the expected pattern | {}", url);
}
}
for (i, url) in self.announce_list.as_ref().unwrap().into_iter().enumerate() {
debug!("{:?}", url);
if let Some(captures) = re.captures(&url[i]) {
let hostname = captures.get(1).unwrap().as_str();
let port = captures.get(2).unwrap().as_str();
return (hostname, port.parse().unwrap());
} else {
println!("URL does not match the expected pattern | {}", url[i]);
}
}
("", 0)
}
}

297
src/tracker.rs Normal file
View File

@ -0,0 +1,297 @@
use std::{net::{SocketAddr, Ipv4Addr}, vec};
use tokio::net::UdpSocket;
use log::{debug, error};
pub struct Tracker {
/// A UdpSocket used for communication.
connection_stream: UdpSocket,
/// The local socket address requests are made from
pub socket_addr: SocketAddr,
/// The remote socket address of the tracker.
pub remote_addr: SocketAddr
}
impl Tracker {
/// Creates a new `Tracker` instance asynchronously.
///
/// # Arguments
///
/// * `socket_address` - Local socket address for binding.
/// * `remote_hostname` - Remote host for connection.
/// * `remote_port` - Remote port for connection.
///
/// # Panics
///
/// Panics if there is an error parsing the given address or creating the UDP socket.
pub async fn new(socket_address: &str, remote_hostname: &str, remote_port: u16) -> Self {
let socket_addr = match socket_address.parse::<SocketAddr>() {
Err(err) => {
error!("error parsing address {}, err: {}", socket_address, err);
panic!("error parsing given address, {}", err);
}
Ok(addr) => { addr }
};
let remote_address = dns_lookup::lookup_host(remote_hostname).unwrap()[0];
let remote_addr = SocketAddr::new(remote_address, remote_port);
let connection_stream = match UdpSocket::bind(socket_addr).await {
Err(err) => {
error!("unable to bind to {}, err: {}", socket_address, err);
panic!("error creating udpsocket, {}", err);
},
Ok(stream) => {
debug!("bound udpsocket successfully to: {socket_addr}");
stream
}
};
match connection_stream.connect(remote_addr).await {
Err(err) => {
error!("unable to connect to {}, err: {}", remote_addr, err);
panic!("error creating udpsocket, {}", err);
},
Ok(()) => {
debug!("successfully connected to: {remote_addr}");
}
};
Self {
connection_stream,
socket_addr,
remote_addr
}
}
/// Sends a message to the tracker and receives a response asynchronously.
///
/// # Arguments
///
/// * `message` - A type that implements the `ToBuffer` trait, representing the message to send.
///
/// # Returns
///
/// A byte vector containing the received response.
pub async fn send_message<T: ToBuffer>(&mut self, message: &T) -> Vec<u8> {
let mut buf: Vec<u8> = vec![ 0; 16_384 ];
self.connection_stream.send(&message.to_buffer()).await.unwrap();
self.connection_stream.recv(&mut buf).await.unwrap();
buf
}
}
/// A trait for converting a type into a byte buffer.
pub trait ToBuffer {
/// Converts the implementing type into a byte buffer.
fn to_buffer(&self) -> Vec<u8>;
}
/// A trait for converting a type from a byte buffer.
pub trait FromBuffer {
/// Converts a byte buffer into the implementing type.
fn from_buffer(buf: &Vec<u8>) -> Self;
}
#[derive(Debug)]
/// Represents a basic connection message.
pub struct ConnectionMessage {
pub connection_id: i64,
action: i32,
transaction_id: i32,
}
impl ConnectionMessage {
/// Creates a new basic connection message
pub fn create_basic_connection() -> Self {
Self {
connection_id: 4497486125440,
action: 0,
transaction_id: 123
}
}
}
impl ToBuffer for ConnectionMessage {
fn to_buffer(&self) -> Vec<u8> {
let mut buf: Vec<u8> = vec![];
buf.extend(self.connection_id.to_be_bytes());
buf.extend(self.action.to_be_bytes());
buf.extend(self.transaction_id.to_be_bytes());
buf
}
}
impl FromBuffer for ConnectionMessage {
fn from_buffer(buf: &Vec<u8>) -> Self {
let mut action: [u8; 4] = [0; 4];
for i in 0..4 {
action[i] = buf[i];
}
let action = i32::from_be_bytes(action);
let mut transaction_id: [u8; 4] = [0; 4];
for i in 4..8 {
transaction_id[i - 4] = buf[i];
}
let transaction_id = i32::from_be_bytes(transaction_id);
let mut connection_id: [u8; 8] = [0; 8];
for i in 8..16 {
connection_id[i - 8] = buf[i];
}
let connection_id = i64::from_be_bytes(connection_id);
Self {
connection_id,
action,
transaction_id
}
}
}
#[derive(Debug)]
/// Represents an announcement message in the bittorrent udp tracker protocol.
pub struct AnnounceMessage {
connection_id: i64,
action: i32,
transaction_id: i32,
info_hash: [u8; 20],
peer_id: [u8; 20],
downloaded: i64,
left: i64,
uploaded: i64,
event: i32,
ip: u32,
key: u32,
num_want: i32,
port: u16,
extensions: u16
}
impl AnnounceMessage {
/// Creates a new announce message.
pub fn new(connection_id: i64, infohash: &Vec<u8>, peerid: &str, total_length: i64) -> Self {
let mut info_hash: [u8; 20] = [0; 20];
for i in 0..20 {
info_hash[i] = infohash[i];
}
let mut peer_id: [u8; 20] = [0; 20];
for i in 0..20 {
peer_id[i] = peerid.chars().nth(i).unwrap() as u8;
}
Self {
connection_id,
action: 1,
transaction_id: 132,
info_hash,
peer_id,
downloaded: 0,
left: total_length,
uploaded: 0,
event: 1,
ip: 0,
key: 234,
num_want: -1,
port: 61389,
extensions: 0
}
}
}
impl ToBuffer for AnnounceMessage {
fn to_buffer(&self) -> Vec<u8> {
let mut buf: Vec<u8> = vec![];
buf.extend(self.connection_id.to_be_bytes());
buf.extend(self.action.to_be_bytes());
buf.extend(self.transaction_id.to_be_bytes());
buf.extend(self.info_hash);
buf.extend(self.peer_id);
buf.extend(self.downloaded.to_be_bytes());
buf.extend(self.left.to_be_bytes());
buf.extend(self.uploaded.to_be_bytes());
buf.extend(self.event.to_be_bytes());
buf.extend(self.ip.to_be_bytes());
buf.extend(self.key.to_be_bytes());
buf.extend(self.num_want.to_be_bytes());
buf.extend(self.port.to_be_bytes());
buf.extend(self.extensions.to_be_bytes());
buf
}
}
#[derive(Debug)]
/// Represents a response to an announcement message.
pub struct AnnounceMessageResponse {
pub action: i32,
pub transaction_id: i32,
pub interval: i32,
pub leechers: i32,
pub seeders: i32,
pub ips: Vec<Ipv4Addr>,
pub ports: Vec<u16>
}
impl FromBuffer for AnnounceMessageResponse {
/// Converts a byte buffer into an `AnnounceMessageResponse` instance.
fn from_buffer(buf: &Vec<u8>) -> Self {
let mut action: [u8; 4] = [0; 4];
for i in 0..4 {
action[i] = buf[i];
}
let action = i32::from_be_bytes(action);
let mut transaction_id: [u8; 4] = [0; 4];
for i in 4..8 {
transaction_id[i - 4] = buf[i];
}
let transaction_id = i32::from_be_bytes(transaction_id);
let mut interval: [u8; 4] = [0; 4];
for i in 8..12 {
interval[i - 8] = buf[i];
}
let interval = i32::from_be_bytes(interval);
let mut leechers: [u8; 4] = [0; 4];
for i in 12..16 {
leechers[i - 12] = buf[i];
}
let leechers = i32::from_be_bytes(leechers);
let mut seeders: [u8; 4] = [0; 4];
for i in 16..20 {
seeders[i - 16] = buf[i];
}
let seeders = i32::from_be_bytes(seeders);
let mut ips: Vec<Ipv4Addr> = vec![];
let mut ports: Vec<u16> = vec![];
for i in (20..buf.len()-6).step_by(6) {
let ip = Ipv4Addr::new(buf[i], buf[i+1], buf[i+2], buf[i+3]);
let port = u16::from_be_bytes([buf[i+4], buf[i+5]]);
if ip.to_string() == "0.0.0.0" && port == 0 {
break;
}
ips.push(ip);
ports.push(port)
}
Self { action, transaction_id, interval, leechers, seeders, ips: ips[1..].to_vec(), ports: ports[1..].to_vec() }
}
}