add basic session watcher

This commit is contained in:
bonedaddy
2022-04-13 19:21:48 -07:00
parent 0f3229809c
commit 21a627af73
8 changed files with 183 additions and 1 deletions

View File

@ -0,0 +1,18 @@
[package]
name = "session_watcher"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
i2p = {path = "../../", version = "0.1.0"}
env_logger = "0.5"
log = "0.4"
crossbeam = "0.8"
crossbeam-utils = "0.8"
crossbeam-channel = "0.5"
tokio = {version = "1.17.0", features = ["full"]}
[[bin]]
name = "session_watcher"
path = "src/main.rs"

View File

@ -0,0 +1,5 @@
# Usage
```shell
$> RUST_LOG=debug cargo run
```

View File

@ -0,0 +1,49 @@
use crossbeam::sync::WaitGroup;
use env_logger;
use i2p;
use crossbeam_channel::select;
use i2p::net::{I2pListener, I2pStream};
use i2p::sam_options::{
I2CPClientOptions, I2CPOptions, I2CPRouterOptions, I2CPTunnelInboundOptions,
I2CPTunnelOutboundOptions, SAMOptions, SignatureType,
};
use log::*;
use std::io::{Read, Write};
use std::net::Shutdown;
use std::str::from_utf8;
use std::{thread, time};
use i2p::sam::{SamConnection, SessionStyle, DEFAULT_API};
// Run with RUST_LOG=debug to see the action
#[tokio::main]
async fn main() {
env_logger::init();
let (pubkey, seckey) = {
let mut sam_conn = SamConnection::connect(DEFAULT_API).unwrap();
sam_conn
.generate_destination(SignatureType::EdDsaSha512Ed25519)
.unwrap()
};
info!("New public key: {}", pubkey);
info!("New secret key: {}", seckey);
let mut watcher = i2p::session_watcher::SamSessionWatcher::new(
DEFAULT_API,
&seckey,
SessionStyle::Stream,
Default::default()
).unwrap();
loop {
match watcher.accept() {
Ok((conn, addr)) => {
info!("receiving incoming connection {}", addr);
let _ = conn.shutdown(Shutdown::Both).unwrap();
}
Err(err) => {
error!("failed to accept connection {:#?}", err);
}
}
}
}

View File

@ -41,6 +41,8 @@ pub enum ErrorKind {
SAMI2PError(String),
#[fail(display = "I2P address isn't a valid b32 or b64 encoding: {}", _0)]
BadAddressEncoding(String),
#[fail(display = "Accept encountered error, and session was recreated. try operation again")]
SessionRecreated,
}
impl ErrorKind {

View File

@ -2,6 +2,7 @@ pub mod error;
pub mod net;
pub mod sam;
pub mod sam_options;
pub mod session_watcher;
mod parsers;

View File

@ -28,6 +28,9 @@ use crate::sam::{Session, StreamConnect, StreamForward, DEFAULT_API};
/// } // the stream is closed here
/// ```
pub struct I2pStream {
#[cfg(feature = "public-conn")]
pub inner: StreamConnect,
#[cfg(not(feature = "public-conn"))]
inner: StreamConnect,
}
@ -42,6 +45,9 @@ pub struct I2pStream {
/// [`incoming`]: struct.I2pListener.html#method.incoming
/// [`I2pListener`]: struct.I2pListener.html
pub struct Incoming<'a> {
#[cfg(feature = "public-conn")]
pub listener: &'a I2pListener,
#[cfg(not(feature = "public-conn"))]
listener: &'a I2pListener,
}
@ -248,6 +254,9 @@ impl fmt::Debug for I2pStream {
/// }
/// ```
pub struct I2pListener {
#[cfg(feature = "public-conn")]
pub forward: StreamForward,
#[cfg(not(feature = "public-conn"))]
forward: StreamForward,
}

View File

@ -404,7 +404,7 @@ impl StreamForward {
}
}
fn nickname() -> String {
pub fn nickname() -> String {
let suffix: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(8)

98
src/session_watcher.rs Normal file
View File

@ -0,0 +1,98 @@
//! provides a basic session watcher which wraps [I2pListener::accept] ensuring that
//! any errors which result in the session being terminated, such as clients improperly disconnecting
//! or other network/transport level issues are handled gracefully.
//!
//! any calls to accept which result in an error will cause the existing session and i2plistener to be dropped,
//! before they are recreated and an error is returned information the caller to try the operation again
//!
use std::net::Shutdown;
use crate::{sam::{StreamConnect, SessionStyle, nickname}, net::{I2pSocketAddr, I2pListener}, Session, sam_options::SAMOptions, Error, ErrorKind};
use log::{info, warn, error};
/// SamSessionWatcher provides the ability to gracefully handle
/// runtime errors by restarting the sam session, and recreating the listener
/// any time errors are detected.
///
/// note: should implement better detection of which errors cause us
/// to recreate the connection
pub struct SamSessionWatcher {
opts: SAMOptions,
session: Session,
destination: String,
sam_endpoint: String,
session_style: SessionStyle,
pub listener: I2pListener,
}
impl SamSessionWatcher {
pub fn new(
sam_endpoint: &str,
destination: &str,
session_style: SessionStyle,
opts: SAMOptions,
) -> Result<Box<SamSessionWatcher>, Error> {
let (session, listener) = SamSessionWatcher::__recreate(
sam_endpoint,
destination,
&nickname(),
session_style.clone(),
opts.clone()
)?;
Ok(Box::new(SamSessionWatcher {
opts,
session,
listener,
session_style,
destination: destination.to_string(),
sam_endpoint: sam_endpoint.to_string(),
}))
}
pub fn accept(self: &mut Box<Self>) -> Result<(StreamConnect, I2pSocketAddr), Error> {
match self.listener.forward.accept() {
Ok(res) => Ok(res),
Err(err) => {
error!("accept encountered error, recreating stream: {:#?}", err);
{
drop(&mut self.listener);
self.session.sam.conn.shutdown(Shutdown::Both)?;
drop(&mut self.session);
}
self.recreate()?;
Err(ErrorKind::SessionRecreated.into())
}
}
}
fn recreate(self: &mut Box<Self>) -> Result<(), Error> {
let (session, listener) = SamSessionWatcher::__recreate(
&self.sam_endpoint,
&self.destination,
&nickname(),
self.session_style.clone(),
self.opts.clone()
)?;
self.session = session;
self.listener = listener;
Ok(())
}
fn __recreate(
sam_endpoint: &str,
destination: &str,
nickname: &str,
session_style: SessionStyle,
opts: SAMOptions,
) -> Result<(Session, I2pListener), Error> {
let session = Session::create(
sam_endpoint,
destination,
nickname,
session_style,
opts.clone(),
)?;
let listener = I2pListener::bind_with_session(&session)?;
Ok((session, listener))
}
}