From 21a627af737d156e07b2859244b4b17e05df1ecb Mon Sep 17 00:00:00 2001 From: bonedaddy Date: Wed, 13 Apr 2022 19:21:48 -0700 Subject: [PATCH] add basic session watcher --- examples/session_watcher/Cargo.toml | 18 +++++ examples/session_watcher/README.md | 5 ++ examples/session_watcher/src/main.rs | 49 ++++++++++++++ src/error.rs | 2 + src/lib.rs | 1 + src/net/streaming.rs | 9 +++ src/sam.rs | 2 +- src/session_watcher.rs | 98 ++++++++++++++++++++++++++++ 8 files changed, 183 insertions(+), 1 deletion(-) create mode 100644 examples/session_watcher/Cargo.toml create mode 100644 examples/session_watcher/README.md create mode 100644 examples/session_watcher/src/main.rs create mode 100644 src/session_watcher.rs diff --git a/examples/session_watcher/Cargo.toml b/examples/session_watcher/Cargo.toml new file mode 100644 index 0000000..db62ee3 --- /dev/null +++ b/examples/session_watcher/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "session_watcher" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +i2p = {path = "../../", version = "0.1.0"} +env_logger = "0.5" +log = "0.4" +crossbeam = "0.8" +crossbeam-utils = "0.8" +crossbeam-channel = "0.5" +tokio = {version = "1.17.0", features = ["full"]} +[[bin]] +name = "session_watcher" +path = "src/main.rs" diff --git a/examples/session_watcher/README.md b/examples/session_watcher/README.md new file mode 100644 index 0000000..b8d75f8 --- /dev/null +++ b/examples/session_watcher/README.md @@ -0,0 +1,5 @@ +# Usage + +```shell +$> RUST_LOG=debug cargo run +``` \ No newline at end of file diff --git a/examples/session_watcher/src/main.rs b/examples/session_watcher/src/main.rs new file mode 100644 index 0000000..fb6da1b --- /dev/null +++ b/examples/session_watcher/src/main.rs @@ -0,0 +1,49 @@ +use crossbeam::sync::WaitGroup; +use env_logger; +use i2p; + +use crossbeam_channel::select; +use i2p::net::{I2pListener, I2pStream}; +use i2p::sam_options::{ + I2CPClientOptions, I2CPOptions, I2CPRouterOptions, I2CPTunnelInboundOptions, + I2CPTunnelOutboundOptions, SAMOptions, SignatureType, +}; +use log::*; +use std::io::{Read, Write}; +use std::net::Shutdown; +use std::str::from_utf8; +use std::{thread, time}; + +use i2p::sam::{SamConnection, SessionStyle, DEFAULT_API}; + +// Run with RUST_LOG=debug to see the action +#[tokio::main] +async fn main() { + env_logger::init(); + let (pubkey, seckey) = { + let mut sam_conn = SamConnection::connect(DEFAULT_API).unwrap(); + sam_conn + .generate_destination(SignatureType::EdDsaSha512Ed25519) + .unwrap() + }; + info!("New public key: {}", pubkey); + info!("New secret key: {}", seckey); + let mut watcher = i2p::session_watcher::SamSessionWatcher::new( + DEFAULT_API, + &seckey, + SessionStyle::Stream, + Default::default() + ).unwrap(); + + loop { + match watcher.accept() { + Ok((conn, addr)) => { + info!("receiving incoming connection {}", addr); + let _ = conn.shutdown(Shutdown::Both).unwrap(); + } + Err(err) => { + error!("failed to accept connection {:#?}", err); + } + } + } +} diff --git a/src/error.rs b/src/error.rs index de68875..a20c51c 100644 --- a/src/error.rs +++ b/src/error.rs @@ -41,6 +41,8 @@ pub enum ErrorKind { SAMI2PError(String), #[fail(display = "I2P address isn't a valid b32 or b64 encoding: {}", _0)] BadAddressEncoding(String), + #[fail(display = "Accept encountered error, and session was recreated. try operation again")] + SessionRecreated, } impl ErrorKind { diff --git a/src/lib.rs b/src/lib.rs index 5eed0c7..775fe1b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ pub mod error; pub mod net; pub mod sam; pub mod sam_options; +pub mod session_watcher; mod parsers; diff --git a/src/net/streaming.rs b/src/net/streaming.rs index 5aa79c0..987acba 100644 --- a/src/net/streaming.rs +++ b/src/net/streaming.rs @@ -28,6 +28,9 @@ use crate::sam::{Session, StreamConnect, StreamForward, DEFAULT_API}; /// } // the stream is closed here /// ``` pub struct I2pStream { + #[cfg(feature = "public-conn")] + pub inner: StreamConnect, + #[cfg(not(feature = "public-conn"))] inner: StreamConnect, } @@ -42,6 +45,9 @@ pub struct I2pStream { /// [`incoming`]: struct.I2pListener.html#method.incoming /// [`I2pListener`]: struct.I2pListener.html pub struct Incoming<'a> { + #[cfg(feature = "public-conn")] + pub listener: &'a I2pListener, + #[cfg(not(feature = "public-conn"))] listener: &'a I2pListener, } @@ -248,6 +254,9 @@ impl fmt::Debug for I2pStream { /// } /// ``` pub struct I2pListener { + #[cfg(feature = "public-conn")] + pub forward: StreamForward, + #[cfg(not(feature = "public-conn"))] forward: StreamForward, } diff --git a/src/sam.rs b/src/sam.rs index 02ef58a..99229b2 100644 --- a/src/sam.rs +++ b/src/sam.rs @@ -404,7 +404,7 @@ impl StreamForward { } } -fn nickname() -> String { +pub fn nickname() -> String { let suffix: String = rand::thread_rng() .sample_iter(&Alphanumeric) .take(8) diff --git a/src/session_watcher.rs b/src/session_watcher.rs new file mode 100644 index 0000000..6d00c8a --- /dev/null +++ b/src/session_watcher.rs @@ -0,0 +1,98 @@ +//! provides a basic session watcher which wraps [I2pListener::accept] ensuring that +//! any errors which result in the session being terminated, such as clients improperly disconnecting +//! or other network/transport level issues are handled gracefully. +//! +//! any calls to accept which result in an error will cause the existing session and i2plistener to be dropped, +//! before they are recreated and an error is returned information the caller to try the operation again +//! + + +use std::net::Shutdown; + + +use crate::{sam::{StreamConnect, SessionStyle, nickname}, net::{I2pSocketAddr, I2pListener}, Session, sam_options::SAMOptions, Error, ErrorKind}; +use log::{info, warn, error}; + +/// SamSessionWatcher provides the ability to gracefully handle +/// runtime errors by restarting the sam session, and recreating the listener +/// any time errors are detected. +/// +/// note: should implement better detection of which errors cause us +/// to recreate the connection +pub struct SamSessionWatcher { + opts: SAMOptions, + session: Session, + destination: String, + sam_endpoint: String, + session_style: SessionStyle, + pub listener: I2pListener, +} + +impl SamSessionWatcher { + pub fn new( + sam_endpoint: &str, + destination: &str, + session_style: SessionStyle, + opts: SAMOptions, + ) -> Result, Error> { + let (session, listener) = SamSessionWatcher::__recreate( + sam_endpoint, + destination, + &nickname(), + session_style.clone(), + opts.clone() + )?; + Ok(Box::new(SamSessionWatcher { + opts, + session, + listener, + session_style, + destination: destination.to_string(), + sam_endpoint: sam_endpoint.to_string(), + })) + } + pub fn accept(self: &mut Box) -> Result<(StreamConnect, I2pSocketAddr), Error> { + match self.listener.forward.accept() { + Ok(res) => Ok(res), + Err(err) => { + error!("accept encountered error, recreating stream: {:#?}", err); + { + drop(&mut self.listener); + self.session.sam.conn.shutdown(Shutdown::Both)?; + drop(&mut self.session); + } + self.recreate()?; + Err(ErrorKind::SessionRecreated.into()) + } + } + } + fn recreate(self: &mut Box) -> Result<(), Error> { + let (session, listener) = SamSessionWatcher::__recreate( + &self.sam_endpoint, + &self.destination, + &nickname(), + self.session_style.clone(), + self.opts.clone() + )?; + self.session = session; + self.listener = listener; + Ok(()) + } + fn __recreate( + sam_endpoint: &str, + destination: &str, + nickname: &str, + session_style: SessionStyle, + opts: SAMOptions, + ) -> Result<(Session, I2pListener), Error> { + let session = Session::create( + sam_endpoint, + destination, + nickname, + session_style, + opts.clone(), + )?; + let listener = I2pListener::bind_with_session(&session)?; + Ok((session, listener)) + } +} \ No newline at end of file