From 4755612f11aacb4ec2579b3ce348e81bdda54606 Mon Sep 17 00:00:00 2001 From: eyedeekay Date: Sun, 1 Jun 2025 16:38:11 -0400 Subject: [PATCH] Work on datagram1 --- README.md | 14 ++++++++ datagram/dial.go | 79 ++++++++++++++++++++++++++++++++++++------- datagram/dial_test.go | 19 +++++++++-- datagram/read.go | 49 ++++++++++++++++----------- datagram/session.go | 2 +- 5 files changed, 128 insertions(+), 35 deletions(-) diff --git a/README.md b/README.md index c0d4f1b..0ea7bd3 100644 --- a/README.md +++ b/README.md @@ -94,6 +94,20 @@ raw, err := session.NewRawSession("raw", keys, options, 0) n, err := raw.WriteTo(data, dest) ``` +#### `datagram2` Package +Authenticated repliable datagrams: +```go +dgram2, err := session.NewDatagram2Session("udp", keys, options, 0) +n, err := dgram.WriteTo(data, dest) +``` + +#### `datagram3` Package +Authenticated repliable datagrams: +```go +dgram3, err := session.NewDatagram3Session("udp", keys, options, 0) +n, err := dgram.WriteTo(data, dest) +``` + ### Configuration Built-in configuration profiles: diff --git a/datagram/dial.go b/datagram/dial.go index d2a3899..f3513ab 100644 --- a/datagram/dial.go +++ b/datagram/dial.go @@ -2,11 +2,11 @@ package datagram import ( "context" - "fmt" "net" "time" "github.com/go-i2p/i2pkeys" + "github.com/samber/oops" "github.com/sirupsen/logrus" ) @@ -24,11 +24,32 @@ func (ds *DatagramSession) DialTimeout(destination string, timeout time.Duration // DialContext establishes a datagram connection with context support func (ds *DatagramSession) DialContext(ctx context.Context, destination string) (net.PacketConn, error) { + // Check if session is closed first + ds.mu.RLock() + if ds.closed { + ds.mu.RUnlock() + return nil, oops.Errorf("session is closed") + } + ds.mu.RUnlock() + logger := log.WithFields(logrus.Fields{ "destination": destination, }) logger.Debug("Dialing datagram destination") + // Check context cancellation before proceeding + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + // Parse destination address + destAddr, err := i2pkeys.NewI2PAddrFromString(destination) + if err != nil { + return nil, oops.Errorf("invalid destination address: %w", err) + } + // Create a datagram connection conn := &DatagramConn{ session: ds, @@ -36,10 +57,22 @@ func (ds *DatagramSession) DialContext(ctx context.Context, destination string) writer: ds.NewWriter(), } - // Start the reader loop - go conn.reader.receiveLoop() + // Set remote address for the connection + conn.remoteAddr = &destAddr - logger.WithField("session_id", ds.ID()).Debug("Successfully created datagram connection") + // Start the reader loop in a goroutine with context cancellation + go func() { + select { + case <-ctx.Done(): + // Context cancelled, close the reader + conn.reader.Close() + return + default: + conn.reader.receiveLoop() + } + }() + + logger.Debug("Successfully created datagram connection") return conn, nil } @@ -57,11 +90,26 @@ func (ds *DatagramSession) DialI2PTimeout(addr i2pkeys.I2PAddr, timeout time.Dur // DialI2PContext establishes a datagram connection to an I2P address with context support func (ds *DatagramSession) DialI2PContext(ctx context.Context, addr i2pkeys.I2PAddr) (net.PacketConn, error) { + // Check if session is closed first + ds.mu.RLock() + if ds.closed { + ds.mu.RUnlock() + return nil, oops.Errorf("session is closed") + } + ds.mu.RUnlock() + logger := log.WithFields(logrus.Fields{ "destination": addr.Base32(), }) logger.Debug("Dialing I2P datagram destination") + // Check context cancellation before proceeding + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + // Create a datagram connection conn := &DatagramConn{ session: ds, @@ -69,14 +117,21 @@ func (ds *DatagramSession) DialI2PContext(ctx context.Context, addr i2pkeys.I2PA writer: ds.NewWriter(), } - // Start the reader loop - go conn.reader.receiveLoop() + // Set remote address for the connection + conn.remoteAddr = &addr - logger.WithField("session_id", ds.ID()).Debug("Successfully created I2P datagram connection") + // Start the reader loop in a goroutine with context cancellation + go func() { + select { + case <-ctx.Done(): + // Context cancelled, close the reader + conn.reader.Close() + return + default: + conn.reader.receiveLoop() + } + }() + + logger.Debug("Successfully created I2P datagram connection") return conn, nil } - -// generateSessionID generates a unique session identifier -func generateSessionID() string { - return fmt.Sprintf("datagram_%d", time.Now().UnixNano()) -} diff --git a/datagram/dial_test.go b/datagram/dial_test.go index 1f56eb5..02954ff 100644 --- a/datagram/dial_test.go +++ b/datagram/dial_test.go @@ -2,6 +2,7 @@ package datagram import ( "context" + "errors" "testing" "time" ) @@ -43,7 +44,11 @@ func TestDatagramSession_Dial(t *testing.T) { defer dialerSession.Close() // Test dial - conn, err := dialerSession.Dial(listener.Addr().String()) + dest, err := dialerSession.sam.Lookup(listener.Addr().String()) + if err != nil { + t.Fatalf("Failed to lookup listener address: %v", err) + } + conn, err := dialerSession.Dial(dest.Base64()) if err != nil { t.Fatalf("Failed to dial: %v", err) } @@ -124,12 +129,20 @@ func TestDatagramSession_DialContext_Timeout(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond) defer cancel() - // Try to dial invalid address with short timeout - conn, err := session.DialContext(ctx, "invalid.b32.i2p") + // Try to dial with short timeout + conn, err := session.DialContext(ctx, "idk.i2p") + + // Should get context deadline exceeded error if err == nil { if conn != nil { conn.Close() } + t.Fatal("Expected timeout error") + } + + // Should be a context deadline exceeded error + if !errors.Is(err, context.DeadlineExceeded) { + t.Errorf("Expected context.DeadlineExceeded, got: %v", err) } if conn != nil { diff --git a/datagram/read.go b/datagram/read.go index c80a0d2..0f7adf2 100644 --- a/datagram/read.go +++ b/datagram/read.go @@ -12,7 +12,6 @@ import ( // ReceiveDatagram receives a datagram from any source func (r *DatagramReader) ReceiveDatagram() (*Datagram, error) { - // Check if closed first, but don't rely on this check for safety r.mu.RLock() if r.closed { r.mu.RUnlock() @@ -20,16 +19,12 @@ func (r *DatagramReader) ReceiveDatagram() (*Datagram, error) { } r.mu.RUnlock() - // Use select with closeChan to handle concurrent close operations safely - // The closeChan will be signaled when Close() is called, providing - // a reliable way to detect closure even if it happens during this function select { case datagram := <-r.recvChan: return datagram, nil case err := <-r.errorChan: return nil, err case <-r.closeChan: - // This case handles both initial closure check and concurrent closure return nil, oops.Errorf("reader is closed") } } @@ -50,39 +45,53 @@ func (r *DatagramReader) Close() error { // Signal termination to receiveLoop close(r.closeChan) - // Wait for receiveLoop to signal it has exited by closing doneChan - // This ensures proper synchronization without arbitrary delays + // Wait for receiveLoop to signal completion with timeout protection select { case <-r.doneChan: // receiveLoop has confirmed it stopped + logger.Debug("Receive loop terminated gracefully") case <-time.After(5 * time.Second): // Timeout protection - log warning but continue cleanup logger.Warn("Timeout waiting for receive loop to stop") } - // Now safe to close the receiver channels since receiveLoop has stopped - close(r.recvChan) - close(r.errorChan) + // Close receiver channels only after receiveLoop has stopped + // Use non-blocking close to avoid deadlock if channels are already closed + r.safeCloseChannel() logger.Debug("Successfully closed DatagramReader") return nil } +// safeCloseChannel safely closes channels with panic recovery +func (r *DatagramReader) safeCloseChannel() { + defer func() { + if recover() != nil { + // Channel was already closed - this is expected in some race conditions + } + }() + + close(r.recvChan) + close(r.errorChan) +} + // receiveLoop continuously receives incoming datagrams func (r *DatagramReader) receiveLoop() { logger := log.WithField("session_id", r.session.ID()) logger.Debug("Starting receive loop") - // Signal completion when this loop exits - doneChan must be initialized - // before this goroutine starts to avoid race conditions with Close() + // Ensure doneChan is properly signaled when loop exits defer func() { - if r.doneChan != nil { - close(r.doneChan) + // Use non-blocking send to avoid deadlock if Close() isn't waiting + select { + case r.doneChan <- struct{}{}: + default: } + logger.Debug("Receive loop goroutine terminated") }() for { - // Check for closure in a non-blocking way first + // Check for closure signal before any blocking operations select { case <-r.closeChan: logger.Debug("Receive loop terminated - reader closed") @@ -90,15 +99,16 @@ func (r *DatagramReader) receiveLoop() { default: } - // Now perform the blocking read operation + // Perform the blocking read operation datagram, err := r.receiveDatagram() if err != nil { - // Use atomic check and send pattern to avoid race + // Use atomic send pattern with close check to prevent panic select { case r.errorChan <- err: logger.WithError(err).Error("Failed to receive datagram") case <-r.closeChan: - // Reader was closed during error handling + // Reader was closed during error handling - exit gracefully + logger.Debug("Receive loop terminated during error handling") return } continue @@ -109,7 +119,8 @@ func (r *DatagramReader) receiveLoop() { case r.recvChan <- datagram: logger.Debug("Successfully received datagram") case <-r.closeChan: - // Reader was closed during datagram send + // Reader was closed during datagram send - exit gracefully + logger.Debug("Receive loop terminated during datagram send") return } } diff --git a/datagram/session.go b/datagram/session.go index 40e3778..5634835 100644 --- a/datagram/session.go +++ b/datagram/session.go @@ -49,7 +49,7 @@ func (s *DatagramSession) NewReader() *DatagramReader { recvChan: make(chan *Datagram, 10), // Buffer for incoming datagrams errorChan: make(chan error, 1), closeChan: make(chan struct{}), - doneChan: make(chan struct{}), + doneChan: make(chan struct{}, 1), closed: false, mu: sync.RWMutex{}, }