Compare commits

..

2 Commits

Author SHA1 Message Date
6963cef189 added readme 2023-11-24 13:37:44 +00:00
a05027895d created library 2023-11-24 13:25:32 +00:00
21 changed files with 1198 additions and 793 deletions

BIN
.DS_Store vendored Normal file

Binary file not shown.

7
.gitignore vendored
View File

@ -1,8 +1,3 @@
/target target/
Cargo.lock Cargo.lock
/log/rusty_torrent.log
/testing
/process
/log
.DS_Store .DS_Store
/downloads

View File

@ -1,28 +1,5 @@
[package] [workspace]
name = "rusty_torrent" members = ["lib_rusty_torrent", "rusty_torrenter"]
version = "0.9.3"
edition = "2021"
description = "A BitTorrent client implemented in Rust that allows you to interact with the BitTorrent protocol and download torrents." [workspace.dependencies]
authors = ["Arlo Filley <filleyarlo@gmail.com>"]
exclude = ["testing/", "process/", ".vscode/", ".DS_STORE"]
license = "MIT"
keywords = ["bittorrent", "torrent", "torrentclient"]
readme = "README.md"
repository = "https://github.com/arlofilley/rusty_torrent"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
clap = { version = "4.4.0", features = ["derive"] }
dns-lookup = "2.0.2"
log = "0.4.20"
regex = "1.9.4"
reqwest = "0.11.20"
serde = { version = "1.0.183", features = ["derive"] }
serde_bencode = "0.2.3"
serde_bytes = "0.11.12"
sha1 = "0.10.5"
simple-logging = "2.0.2"
tokio = { version = "1.30.0", features = ["full"] } tokio = { version = "1.30.0", features = ["full"] }

View File

@ -0,0 +1,18 @@
[package]
name = "lib_rusty_torrent"
version = "0.1.0"
edition = "2021"
[lib]
name = "lib_rusty_torrent"
crate-type = ["lib"]
[dependencies]
tokio = { workspace = true }
serde = { version = "1.0.183", features = ["derive"] }
serde_bencode = "0.2.3"
serde_bytes = "0.11.12"
sha1 = "0.10.5"
dns-lookup = "2.0.2"
regex = "1.9.4"
reqwest = "0.11.20"

View File

@ -0,0 +1,77 @@
# Lib Rusty Torrent
A Bittorrent V1 Protocol a Rust library for handling torrent files, downloading files from torrents, and communicating with BitTorrent trackers.
## Features
- **Torrent Parsing:** Parse and deserialize torrent files into a structured representation.
- **BitTorrent Tracker Communication:** Communicate with BitTorrent trackers using the UDP protocol.
- **File Management:** Manage the creation and writing of files associated with a torrent download.
- **Peer Management:** Easy management of peers through abstracted methods
## Installation
To use this library in your Rust project, add the following line to your `Cargo.toml` file:
```toml
[dependencies]
your_library_name = "0.1.0"
```
## Usage
### Parsing a Torrent File
```rust
use your_library_name::torrent::Torrent;
#[tokio::main]
async fn main() {
let path = "path/to/your/file.torrent";
match Torrent::from_torrent_file(path).await {
Ok(torrent) => {
// Process the torrent metadata
println!("Torrent Info: {:?}", torrent);
}
Err(err) => {
eprintln!("Error parsing torrent file: {}", err);
}
}
}
```
### Downloading Files
```rust
use your_library_name::torrent::{Torrent, Tracker};
use your_library_name::file::Files;
#[tokio::main]
async fn main() {
// Load torrent information
let torrent_path = "path/to/your/file.torrent";
let torrent = Torrent::from_torrent_file(torrent_path).await.expect("Error parsing torrent file");
// Create a tracker for finding peers
let tracker_address = "tracker.example.com:1337".parse().expect("Error parsing tracker address");
let mut tracker = Tracker::new(tracker_address).await.expect("Error creating tracker");
// Find peers and start downloading
let peer_id = "your_peer_id";
let mut files = Files::new();
files.create_files(&torrent, "download_directory").await;
let peers = tracker.find_peers(&torrent, peer_id).await;
// Implement your download logic using the found peers
// ...
println!("Download completed!");
}
```
## Contribution
Contributions are welcome! Feel free to open issues or submit pull requests.
## License
This project is licensed under the MIT License - see the LICENSE file for details.

View File

@ -1,4 +1,3 @@
use log::debug;
use tokio::{ use tokio::{
fs::try_exists as dir_exists, fs::try_exists as dir_exists,
fs::create_dir as create_dir, fs::create_dir as create_dir,
@ -56,7 +55,6 @@ impl Files {
path.push_str(dir); path.push_str(dir);
if !dir_exists(&path).await.unwrap() { if !dir_exists(&path).await.unwrap() {
debug!("Creating: {path}");
create_dir(&path).await.unwrap(); create_dir(&path).await.unwrap();
} }
} }
@ -64,8 +62,6 @@ impl Files {
path.push('/'); path.push('/');
path.push_str(&t_file.path[t_file.path.len() - 1]); path.push_str(&t_file.path[t_file.path.len() - 1]);
debug!("Creating: {path}");
let file = File::create(&path).await.unwrap(); let file = File::create(&path).await.unwrap();
let length = t_file.length; let length = t_file.length;
@ -92,14 +88,12 @@ impl Files {
if file.current_length + piece_len > file.length { if file.current_length + piece_len > file.length {
let n = file.file.write(&piece[j..(file.length - file.current_length) as usize]).await.unwrap(); let n = file.file.write(&piece[j..(file.length - file.current_length) as usize]).await.unwrap();
debug!("Wrote {n}B > {}", file.name);
j = (file.length - file.current_length) as usize; j = (file.length - file.current_length) as usize;
file.current_length += j as u64; file.current_length += j as u64;
piece_len -= j as u64; piece_len -= j as u64;
file.complete = true; file.complete = true;
} else { } else {
let n = file.file.write(&piece[j..]).await.unwrap(); let n = file.file.write(&piece[j..]).await.unwrap();
debug!("Wrote {n}B > {}", file.name);
file.current_length += piece_len; file.current_length += piece_len;
return return
} }

View File

@ -0,0 +1,5 @@
pub mod torrent;
pub mod peer_wire_protocol;
pub mod peer;
pub mod files;
pub mod tracker;

View File

@ -2,18 +2,15 @@
// Crate Imports // Crate Imports
use crate::{ use crate::{
handshake::Handshake, peer_wire_protocol::{ Handshake, Message, MessageType },
message::{ FromBuffer, Message, MessageType, ToBuffer },
torrent::Torrent torrent::Torrent
}; };
// External imports // External imports
use log::{ debug, error, info }; use std::net::SocketAddrV4;
use std::{net::{SocketAddr, SocketAddrV4, Ipv4Addr}, sync::mpsc::Sender};
use tokio::{ use tokio::{
io::{ AsyncReadExt, AsyncWriteExt, Ready }, io::{ AsyncReadExt, AsyncWriteExt },
net::TcpStream, sync::{oneshot, broadcast}, spawn, net::TcpStream
sync::mpsc
}; };
/// Structure to abstract interaction with a peer. /// Structure to abstract interaction with a peer.
@ -21,11 +18,11 @@ pub struct Peer {
/// The `TcpStream` that is used to communicate with the peeer /// The `TcpStream` that is used to communicate with the peeer
connection_stream: TcpStream, connection_stream: TcpStream,
/// The `SocketAddr` of the peer /// The `SocketAddr` of the peer
socket_addr: SocketAddrV4, pub socket_addr: SocketAddrV4,
/// The id of the peer /// The id of the peer
pub peer_id: String, pub peer_id: String,
/// Whether the peer is choking the client /// Whether the peer is choking the client
choking: bool, pub choking: bool,
} }
impl Peer { impl Peer {
@ -34,19 +31,17 @@ impl Peer {
/// # Arguments /// # Arguments
/// ///
/// * `socket_address` - The socket address of the peer. /// * `socket_address` - The socket address of the peer.
pub async fn create_connection(socket_address: SocketAddrV4) -> Option<Self> { pub async fn create_connection(socket_address: SocketAddrV4) -> Result<Self, String> {
let connection_stream = match TcpStream::connect(socket_address).await { let connection_stream = match TcpStream::connect(socket_address).await {
Err(err) => { Err(err) => {
error!("unable to connect to {}, err: {}", socket_address, err); return Err(format!("unable to connect to {}, err: {}", socket_address, err))
return None
}, },
Ok(stream) => { Ok(stream) => {
debug!("created tcpstream successfully to: {socket_address}");
stream stream
} }
}; };
Some(Self { Ok(Self {
connection_stream, connection_stream,
socket_addr: socket_address, socket_addr: socket_address,
peer_id: String::new(), peer_id: String::new(),
@ -55,63 +50,16 @@ impl Peer {
} }
} }
#[derive(Clone, Debug)]
pub enum ControlMessage {
DownloadPiece(u32, u32, u32, u32),
DownloadedPiece(Vec<u8>)
}
impl Peer { impl Peer {
pub async fn test(address: SocketAddrV4, torrent: Torrent) -> (broadcast::Sender<ControlMessage>, broadcast::Receiver<ControlMessage>) {
let (sender, mut receiver) = broadcast::channel::<ControlMessage>(16);
let sx1 = sender.clone();
let rx1 = receiver.resubscribe();
let t = torrent.clone();
spawn(async move {
let mut peer = match Peer::create_connection(address).await {
None => { return },
Some(peer) => peer
};
peer.handshake(&torrent).await;
peer.keep_alive_until_unchoke().await;
info!("Successfully Created Connection with peer: {}", peer.peer_id);
loop {
if receiver.is_empty() {
continue
} else {
let Ok(m) = receiver.recv().await else {
continue;
};
println!("{m:#?}");
match m {
ControlMessage::DownloadPiece(a, b, mut c, d) => {
let buf = peer.request_piece(a, b, &mut c, d).await;
let _ = sender.send(ControlMessage::DownloadedPiece(buf));
}
_ => ()
}
}
}
});
(sx1, rx1)
}
/// Sends a handshake message to the peer, the first step in the peer wire messaging protocol. /// Sends a handshake message to the peer, the first step in the peer wire messaging protocol.
/// ///
/// # Arguments /// # Arguments
/// ///
/// * `torrent` - The `Torrent` instance associated with the peer. /// * `torrent` - The `Torrent` instance associated with the peer.
async fn handshake(&mut self, torrent: &Torrent) { pub async fn handshake(&mut self, torrent: &Torrent) -> Result<(), String>{
let mut buf = vec![0; 1024]; let mut buf = vec![0; 1024];
let handshake_message = Handshake::new(&torrent.get_info_hash()).unwrap(); let handshake_message = Handshake::new(&torrent.get_info_hash(), String::from("-RT0001-123456012345")).unwrap();
self.connection_stream.writable().await.unwrap(); self.connection_stream.writable().await.unwrap();
self.connection_stream.write_all(&handshake_message.to_buffer()).await.unwrap(); self.connection_stream.write_all(&handshake_message.to_buffer()).await.unwrap();
@ -120,10 +68,9 @@ impl Peer {
let _ = self.connection_stream.read(&mut buf).await.unwrap(); let _ = self.connection_stream.read(&mut buf).await.unwrap();
let handshake = Handshake::from_buffer(&buf[..68].to_vec()).unwrap(); let handshake = Handshake::from_buffer(&buf[..68].to_vec()).unwrap();
handshake.log_useful_information();
for message_buf in Message::number_of_messages(&buf[68..]).0 { for message_buf in Message::number_of_messages(&buf[68..]).0 {
let message = Message::from_buffer(&message_buf); let message: Message = (&*message_buf).try_into()?;
if message.message_type == MessageType::Unchoke { if message.message_type == MessageType::Unchoke {
self.choking = false; self.choking = false;
@ -131,22 +78,23 @@ impl Peer {
} }
self.peer_id = handshake.peer_id; self.peer_id = handshake.peer_id;
Ok(())
} }
/// Keeps the connection alive and sends interested messages until the peer unchokes /// Keeps the connection alive and sends interested messages until the peer unchokes
async fn keep_alive_until_unchoke(&mut self) { pub async fn keep_alive_until_unchoke(&mut self) -> Result<(), String> {
loop { loop {
let message = self.read_message().await; let message = self.read_message().await?;
debug!("{message:?}");
match message.message_type { match message.message_type {
MessageType::Unchoke => { MessageType::Unchoke => {
self.choking = false; self.choking = false;
break break
} }
MessageType::KeepAlive => { MessageType::KeepAlive => {
self.send_message_no_response(Message::new(0, MessageType::KeepAlive, None)).await; self.send_message_no_response(Message::new(0, MessageType::KeepAlive, None)).await?;
self.send_message_no_response(Message::new(1, MessageType::Interested, None)).await; self.send_message_no_response(Message::new(1, MessageType::Interested, None)).await?;
} }
MessageType::Choke => { MessageType::Choke => {
self.choking = true; self.choking = true;
@ -154,59 +102,68 @@ impl Peer {
_ => { continue } _ => { continue }
} }
} }
Ok(())
} }
/// Sends a message to the peer and waits for a response, which it returns /// Sends a message to the peer and waits for a response, which it returns
async fn send_message(&mut self, message: Message) -> Message { pub async fn send_message(&mut self, message: Message) -> Result<Message, String> {
let mut buf = vec![0; 16_397]; let mut response = vec![0; 16_397];
let message: Vec<u8> = message.try_into()?;
self.connection_stream.writable().await.unwrap(); self.connection_stream.writable().await.unwrap();
self.connection_stream.write_all(&message.to_buffer()).await.unwrap(); self.connection_stream.write_all(&message).await.unwrap();
self.connection_stream.readable().await.unwrap(); self.connection_stream.readable().await.unwrap();
let _ = self.connection_stream.read_exact(&mut buf).await.unwrap(); let _ = self.connection_stream.read_exact(&mut response).await.unwrap();
Message::from_buffer(&buf) Ok((*response).try_into()?)
} }
/// Sends a message to the peer and waits for a response, which it returns /// Sends a message to the peer and waits for a response, which it returns
async fn send_message_exact_size_response(&mut self, message: Message, size: usize) -> Message { pub async fn send_message_exact_size_response(&mut self, message: Message, size: usize) -> Result<Message, String> {
let mut buf = vec![0; size]; let mut response = vec![0; size];
let message: Vec<u8> = message.try_into()?;
self.connection_stream.writable().await.unwrap(); self.connection_stream.writable().await.unwrap();
self.connection_stream.write_all(&message.to_buffer()).await.unwrap(); self.connection_stream.write_all(&message).await.unwrap();
self.connection_stream.readable().await.unwrap(); self.connection_stream.readable().await.unwrap();
let _ = self.connection_stream.read_exact(&mut buf).await.unwrap(); let _ = self.connection_stream.read_exact(&mut response).await.unwrap();
Message::from_buffer(&buf) Ok((*response).try_into()?)
} }
/// Sends a message but doesn't wait for a response /// Sends a message but doesn't wait for a response
async fn send_message_no_response(&mut self, message: Message) { pub async fn send_message_no_response(&mut self, message: Message) -> Result<(), String> {
let message: Vec<u8> = message.try_into()?;
self.connection_stream.writable().await.unwrap(); self.connection_stream.writable().await.unwrap();
self.connection_stream.write_all(&message.to_buffer()).await.unwrap(); self.connection_stream.write_all(&message).await.unwrap();
Ok(())
} }
/// reads a message from the peer /// reads a message from the peer
async fn read_message(&mut self) -> Message { pub async fn read_message(&mut self) -> Result<Message, String> {
let mut buf = vec![0; 16_397]; let mut response = vec![0; 16_397];
self.connection_stream.readable().await.unwrap(); self.connection_stream.readable().await.unwrap();
let _ = self.connection_stream.read(&mut buf).await.unwrap(); let _ = self.connection_stream.read(&mut response).await.unwrap();
Message::from_buffer(&buf) Ok((*response).try_into()?)
} }
/// Shutsdown the connection stream /// Shutsdown the connection stream
async fn disconnect(&mut self) { pub async fn disconnect(&mut self) -> Result<(), String>{
match self.connection_stream.shutdown().await { match self.connection_stream.shutdown().await {
Err(err) => { Err(err) => {
error!("Error disconnecting from {}: {}", self.socket_addr, err); return Err(format!("Error disconnecting from {}: {}", self.socket_addr, err));
panic!("Error disconnecting from {}: {}", self.socket_addr, err);
}, },
Ok(_) => { Ok(_) => {
debug!("Successfully disconnected from {}", self.socket_addr) Ok(())
} }
} }
} }
@ -214,7 +171,7 @@ impl Peer {
impl Peer { impl Peer {
// Sends the requests and reads responses to put a piece together // Sends the requests and reads responses to put a piece together
pub async fn request_piece(&mut self, index: u32, piece_length: u32, len: &mut u32, total_len: u32) -> Vec<u8> { pub async fn request_piece(&mut self, index: u32, piece_length: u32, len: &mut u32, total_len: u32) -> Result<Vec<u8>, String> {
let mut buf = vec![]; let mut buf = vec![];
// Sequentially requests piece from the peer // Sequentially requests piece from the peer
for offset in (0..piece_length).step_by(16_384) { for offset in (0..piece_length).step_by(16_384) {
@ -223,15 +180,14 @@ impl Peer {
let response: Message; let response: Message;
if *len + 16_384 >= total_len { if *len + 16_384 >= total_len {
debug!("Final Request {}", total_len - *len);
length = total_len - *len; length = total_len - *len;
response = self.send_message_exact_size_response( response = self.send_message_exact_size_response(
Message::create_request(index, offset, length), Message::create_piece_request(index, offset, length),
length as usize + 13 length as usize + 13
).await; ).await?;
} else { } else {
response = self.send_message(Message::create_request(index, offset, length)).await; response = self.send_message(Message::create_piece_request(index, offset, length)).await?;
}; };
match response.message_type { match response.message_type {
@ -244,14 +200,46 @@ impl Peer {
buf.push(byte) buf.push(byte)
} }
}, },
_ => { debug!("didn't recieve expected piece request | Recieved: {:?}", response.message_type); } _ => { }
}; };
if *len >= total_len - 1 { if *len >= total_len - 1 {
return buf; return Ok(buf);
} }
} }
buf Ok(buf)
} }
} }
#[cfg(test)]
mod tests {
use super::*;
use crate::torrent::Torrent;
use std::net::Ipv4Addr;
#[tokio::test]
async fn peer_create_connection() {
// Replace the IP and port with the actual values
let socket_address = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 6881);
match Peer::create_connection(socket_address).await {
Ok(peer) => {
assert_eq!(peer.socket_addr, socket_address);
// Add more assertions if needed
}
Err(err) => panic!("Unexpected error: {}", err),
}
}
#[tokio::test]
async fn peer_handshake() {
let socket_address = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 6881);
let mut peer = Peer::create_connection(socket_address.clone()).await.unwrap();
let torrent = Torrent::from_torrent_file("test.torrent").await.unwrap();
assert!(peer.handshake(&torrent).await.is_ok());
}
// Add more tests for other methods in the Peer structure
}

View File

@ -0,0 +1,554 @@
/// Represents the handshake message that will be sent to a client.
#[derive(Debug)]
pub struct Handshake {
/// The length of the protocol name, must be 19 for "BitTorrent protocol".
p_str_len: u8,
/// The protocol name, should always be "BitTorrent protocol".
p_str: String,
/// Reserved for extensions, currently unused.
reserved: [u8; 8],
/// The infohash for the torrent.
info_hash: Vec<u8>,
/// The identifier for the client.
pub peer_id: String,
}
impl Handshake {
/// Creates a new handshake.
///
/// # Arguments
///
/// * `info_hash` - The infohash for the torrent.
///
/// # Returns
///
/// A new `Handshake` instance on success, or an empty `Result` indicating an error.
pub fn new(info_hash: &[u8], peer_id: String) -> Result<Self, String> {
if info_hash.len() != 20 {
return Err(String::from("Incorrect infohash length"));
}
if peer_id.len() != 20 {
return Err(String::from("Incorrect Peer_Id Length"))
}
Ok(Self {
p_str_len: 19,
p_str: String::from("BitTorrent protocol"),
reserved: [0; 8],
info_hash: info_hash.to_vec(),
peer_id: String::from("-MY0001-123456654321")
})
}
/// Converts the `Handshake` instance to a byte buffer for sending to a peer.
///
/// # Returns
///
/// A byte vector containing the serialized handshake.
pub fn to_buffer(&self) -> Vec<u8> {
let mut buf: Vec<u8> = vec![0; 68];
buf[0] = self.p_str_len;
buf[1..20].copy_from_slice(&self.p_str.as_bytes()[..19]);
buf[21..28].copy_from_slice(&self.reserved[..7]);
buf[28..48].copy_from_slice(&self.info_hash[..20]);
buf[48..68].copy_from_slice(&self.peer_id.as_bytes()[..20]);
buf
}
/// Converts a byte buffer to a `Handshake` instance.
///
/// # Arguments
///
/// * `buf` - A byte vector containing the serialized handshake.
///
/// # Returns
///
/// A new `Handshake` instance on success, or an empty `Result` indicating an error.
///
/// # Errors
///
/// Returns an error if the provided buffer is not long enough (at least 68 bytes).
pub fn from_buffer(buf: &Vec<u8>) -> Result<Self, String> {
// Verify that buffer is at least the correct size, if not error
if buf.len() < 68 {
return Err(String::from("buffer provided to handshake was too short"));
}
let mut p_str = String::new();
for byte in buf.iter().take(20).skip(1) {
p_str.push(*byte as char)
}
let mut info_hash: Vec<u8> = vec![0; 20];
info_hash[..20].copy_from_slice(&buf[28..48]);
let mut peer_id = String::new();
for byte in buf.iter().take(68).skip(48) {
peer_id.push(*byte as char)
}
Ok(Self {
p_str_len: buf[0],
p_str,
reserved: [0; 8],
info_hash,
peer_id
})
}
}
/// Represents a message in the BitTorrent protocol.
#[derive(Clone, Debug, PartialEq)]
pub struct Message {
/// The length of the message, including the type and payload.
pub message_length: u32,
/// The type of message.
pub message_type: MessageType,
/// The payload of the message, if any.
pub payload: Option<Vec<u8>>,
}
impl Message {
/// Creates a new message.
///
/// # Arguments
///
/// * `message_length` - The length of the message.
/// * `message_type` - The type of message.
/// * `payload` - The payload of the message, if any.
pub fn new(message_length: u32, message_type: MessageType, payload: Option<Vec<u8>>) -> Self {
Self { message_length, message_type, payload }
}
}
impl TryFrom<&[u8]> for Message {
type Error = String;
/// Decodes a message from a given buffer.
///
/// # Arguments
///
/// * `buf` - The byte buffer containing the serialized message.
///
/// # Returns
///
/// A new `Message` instance on success, or an empty `Result` indicating an error.
fn try_from(value: &[u8]) -> Result<Self, Self::Error> {
let mut message_length: [u8; 4] = [0; 4];
if value.len() < 5 {
return Err(format!("Buffer not long enough to be a message: Length {}, should be at least 4 bytes", value.len()));
}
message_length[..4].copy_from_slice(&value[..4]);
let message_length = u32::from_be_bytes(message_length);
let payload: Option<Vec<u8>>;
let message_type: MessageType;
if message_length == 0 {
message_type = MessageType::KeepAlive;
payload = None;
} else if message_length == 5 {
message_type = value[4].try_into()?;
payload = None;
} else {
message_type = value[4].try_into()?;
let end_of_message = 4 + message_length as usize;
if end_of_message > value.len() {
return Err(format!("Invalid message length {} expected {}", value.len(), end_of_message))
} else {
payload = Some(value[5..end_of_message].to_vec());
}
}
Ok(Self {
message_length,
message_type,
payload
})
}
}
impl TryFrom<Message> for Vec<u8> {
type Error = String;
/// Converts the `Message` instance to a byte buffer for sending.
///
/// # Returns
///
/// A byte vector containing the serialized message.
fn try_from(value: Message) -> Result<Self, Self::Error> {
let mut buf: Vec<u8> = vec![];
for byte in value.message_length.to_be_bytes() {
buf.push(byte);
}
match value.message_type {
MessageType::KeepAlive => {
return Ok(buf)
},
MessageType::Choke | MessageType::Unchoke | MessageType::Interested | MessageType::NotInterested => {
buf.push(value.message_type.try_into()?);
return Ok(buf);
},
MessageType::Have | MessageType::Bitfield | MessageType::Request | MessageType::Piece | MessageType::Cancel | MessageType::Port => {
buf.push(value.message_type.try_into()?);
},
}
match value.payload {
None => {
return Err(String::from("Error you are trying to create a message that needs a payload with no payload"))
}
Some(payload) => {
buf.extend(payload);
}
}
Ok(buf)
}
}
impl Message {
/// Create a request message from a given piece_index, offset, and length
///
/// # Arguments
///
/// * `piece_index` - The index of the piece in the torrent
/// * `offset` - The offset within the piece, because requests should be no more than 16KiB
/// * `length` - The length of the piece request, should be 16KiB
///
/// # Returns
///
/// A piece request message
pub fn create_piece_request(piece_index: u32, offset: u32, length: u32) -> Self {
let mut payload: Vec<u8> = vec![];
for byte in piece_index.to_be_bytes() {
payload.push(byte);
}
for byte in offset.to_be_bytes() {
payload.push(byte)
}
for byte in length.to_be_bytes() {
payload.push(byte)
}
Self {
message_length: 13,
message_type: MessageType::Request,
payload: Some(payload)
}
}
/// Returns the number of messages in the given buffer and their contents.
///
/// # Arguments
///
/// * `buf` - The byte buffer containing multiple serialized messages.
///
/// # Returns
///
/// A tuple containing a vector of message byte buffers and the number of messages.
pub fn number_of_messages(buf: &[u8]) -> (Vec<Vec<u8>>, u32) {
let mut message_num = 0;
let mut messages: Vec<Vec<u8>> = vec![];
// Find the length of message one
// put that into an array and increment counter by one
let mut i = 0; // points to the front
let mut j; // points to the back
loop {
j = u32::from_be_bytes([buf[i], buf[i + 1], buf[i + 2], buf[i + 3]]) as usize + 4;
messages.push(buf[i..i+j].to_vec());
i += j;
message_num += 1;
if buf[i] == 0 && buf[i + 1] == 0 && buf[i + 2] == 0 && buf[i + 3] == 0 {
break;
}
}
(messages, message_num)
}
}
/// An enum representing all possible message types in the BitTorrent peer wire protocol.
#[derive(Clone, Debug, PartialEq)]
#[repr(u8)]
pub enum MessageType {
/// Keepalive message, 0 length.
/// Potential Errors if trying to handle a keepalive message like another message.
/// Due to length being 0, should always be explicitly handled.
KeepAlive = u8::MAX,
/// Message telling the client not to send any requests until the peer has unchoked, 1 length.
Choke = 0,
/// Message telling the client that it can send requests, 1 length.
Unchoke = 1,
/// Message indicating that the peer is still interested in downloading, 1 length.
Interested = 2,
/// Message indicating that the peer is not interested in downloading, 1 length.
NotInterested = 3,
/// Message indicating that the peer has a given piece, fixed length.
Have = 4,
/// Message sent after a handshake, represents the pieces that the peer has.
Bitfield = 5,
/// Request a given part of a piece based on index, offset, and length, 13 length.
Request = 6,
/// A response to a request with the accompanying data, varying length.
Piece = 7,
/// Cancels a request, 13 length.
Cancel = 8,
/// Placeholder for unimplemented message type.
Port = 9,
}
impl TryFrom<MessageType> for u8 {
type Error = String;
fn try_from(value: MessageType) -> Result<Self, Self::Error> {
match value {
MessageType::Choke => Ok(0),
MessageType::Unchoke => Ok(1),
MessageType::Interested => Ok(2),
MessageType::NotInterested => Ok(3),
MessageType::Have => Ok(4),
MessageType::Bitfield => Ok(5),
MessageType::Request => Ok(6),
MessageType::Piece => Ok(7),
MessageType::Cancel => Ok(8),
MessageType::Port => Ok(9),
_ => {
Err(format!("Invalid Message Type {:?}", value))
}
}
}
}
impl TryFrom<u8> for MessageType {
type Error = String;
fn try_from(value: u8) -> Result<Self, Self::Error> {
match value {
0 => Ok(MessageType::Choke),
1 => Ok(MessageType::Unchoke),
2 => Ok(MessageType::Interested),
3 => Ok(MessageType::NotInterested),
4 => Ok(MessageType::Have),
5 => Ok(MessageType::Bitfield),
6 => Ok(MessageType::Request),
7 => Ok(MessageType::Piece),
8 => Ok(MessageType::Cancel),
9 => Ok(MessageType::Port),
_ => {
Err(format!("Invalid Message Type {}", value))
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn handshake_creation() {
let info_hash: [u8; 20] = [1; 20];
let peer_id = String::from("-MY0001-123456654321");
match Handshake::new(&info_hash, peer_id.clone()) {
Ok(handshake) => {
assert_eq!(handshake.p_str_len, 19);
assert_eq!(handshake.p_str, "BitTorrent protocol");
assert_eq!(handshake.reserved, [0; 8]);
assert_eq!(handshake.info_hash, info_hash.to_vec());
assert_eq!(handshake.peer_id, peer_id);
}
Err(_) => panic!("Unexpected error creating handshake"),
}
}
#[test]
fn handshake_creation_invalid_infohash() {
let invalid_info_hash: [u8; 19] = [1; 19];
let peer_id = String::from("-MY0001-123456654321");
match Handshake::new(&invalid_info_hash, peer_id.clone()) {
Err(err) => assert_eq!(err, "Incorrect infohash length"),
Ok(_) => panic!("Expected an error creating handshake, but got Ok"),
}
}
#[test]
fn handshake_creation_invalid_peer_id() {
let info_hash: [u8; 20] = [1; 20];
let invalid_peer_id = String::from("-INVALID");
match Handshake::new(&info_hash, invalid_peer_id) {
Err(err) => assert_eq!(err, "Incorrect Peer_Id Length"),
Ok(_) => panic!("Expected an error creating handshake, but got Ok"),
}
}
#[test]
fn handshake_to_buffer() {
let info_hash: [u8; 20] = [1; 20];
let peer_id = String::from("-MY0001-123456654321");
let handshake = Handshake::new(&info_hash, peer_id).unwrap();
let buffer = handshake.to_buffer();
assert_eq!(buffer.len(), 68);
// Add more assertions based on the expected structure of the buffer if needed
}
#[test]
fn handshake_from_buffer() {
let info_hash: [u8; 20] = [1; 20];
let peer_id = String::from("-MY0001-123456654321");
let original_handshake = Handshake::new(&info_hash, peer_id.clone()).unwrap();
let buffer = original_handshake.to_buffer();
match Handshake::from_buffer(&buffer) {
Ok(handshake) => assert_eq!(handshake.peer_id, peer_id),
Err(err) => panic!("Unexpected error: {}", err),
}
}
#[test]
fn handshake_from_buffer_invalid_size() {
let short_buffer: Vec<u8> = vec![0; 67]; // Invalid size
match Handshake::from_buffer(&short_buffer) {
Err(err) => assert_eq!(err, "buffer provided to handshake was too short"),
Ok(_) => panic!("Expected an error, but got Ok"),
}
}
#[test]
fn u8_to_message_type() {
assert_eq!(TryInto::<MessageType>::try_into(0 as u8), Ok(MessageType::Choke));
assert_eq!(TryInto::<MessageType>::try_into(1 as u8), Ok(MessageType::Unchoke));
assert_eq!(TryInto::<MessageType>::try_into(2 as u8), Ok(MessageType::Interested));
assert_eq!(TryInto::<MessageType>::try_into(3 as u8), Ok(MessageType::NotInterested));
assert_eq!(TryInto::<MessageType>::try_into(4 as u8), Ok(MessageType::Have));
assert_eq!(TryInto::<MessageType>::try_into(5 as u8), Ok(MessageType::Bitfield));
assert_eq!(TryInto::<MessageType>::try_into(6 as u8), Ok(MessageType::Request));
assert_eq!(TryInto::<MessageType>::try_into(7 as u8), Ok(MessageType::Piece));
assert_eq!(TryInto::<MessageType>::try_into(8 as u8), Ok(MessageType::Cancel));
assert_eq!(TryInto::<MessageType>::try_into(9 as u8), Ok(MessageType::Port));
assert_eq!(TryInto::<MessageType>::try_into(10 as u8), Err(String::from("Invalid Message Type 10")));
}
#[test]
fn message_type_to_u8() {
assert_eq!(TryInto::<u8>::try_into(MessageType::Choke), Ok(0 as u8));
assert_eq!(TryInto::<u8>::try_into(MessageType::Unchoke), Ok(1 as u8));
assert_eq!(TryInto::<u8>::try_into(MessageType::Interested), Ok(2 as u8));
assert_eq!(TryInto::<u8>::try_into(MessageType::NotInterested), Ok(3 as u8));
assert_eq!(TryInto::<u8>::try_into(MessageType::Have), Ok(4 as u8));
assert_eq!(TryInto::<u8>::try_into(MessageType::Bitfield), Ok(5 as u8));
assert_eq!(TryInto::<u8>::try_into(MessageType::Request), Ok(6 as u8));
assert_eq!(TryInto::<u8>::try_into(MessageType::Piece), Ok(7 as u8));
assert_eq!(TryInto::<u8>::try_into(MessageType::Cancel), Ok(8 as u8));
assert_eq!(TryInto::<u8>::try_into(MessageType::Port), Ok(9 as u8));
assert_eq!(TryInto::<u8>::try_into(MessageType::KeepAlive), Err(String::from("Invalid Message Type KeepAlive")));
}
#[test]
fn create_piece_request() {
let piece_index = 42;
let offset = 1024;
let length = 16384;
let request_message = Message::create_piece_request(piece_index, offset, length);
assert_eq!(request_message.message_length, 13);
assert_eq!(request_message.message_type, MessageType::Request);
if let Some(payload) = request_message.payload {
assert_eq!(payload.len(), 12); // 4 bytes for piece_index + 4 bytes for offset + 4 bytes for length
let mut expected_payload = vec![];
expected_payload.extend_from_slice(&piece_index.to_be_bytes());
expected_payload.extend_from_slice(&offset.to_be_bytes());
expected_payload.extend_from_slice(&length.to_be_bytes());
assert_eq!(payload, expected_payload);
} else {
panic!("Expected payload, but found None");
}
}
#[test]
fn try_from_valid_message() {
let message_bytes = vec![0, 0, 0, 5, 1]; // Unchoke message
match Message::try_from(&message_bytes[..]) {
Ok(message) => {
assert_eq!(message.message_length, 5);
assert_eq!(message.message_type, MessageType::Unchoke);
assert!(message.payload.is_none());
}
Err(err) => panic!("Unexpected error: {}", err),
}
}
#[test]
fn try_from_invalid_message() {
let invalid_message_bytes = vec![0, 0, 0, 2]; // Message length indicates 2 bytes, but no payload provided
match Message::try_from(&invalid_message_bytes[..]) {
Ok(_) => panic!("Expected an error but got Ok"),
Err(err) => {
assert_eq!(
err,
"Buffer not long enough to be a message: Length 4, should be at least 4 bytes"
);
}
}
}
#[test]
fn try_into_valid_message() {
let message = Message {
message_length: 5,
message_type: MessageType::Unchoke,
payload: None,
};
match Vec::<u8>::try_from(message) {
Ok(serialized_message) => {
assert_eq!(serialized_message, vec![0, 0, 0, 5, 1]); // Unchoke message
}
Err(err) => panic!("Unexpected error: {}", err),
}
}
#[test]
fn try_into_message_with_payload() {
let payload_data = vec![65, 66, 67]; // Arbitrary payload
let message = Message {
message_length: 7,
message_type: MessageType::Piece,
payload: Some(payload_data.clone()),
};
match Vec::<u8>::try_from(message) {
Ok(serialized_message) => {
let mut expected_serialized_message = vec![0, 0, 0, 7, 7]; // Piece message
expected_serialized_message.extend_from_slice(&payload_data);
assert_eq!(serialized_message, expected_serialized_message);
}
Err(err) => panic!("Unexpected error: {}", err),
}
}
}

View File

@ -0,0 +1,394 @@
use regex::Regex;
use serde::{Deserialize, Serialize};
use sha1::{Digest, Sha1};
use tokio::{fs::File as TokioFile, io::AsyncReadExt};
use std::net::{IpAddr, SocketAddrV4};
/// Represents a node in a DHT network.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct Node(String, i64);
/// Represents a file described in a torrent.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct File {
pub path: Vec<String>,
pub length: u64,
#[serde(default)]
md5sum: Option<String>,
}
/// Represents the metadata of a torrent.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Info {
pub name: String,
#[serde(with = "serde_bytes")]
pub pieces: Vec<u8>,
#[serde(rename = "piece length")]
pub piece_length: u64,
#[serde(default)]
md5sum: Option<String>,
#[serde(default)]
pub length: Option<i64>,
#[serde(default)]
pub files: Option<Vec<File>>,
#[serde(default)]
private: Option<u8>,
#[serde(default)]
path: Option<Vec<String>>,
#[serde(default)]
#[serde(rename = "root hash")]
root_hash: Option<String>,
}
/// Represents a torrent.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Torrent {
pub info: Info,
#[serde(default)]
pub announce: Option<String>,
#[serde(default)]
nodes: Option<Vec<Node>>,
#[serde(default)]
encoding: Option<String>,
#[serde(default)]
httpseeds: Option<Vec<String>>,
#[serde(default)]
#[serde(rename = "announce-list")]
announce_list: Option<Vec<Vec<String>>>,
#[serde(default)]
#[serde(rename = "creation date")]
creation_date: Option<i64>,
#[serde(rename = "comment")]
comment: Option<String>,
#[serde(default)]
#[serde(rename = "created by")]
created_by: Option<String>
}
impl Torrent {
/// Reads a `.torrent` file and converts it into a `Torrent` struct.
///
/// # Arguments
///
/// * `path` - The path to the `.torrent` file.
pub async fn from_torrent_file(path: &str) -> Result<Self, String> {
let Ok(mut file) = TokioFile::open(path).await else {
return Err(format!("Unable to read file at {path}"));
};
let mut buf: Vec<u8> = Vec::new();
let Ok(_) = file.read_to_end(&mut buf).await else {
return Err(format!("Error reading file > {path}"));
};
let torrent: Torrent = match serde_bencode::from_bytes(&buf) {
Err(_) => return Err(format!("Error deserializing file > {path}")),
Ok(torrent) => torrent,
};
Ok(torrent)
}
}
impl Torrent {
/// Calculates the info hash of the torrent.
pub fn get_info_hash(&self) -> Vec<u8> {
let buf = serde_bencode::to_bytes(&self.info).unwrap();
let mut hasher = Sha1::new();
hasher.update(buf);
let res = hasher.finalize();
res[..].to_vec()
}
/// Checks if a downloaded piece matches its hash.
///
/// # Arguments
///
/// * `piece` - The downloaded piece.
/// * `index` - The index of the piece.
///
/// # Returns
///
/// * `true` if the piece is correct, `false` otherwise.
pub fn check_piece(&self, piece: &[u8], index: u32) -> bool {
let mut hasher = Sha1::new();
hasher.update(piece);
let result = hasher.finalize();
let piece_hash = &self.info.pieces[(index * 20) as usize..(index * 20 + 20) as usize];
if &result[..] == piece_hash {
true
} else {
false
}
}
pub fn get_total_length(&self) -> u64 {
if let Some(n) = self.info.length {
return n as u64
};
if let Some(files) = &self.info.files {
let mut n = 0;
for file in files {
n += file.length;
};
return n
};
0
}
pub fn get_trackers(&self) -> Result<Vec<SocketAddrV4>, String> {
let mut addresses = vec![];
// This is the current regex as I haven't implemented support for http trackers yet
let re = Regex::new(r"^udp://([^:/]+):(\d+)/announce$").unwrap();
if let Some(url) = &self.announce {
if let Some(captures) = re.captures(url) {
let hostname = captures.get(1).unwrap().as_str();
let port = captures.get(2).unwrap().as_str();
if let Ok(ip) = dns_lookup::lookup_host(hostname) {
for i in ip {
if let IpAddr::V4(j) = i {
addresses.push(SocketAddrV4::new(j, port.parse().unwrap()))
}
}
}
}
}
if let Some(urls) = &self.announce_list {
for url in urls.iter() {
if let Some(captures) = re.captures(&url[0]) {
let hostname = captures.get(1).unwrap().as_str();
let port = captures.get(2).unwrap().as_str();
if let Ok(ip) = dns_lookup::lookup_host(hostname) {
for i in ip {
if let IpAddr::V4(j) = i {
addresses.push(SocketAddrV4::new(j, port.parse().unwrap()));
}
}
}
}
}
}
if addresses.len() > 0 {
Ok(addresses)
} else {
Err(String::from("Unable to find trackers"))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::runtime::Runtime;
#[test]
fn from_torrent_file_success() {
let runtime = Runtime::new().unwrap();
let path = "test.torrent";
let result = runtime.block_on(Torrent::from_torrent_file(path));
println!("{result:?}");
assert!(result.is_ok());
}
#[test]
fn from_torrent_file_failure() {
let runtime = Runtime::new().unwrap();
let path = "nonexistent/file.torrent";
let result = runtime.block_on(Torrent::from_torrent_file(path));
assert!(result.is_err());
}
#[test]
fn get_info_hash() {
// Create a mock Torrent instance
let torrent = Torrent {
info: Info {
name: String::from("test_torrent"),
pieces: vec![],
piece_length: 1024,
length: Some(2048),
files: None,
md5sum: None,
private: None,
path: None,
root_hash: None,
},
announce: Some(String::from("http://tracker.example.com/announce")),
nodes: None,
encoding: None,
httpseeds: None,
announce_list: None,
creation_date: None,
comment: None,
created_by: None,
};
let result = torrent.get_info_hash();
assert!(!result.is_empty());
}
#[test]
fn check_piece_valid() {
let mut hasher = Sha1::new();
hasher.update(vec![0; 1024]);
let piece_hash: &[u8] = &hasher.finalize();
// Create a mock Torrent instance
let torrent = Torrent {
info: Info {
name: String::from("test_torrent"),
pieces: piece_hash.into(), // Mock piece hashes
piece_length: 1024,
length: Some(2048),
files: None,
md5sum: None,
private: None,
path: None,
root_hash: None,
},
announce: Some(String::from("http://tracker.example.com/announce")),
nodes: None,
encoding: None,
httpseeds: None,
announce_list: None,
creation_date: None,
comment: None,
created_by: None,
};
// Mock a valid piece
let piece = vec![0; 1024];
let result = torrent.check_piece(&piece, 0);
assert!(result);
}
#[test]
fn check_piece_invalid() {
// Create a mock Torrent instance
let torrent = Torrent {
info: Info {
name: String::from("test_torrent"),
pieces: vec![0; 20], // Mock piece hashes
piece_length: 1024,
length: Some(2048),
files: None,
md5sum: None,
private: None,
path: None,
root_hash: None,
},
announce: Some(String::from("http://tracker.example.com/announce")),
nodes: None,
encoding: None,
httpseeds: None,
announce_list: None,
creation_date: None,
comment: None,
created_by: None,
};
// Mock an invalid piece
let piece = vec![1; 1024];
let result = torrent.check_piece(&piece, 0);
assert!(!result);
}
#[test]
fn get_total_length_single_file() {
// Create a mock Torrent instance with a single file
let torrent = Torrent {
info: Info {
name: String::from("test_torrent"),
pieces: vec![],
piece_length: 1024,
length: Some(2048),
files: Some(vec![File {
path: vec![String::from("test_file.txt")],
length: 2048,
md5sum: None,
}]),
md5sum: None,
private: None,
path: None,
root_hash: None,
},
announce: Some(String::from("http://tracker.example.com/announce")),
nodes: None,
encoding: None,
httpseeds: None,
announce_list: None,
creation_date: None,
comment: None,
created_by: None,
};
let result = torrent.get_total_length();
assert_eq!(result, 2048);
}
#[test]
fn get_total_length_multiple_files() {
// Create a mock Torrent instance with multiple files
let torrent = Torrent {
info: Info {
name: String::from("test_torrent"),
pieces: vec![],
piece_length: 1024,
length: None,
files: Some(vec![
File {
path: vec![String::from("file1.txt")],
length: 1024,
md5sum: None,
},
File {
path: vec![String::from("file2.txt")],
length: 2048,
md5sum: None,
},
]),
md5sum: None,
private: None,
path: None,
root_hash: None,
},
announce: Some(String::from("http://tracker.example.com/announce")),
nodes: None,
encoding: None,
httpseeds: None,
announce_list: None,
creation_date: None,
comment: None,
created_by: None,
};
let result = torrent.get_total_length();
assert_eq!(result, 3072);
}
// Add more tests for other methods and edge cases as needed
}

View File

@ -1,7 +1,6 @@
use std::net::{SocketAddr, Ipv4Addr, SocketAddrV4}; use std::net::{SocketAddr, Ipv4Addr, SocketAddrV4};
use tokio::net::UdpSocket; use tokio::net::UdpSocket;
use log::{debug, error};
use crate::torrent::Torrent; use crate::torrent::Torrent;
@ -26,22 +25,16 @@ impl Tracker {
/// # Panics /// # Panics
/// ///
/// Panics if there is an error parsing the given address or creating the UDP socket. /// Panics if there is an error parsing the given address or creating the UDP socket.
pub async fn new(listen_address: SocketAddr, remote_address: SocketAddr) -> Result<Self, ()> { pub async fn new(listen_address: SocketAddr, remote_address: SocketAddr) -> Result<Self, String> {
let Ok(connection_stream) = UdpSocket::bind(listen_address).await else { let Ok(connection_stream) = UdpSocket::bind(listen_address).await else {
error!("error binding to udpsocket {listen_address}"); return Err(format!("error binding to udpsocket {listen_address}"))
return Err(())
}; };
debug!("bound udpsocket successfully to: {listen_address}");
match connection_stream.connect(remote_address).await { match connection_stream.connect(remote_address).await {
Err(err) => { Err(err) => {
error!("unable to connect to {}, err: {}", remote_address, err); return Err(format!("error creating udpsocket, {}", err));
panic!("error creating udpsocket, {}", err);
}, },
Ok(()) => { Ok(()) => { }
debug!("successfully connected to: {remote_address}");
}
}; };

Binary file not shown.

8
rusty_torrenter/.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
/target
Cargo.lock
/log/rusty_torrent.log
/testing
/process
/log
.DS_Store
/downloads

View File

@ -0,0 +1,29 @@
[package]
name = "rusty_torrenter"
version = "0.9.3"
edition = "2021"
description = "A BitTorrent client implemented in Rust that allows you to interact with the BitTorrent protocol and download torrents."
authors = ["Arlo Filley <filleyarlo@gmail.com>"]
exclude = ["testing/", "process/", ".vscode/", ".DS_STORE"]
license = "MIT"
keywords = ["bittorrent", "torrent", "torrentclient"]
readme = "README.md"
repository = "https://github.com/arlofilley/rusty_torrent"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
lib_rusty_torrent = { path = "../lib_rusty_torrent" }
dns-lookup = "2.0.2"
log = "0.4.20"
regex = "1.9.4"
reqwest = "0.11.20"
serde = { version = "1.0.183", features = ["derive"] }
serde_bencode = "0.2.3"
serde_bytes = "0.11.12"
sha1 = "0.10.5"
simple-logging = "2.0.2"
tokio = { workspace = true }
clap = { version = "*", features = ["derive"] }

View File

@ -9,30 +9,20 @@
//! Checks piece hashes //! Checks piece hashes
//! Writes to torrent file //! Writes to torrent file
// Modules
mod files;
mod handshake;
mod peer;
mod message;
mod torrent;
mod tracker;
use core::panic; use core::panic;
use std::net::SocketAddr; use std::net::SocketAddr;
// Crate Imports // Crate Imports
use crate::{ use lib_rusty_torrent::{
files::Files, files::Files,
peer::Peer, peer::*,
torrent::Torrent, torrent::Torrent,
tracker::tracker::Tracker tracker::Tracker,
}; };
use tokio::sync::mpsc;
// External Ipmorts // External Ipmorts
use clap::Parser; use clap::Parser;
use log::{ debug, info, LevelFilter, error }; use log::{ debug, info, LevelFilter, error };
use tokio::spawn;
/// Struct Respresenting needed arguments /// Struct Respresenting needed arguments
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
@ -64,8 +54,7 @@ async fn main() {
info!("==> WELCOME TO RUSTY-TORRENT <=="); info!("==> WELCOME TO RUSTY-TORRENT <==");
// Read the Torrent File // Read the Torrent File
let torrent = Torrent::from_torrent_file(&args.torrent_file_path).await; let torrent = Torrent::from_torrent_file(&args.torrent_file_path).await.unwrap();
torrent.log_useful_information();
// Create the files that will be written to // Create the files that will be written to
let mut files = Files::new(); let mut files = Files::new();
@ -102,11 +91,9 @@ async fn main() {
let mut i = 0; let mut i = 0;
let t = torrent.clone(); let t = torrent.clone();
let (sender, mut reciever) = Peer::test(peers[0], torrent).await;
loop {
let _ = sender.send(peer::ControlMessage::DownloadPiece(i, t.info.piece_length as u32, len, t.get_total_length() as u32)); let _ = sender.send(peer::ControlMessage::DownloadPiece(i, t.info.piece_length as u32, len, t.get_total_length() as u32));
reciever.resubscribe();
let a = reciever.recv().await.unwrap(); let a = reciever.recv().await.unwrap();
@ -121,9 +108,8 @@ async fn main() {
} else { } else {
break break
} }
}
//peer.disconnect().await; peer.disconnect().await;
info!("Successfully completed download"); info!("Successfully completed download");

View File

@ -1,105 +0,0 @@
use log::{ info, error };
/// Represents the handshake message that will be sent to a client.
#[derive(Debug)]
pub struct Handshake {
/// The length of the protocol name, must be 19 for "BitTorrent protocol".
p_str_len: u8,
/// The protocol name, should always be "BitTorrent protocol".
p_str: String,
/// Reserved for extensions, currently unused.
reserved: [u8; 8],
/// The infohash for the torrent.
info_hash: Vec<u8>,
/// The identifier for the client.
pub peer_id: String,
}
impl Handshake {
/// Creates a new handshake.
///
/// # Arguments
///
/// * `info_hash` - The infohash for the torrent.
///
/// # Returns
///
/// A new `Handshake` instance on success, or an empty `Result` indicating an error.
pub fn new(info_hash: &[u8]) -> Option<Self> {
if info_hash.len() != 20 {
error!("Incorrect infohash length, consider using the helper function in torrent");
return None;
}
Some(Self {
p_str_len: 19,
p_str: String::from("BitTorrent protocol"),
reserved: [0; 8],
info_hash: info_hash.to_vec(),
peer_id: String::from("-MY0001-123456654322")
})
}
/// Converts the `Handshake` instance to a byte buffer for sending to a peer.
///
/// # Returns
///
/// A byte vector containing the serialized handshake.
pub fn to_buffer(&self) -> Vec<u8> {
let mut buf: Vec<u8> = vec![0; 68];
buf[0] = self.p_str_len;
buf[1..20].copy_from_slice(&self.p_str.as_bytes()[..19]);
buf[21..28].copy_from_slice(&self.reserved[..7]);
buf[28..48].copy_from_slice(&self.info_hash[..20]);
buf[48..68].copy_from_slice(&self.peer_id.as_bytes()[..20]);
buf
}
/// Converts a byte buffer to a `Handshake` instance.
///
/// # Arguments
///
/// * `buf` - A byte vector containing the serialized handshake.
///
/// # Returns
///
/// A new `Handshake` instance on success, or an empty `Result` indicating an error.
///
/// # Errors
///
/// Returns an error if the provided buffer is not long enough (at least 68 bytes).
pub fn from_buffer(buf: &Vec<u8>) -> Option<Self> {
// Verify that buffer is at least the correct size, if not error
if buf.len() < 68 {
error!("buffer provided to handshake was too short");
return None;
}
let mut p_str = String::new();
for byte in buf.iter().take(20).skip(1) {
p_str.push(*byte as char)
}
let mut info_hash: Vec<u8> = vec![0; 20];
info_hash[..20].copy_from_slice(&buf[28..48]);
let mut peer_id = String::new();
for byte in buf.iter().take(68).skip(48) {
peer_id.push(*byte as char)
}
Some(Self {
p_str_len: buf[0],
p_str,
reserved: [0; 8],
info_hash,
peer_id
})
}
pub fn log_useful_information(&self) {
info!("Connected - PeerId: {:?}", self.peer_id);
}
}

View File

@ -1,258 +0,0 @@
use std::vec;
use log::{error, debug};
/// Represents a message in the BitTorrent protocol.
#[derive(Debug, PartialEq)]
pub struct Message {
/// The length of the message, including the type and payload.
pub message_length: u32,
/// The type of message.
pub message_type: MessageType,
/// The payload of the message, if any.
pub payload: Option<Vec<u8>>,
}
pub trait ToBuffer {
fn to_buffer(self) -> Vec<u8>;
}
pub trait FromBuffer {
fn from_buffer(buf: &[u8]) -> Self;
}
impl Message {
/// Creates a new message.
///
/// # Arguments
///
/// * `message_length` - The length of the message.
/// * `message_type` - The type of message.
/// * `payload` - The payload of the message, if any.
pub fn new(message_length: u32, message_type: MessageType, payload: Option<Vec<u8>>) -> Self {
Self { message_length, message_type, payload }
}
}
impl FromBuffer for Message {
/// Decodes a message from a given buffer.
///
/// # Arguments
///
/// * `buf` - The byte buffer containing the serialized message.
///
/// # Returns
///
/// A new `Message` instance on success, or an empty `Result` indicating an error.
fn from_buffer(buf: &[u8]) -> Self {
let mut message_length: [u8; 4] = [0; 4];
message_length[..4].copy_from_slice(&buf[..4]);
let message_length = u32::from_be_bytes(message_length);
let mut payload: Option<Vec<u8>> = None;
let message_type: MessageType;
if message_length == 0 {
message_type = MessageType::KeepAlive;
payload = None;
} else if message_length == 5 {
message_type = buf[4].into();
payload = None;
} else {
message_type = buf[4].into();
let end_of_message = 4 + message_length as usize;
if end_of_message > buf.len() {
error!("index too long");
debug!("{buf:?}");
} else {
payload = Some(buf[5..end_of_message].to_vec());
}
}
Self {
message_length,
message_type,
payload
}
}
}
impl ToBuffer for Message {
/// Converts the `Message` instance to a byte buffer for sending.
///
/// # Returns
///
/// A byte vector containing the serialized message.
fn to_buffer(self) -> Vec<u8> {
let mut buf: Vec<u8> = vec![];
for byte in self.message_length.to_be_bytes() {
buf.push(byte);
}
match self.message_type {
MessageType::KeepAlive => {
return buf
},
MessageType::Choke | MessageType::Unchoke | MessageType::Interested | MessageType::NotInterested => {
buf.push(self.message_type.into());
return buf;
},
MessageType::Have | MessageType::Bitfield | MessageType::Request | MessageType::Piece | MessageType::Cancel | MessageType::Port => {
buf.push(self.message_type.into());
},
MessageType::Error => {
panic!("Error making message into buffer")
}
}
match self.payload {
None => { panic!("Error you are trying to create a message that needs a payload with no payload") }
Some(payload) => {
buf.extend(payload);
}
}
buf
}
}
impl Message {
/// Create a request message from a given piece_index, offset, and length
///
/// # Arguments
///
/// * `piece_index` - The index of the piece in the torrent
/// * `offset` - The offset within the piece, because requests should be no more than 16KiB
/// * `length` - The length of the piece request, should be 16KiB
///
/// # Returns
///
/// A piece request message
pub fn create_request(piece_index: u32, offset: u32, length: u32) -> Self {
let mut payload: Vec<u8> = vec![];
for byte in piece_index.to_be_bytes() {
payload.push(byte);
}
for byte in offset.to_be_bytes() {
payload.push(byte)
}
for byte in length.to_be_bytes() {
payload.push(byte)
}
Self { message_length: 13, message_type: MessageType::Request, payload: Some(payload) }
}
/// Returns the number of messages in the given buffer and their contents.
///
/// # Arguments
///
/// * `buf` - The byte buffer containing multiple serialized messages.
///
/// # Returns
///
/// A tuple containing a vector of message byte buffers and the number of messages.
pub fn number_of_messages(buf: &[u8]) -> (Vec<Vec<u8>>, u32) {
let mut message_num = 0;
let mut messages: Vec<Vec<u8>> = vec![];
// Find the length of message one
// put that into an array and increment counter by one
let mut i = 0; // points to the front
let mut j; // points to the back
loop {
j = u32::from_be_bytes([buf[i], buf[i + 1], buf[i + 2], buf[i + 3]]) as usize + 4;
messages.push(buf[i..i+j].to_vec());
i += j;
message_num += 1;
if buf[i] == 0 && buf[i + 1] == 0 && buf[i + 2] == 0 && buf[i + 3] == 0 {
break;
}
}
(messages, message_num)
}
}
/// An enum representing all possible message types in the BitTorrent peer wire protocol.
#[derive(Clone, Debug, PartialEq)]
#[repr(u8)]
pub enum MessageType {
/// Keepalive message, 0 length.
/// Potential Errors if trying to handle a keepalive message like another message.
/// Due to length being 0, should always be explicitly handled.
KeepAlive = u8::MAX,
/// Message telling the client not to send any requests until the peer has unchoked, 1 length.
Choke = 0,
/// Message telling the client that it can send requests, 1 length.
Unchoke = 1,
/// Message indicating that the peer is still interested in downloading, 1 length.
Interested = 2,
/// Message indicating that the peer is not interested in downloading, 1 length.
NotInterested = 3,
/// Message indicating that the peer has a given piece, fixed length.
Have = 4,
/// Message sent after a handshake, represents the pieces that the peer has.
Bitfield = 5,
/// Request a given part of a piece based on index, offset, and length, 13 length.
Request = 6,
/// A response to a request with the accompanying data, varying length.
Piece = 7,
/// Cancels a request, 13 length.
Cancel = 8,
/// Placeholder for unimplemented message type.
Port = 9,
Error
}
impl From<u8> for MessageType {
fn from(val: u8) -> MessageType {
match val {
0 => MessageType::Choke,
1 => MessageType::Unchoke,
2 => MessageType::Interested,
3 => MessageType::NotInterested,
4 => MessageType::Have,
5 => MessageType::Bitfield,
6 => MessageType::Request,
7 => MessageType::Piece,
8 => MessageType::Cancel,
9 => MessageType::Port,
_ => {
error!("Invalid Message Type: {}", val);
MessageType::Error
}
}
}
}
impl From<MessageType> for u8 {
fn from(val: MessageType) -> u8 {
match val {
MessageType::Choke => 0,
MessageType::Unchoke => 1,
MessageType::Interested => 2,
MessageType::NotInterested => 3,
MessageType::Have => 4,
MessageType::Bitfield => 5,
MessageType::Request => 6,
MessageType::Piece => 7,
MessageType::Cancel => 8,
MessageType::Port => 9,
_ => {
error!("Invalid Message Type: {:?}", val);
u8::MAX
}
}
}
}

View File

@ -1,249 +0,0 @@
use log::{debug, info, error, warn, trace};
use regex::Regex;
use serde::{Deserialize, Serialize};
use sha1::{Digest, Sha1};
use tokio::{fs::File as TokioFile, io::AsyncReadExt};
use std::net::{IpAddr, SocketAddrV4};
/// Represents a node in a DHT network.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct Node(String, i64);
/// Represents a file described in a torrent.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct File {
pub path: Vec<String>,
pub length: u64,
#[serde(default)]
md5sum: Option<String>,
}
/// Represents the metadata of a torrent.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Info {
pub name: String,
#[serde(with = "serde_bytes")]
pub pieces: Vec<u8>,
#[serde(rename = "piece length")]
pub piece_length: u64,
#[serde(default)]
md5sum: Option<String>,
#[serde(default)]
pub length: Option<i64>,
#[serde(default)]
pub files: Option<Vec<File>>,
#[serde(default)]
private: Option<u8>,
#[serde(default)]
path: Option<Vec<String>>,
#[serde(default)]
#[serde(rename = "root hash")]
root_hash: Option<String>,
}
/// Represents a torrent.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Torrent {
pub info: Info,
#[serde(default)]
pub announce: Option<String>,
#[serde(default)]
nodes: Option<Vec<Node>>,
#[serde(default)]
encoding: Option<String>,
#[serde(default)]
httpseeds: Option<Vec<String>>,
#[serde(default)]
#[serde(rename = "announce-list")]
announce_list: Option<Vec<Vec<String>>>,
#[serde(default)]
#[serde(rename = "creation date")]
creation_date: Option<i64>,
#[serde(rename = "comment")]
comment: Option<String>,
#[serde(default)]
#[serde(rename = "created by")]
created_by: Option<String>
}
impl Torrent {
/// Reads a `.torrent` file and converts it into a `Torrent` struct.
///
/// # Arguments
///
/// * `path` - The path to the `.torrent` file.
pub async fn from_torrent_file(path: &str) -> Self {
info!("");
info!("--> Reading File <--");
let Ok(mut file) = TokioFile::open(path).await else {
error!("Unable to read file at {path}");
panic!("Unable to read file at {path}");
};
info!("Found\t\t > {path}");
let mut buf: Vec<u8> = Vec::new();
let Ok(_) = file.read_to_end(&mut buf).await else {
error!("Error reading file > {path}");
panic!("Error reading file > {path}");
};
info!("Read\t\t > {path}");
let Ok(torrent) = serde_bencode::from_bytes(&buf) else {
error!("Error deserializing file > {path}");
panic!("Error deserializing file > {path}");
};
info!("Parsed\t > {path}");
torrent
}
}
impl Torrent {
/// Logs info about the *.torrent file
pub fn log_useful_information(&self) {
info!("");
info!("--> Torrent Information <--");
info!("Name:\t\t{}", self.info.name);
info!("Trackers");
if let Some(trackers) = &self.announce_list {
for tracker in trackers {
info!(" |> {}", tracker[0])
}
}
info!("InfoHash:\t{:X?}", self.get_info_hash());
info!("Length:\t{:?}", self.info.length);
info!("Files:");
let Some(mut files) = self.info.files.clone() else {
info!("./{}", self.info.name);
return
};
files.sort_by(|a, b| a.path.len().cmp(&b.path.len()) );
info!("./");
for file in files {
if file.path.len() == 1 {
info!(" |--> {:?}", file.path);
} else {
let mut path = String::new();
file.path.iter().for_each(|s| { path.push_str(s); path.push('/') } );
path.pop();
info!(" |--> {}: {}B", path, file.length)
}
}
}
/// Calculates the info hash of the torrent.
pub fn get_info_hash(&self) -> Vec<u8> {
let buf = serde_bencode::to_bytes(&self.info).unwrap();
let mut hasher = Sha1::new();
hasher.update(buf);
let res = hasher.finalize();
res[..].to_vec()
}
/// Checks if a downloaded piece matches its hash.
///
/// # Arguments
///
/// * `piece` - The downloaded piece.
/// * `index` - The index of the piece.
///
/// # Returns
///
/// * `true` if the piece is correct, `false` otherwise.
pub fn check_piece(&self, piece: &[u8], index: u32) -> bool {
let mut hasher = Sha1::new();
hasher.update(piece);
let result = hasher.finalize();
let piece_hash = &self.info.pieces[(index * 20) as usize..(index * 20 + 20) as usize];
if &result[..] == piece_hash {
info!("Piece {}/{} Correct!", index + 1, self.info.pieces.len() / 20);
true
} else {
error!("Piece {}/{} incorrect :(", index + 1, self.info.pieces.len() / 20);
debug!("{:?}", &result[..]);
debug!("{:?}", piece_hash);
debug!("{:?}", &result[..].len());
debug!("{:?}", piece_hash.len());
debug!("{}", piece.len());
false
}
}
pub fn get_total_length(&self) -> u64 {
if let Some(n) = self.info.length {
return n as u64
};
if let Some(files) = &self.info.files {
let mut n = 0;
for file in files {
n += file.length;
};
return n
};
0
}
pub fn get_trackers(&self) -> Option<Vec<SocketAddrV4>> {
info!("");
info!("--> Locating Trackers <--");
let mut addresses = vec![];
// This is the current regex as I haven't implemented support for http trackers yet
let re = Regex::new(r"^udp://([^:/]+):(\d+)/announce$").unwrap();
if let Some(url) = &self.announce {
if let Some(captures) = re.captures(url) {
let hostname = captures.get(1).unwrap().as_str();
let port = captures.get(2).unwrap().as_str();
if let Ok(ip) = dns_lookup::lookup_host(hostname) {
for i in ip {
if let IpAddr::V4(j) = i {
addresses.push(SocketAddrV4::new(j, port.parse().unwrap()))
}
}
}
} else {
warn!("{url} does not match the expected url pattern");
}
}
if let Some(urls) = &self.announce_list {
for url in urls.iter() {
if let Some(captures) = re.captures(&url[0]) {
let hostname = captures.get(1).unwrap().as_str();
let port = captures.get(2).unwrap().as_str();
if let Ok(ip) = dns_lookup::lookup_host(hostname) {
for i in ip {
if let IpAddr::V4(j) = i {
addresses.push(SocketAddrV4::new(j, port.parse().unwrap()));
}
}
info!("Sucessfully found tracker {}", url[0]);
}
} else {
warn!("{} does not match the expected url pattern", url[0]);
}
}
}
if addresses.len() > 0 {
Some(addresses)
} else {
None
}
}
}

View File

@ -1 +0,0 @@
pub mod tracker;