mirror of
https://github.com/go-i2p/go-sam-go.git
synced 2025-06-30 20:53:03 -04:00
Work on datagram1
This commit is contained in:
14
README.md
14
README.md
@ -94,6 +94,20 @@ raw, err := session.NewRawSession("raw", keys, options, 0)
|
||||
n, err := raw.WriteTo(data, dest)
|
||||
```
|
||||
|
||||
#### `datagram2` Package
|
||||
Authenticated repliable datagrams:
|
||||
```go
|
||||
dgram2, err := session.NewDatagram2Session("udp", keys, options, 0)
|
||||
n, err := dgram.WriteTo(data, dest)
|
||||
```
|
||||
|
||||
#### `datagram3` Package
|
||||
Authenticated repliable datagrams:
|
||||
```go
|
||||
dgram3, err := session.NewDatagram3Session("udp", keys, options, 0)
|
||||
n, err := dgram.WriteTo(data, dest)
|
||||
```
|
||||
|
||||
### Configuration
|
||||
|
||||
Built-in configuration profiles:
|
||||
|
@ -2,11 +2,11 @@ package datagram
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/go-i2p/i2pkeys"
|
||||
"github.com/samber/oops"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@ -24,11 +24,32 @@ func (ds *DatagramSession) DialTimeout(destination string, timeout time.Duration
|
||||
|
||||
// DialContext establishes a datagram connection with context support
|
||||
func (ds *DatagramSession) DialContext(ctx context.Context, destination string) (net.PacketConn, error) {
|
||||
// Check if session is closed first
|
||||
ds.mu.RLock()
|
||||
if ds.closed {
|
||||
ds.mu.RUnlock()
|
||||
return nil, oops.Errorf("session is closed")
|
||||
}
|
||||
ds.mu.RUnlock()
|
||||
|
||||
logger := log.WithFields(logrus.Fields{
|
||||
"destination": destination,
|
||||
})
|
||||
logger.Debug("Dialing datagram destination")
|
||||
|
||||
// Check context cancellation before proceeding
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
// Parse destination address
|
||||
destAddr, err := i2pkeys.NewI2PAddrFromString(destination)
|
||||
if err != nil {
|
||||
return nil, oops.Errorf("invalid destination address: %w", err)
|
||||
}
|
||||
|
||||
// Create a datagram connection
|
||||
conn := &DatagramConn{
|
||||
session: ds,
|
||||
@ -36,10 +57,22 @@ func (ds *DatagramSession) DialContext(ctx context.Context, destination string)
|
||||
writer: ds.NewWriter(),
|
||||
}
|
||||
|
||||
// Start the reader loop
|
||||
go conn.reader.receiveLoop()
|
||||
// Set remote address for the connection
|
||||
conn.remoteAddr = &destAddr
|
||||
|
||||
logger.WithField("session_id", ds.ID()).Debug("Successfully created datagram connection")
|
||||
// Start the reader loop in a goroutine with context cancellation
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Context cancelled, close the reader
|
||||
conn.reader.Close()
|
||||
return
|
||||
default:
|
||||
conn.reader.receiveLoop()
|
||||
}
|
||||
}()
|
||||
|
||||
logger.Debug("Successfully created datagram connection")
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
@ -57,11 +90,26 @@ func (ds *DatagramSession) DialI2PTimeout(addr i2pkeys.I2PAddr, timeout time.Dur
|
||||
|
||||
// DialI2PContext establishes a datagram connection to an I2P address with context support
|
||||
func (ds *DatagramSession) DialI2PContext(ctx context.Context, addr i2pkeys.I2PAddr) (net.PacketConn, error) {
|
||||
// Check if session is closed first
|
||||
ds.mu.RLock()
|
||||
if ds.closed {
|
||||
ds.mu.RUnlock()
|
||||
return nil, oops.Errorf("session is closed")
|
||||
}
|
||||
ds.mu.RUnlock()
|
||||
|
||||
logger := log.WithFields(logrus.Fields{
|
||||
"destination": addr.Base32(),
|
||||
})
|
||||
logger.Debug("Dialing I2P datagram destination")
|
||||
|
||||
// Check context cancellation before proceeding
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
// Create a datagram connection
|
||||
conn := &DatagramConn{
|
||||
session: ds,
|
||||
@ -69,14 +117,21 @@ func (ds *DatagramSession) DialI2PContext(ctx context.Context, addr i2pkeys.I2PA
|
||||
writer: ds.NewWriter(),
|
||||
}
|
||||
|
||||
// Start the reader loop
|
||||
go conn.reader.receiveLoop()
|
||||
// Set remote address for the connection
|
||||
conn.remoteAddr = &addr
|
||||
|
||||
logger.WithField("session_id", ds.ID()).Debug("Successfully created I2P datagram connection")
|
||||
// Start the reader loop in a goroutine with context cancellation
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Context cancelled, close the reader
|
||||
conn.reader.Close()
|
||||
return
|
||||
default:
|
||||
conn.reader.receiveLoop()
|
||||
}
|
||||
}()
|
||||
|
||||
logger.Debug("Successfully created I2P datagram connection")
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
// generateSessionID generates a unique session identifier
|
||||
func generateSessionID() string {
|
||||
return fmt.Sprintf("datagram_%d", time.Now().UnixNano())
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package datagram
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
@ -43,7 +44,11 @@ func TestDatagramSession_Dial(t *testing.T) {
|
||||
defer dialerSession.Close()
|
||||
|
||||
// Test dial
|
||||
conn, err := dialerSession.Dial(listener.Addr().String())
|
||||
dest, err := dialerSession.sam.Lookup(listener.Addr().String())
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to lookup listener address: %v", err)
|
||||
}
|
||||
conn, err := dialerSession.Dial(dest.Base64())
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to dial: %v", err)
|
||||
}
|
||||
@ -124,12 +129,20 @@ func TestDatagramSession_DialContext_Timeout(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
|
||||
defer cancel()
|
||||
|
||||
// Try to dial invalid address with short timeout
|
||||
conn, err := session.DialContext(ctx, "invalid.b32.i2p")
|
||||
// Try to dial with short timeout
|
||||
conn, err := session.DialContext(ctx, "idk.i2p")
|
||||
|
||||
// Should get context deadline exceeded error
|
||||
if err == nil {
|
||||
if conn != nil {
|
||||
conn.Close()
|
||||
}
|
||||
t.Fatal("Expected timeout error")
|
||||
}
|
||||
|
||||
// Should be a context deadline exceeded error
|
||||
if !errors.Is(err, context.DeadlineExceeded) {
|
||||
t.Errorf("Expected context.DeadlineExceeded, got: %v", err)
|
||||
}
|
||||
|
||||
if conn != nil {
|
||||
|
@ -12,7 +12,6 @@ import (
|
||||
|
||||
// ReceiveDatagram receives a datagram from any source
|
||||
func (r *DatagramReader) ReceiveDatagram() (*Datagram, error) {
|
||||
// Check if closed first, but don't rely on this check for safety
|
||||
r.mu.RLock()
|
||||
if r.closed {
|
||||
r.mu.RUnlock()
|
||||
@ -20,16 +19,12 @@ func (r *DatagramReader) ReceiveDatagram() (*Datagram, error) {
|
||||
}
|
||||
r.mu.RUnlock()
|
||||
|
||||
// Use select with closeChan to handle concurrent close operations safely
|
||||
// The closeChan will be signaled when Close() is called, providing
|
||||
// a reliable way to detect closure even if it happens during this function
|
||||
select {
|
||||
case datagram := <-r.recvChan:
|
||||
return datagram, nil
|
||||
case err := <-r.errorChan:
|
||||
return nil, err
|
||||
case <-r.closeChan:
|
||||
// This case handles both initial closure check and concurrent closure
|
||||
return nil, oops.Errorf("reader is closed")
|
||||
}
|
||||
}
|
||||
@ -50,39 +45,53 @@ func (r *DatagramReader) Close() error {
|
||||
// Signal termination to receiveLoop
|
||||
close(r.closeChan)
|
||||
|
||||
// Wait for receiveLoop to signal it has exited by closing doneChan
|
||||
// This ensures proper synchronization without arbitrary delays
|
||||
// Wait for receiveLoop to signal completion with timeout protection
|
||||
select {
|
||||
case <-r.doneChan:
|
||||
// receiveLoop has confirmed it stopped
|
||||
logger.Debug("Receive loop terminated gracefully")
|
||||
case <-time.After(5 * time.Second):
|
||||
// Timeout protection - log warning but continue cleanup
|
||||
logger.Warn("Timeout waiting for receive loop to stop")
|
||||
}
|
||||
|
||||
// Now safe to close the receiver channels since receiveLoop has stopped
|
||||
close(r.recvChan)
|
||||
close(r.errorChan)
|
||||
// Close receiver channels only after receiveLoop has stopped
|
||||
// Use non-blocking close to avoid deadlock if channels are already closed
|
||||
r.safeCloseChannel()
|
||||
|
||||
logger.Debug("Successfully closed DatagramReader")
|
||||
return nil
|
||||
}
|
||||
|
||||
// safeCloseChannel safely closes channels with panic recovery
|
||||
func (r *DatagramReader) safeCloseChannel() {
|
||||
defer func() {
|
||||
if recover() != nil {
|
||||
// Channel was already closed - this is expected in some race conditions
|
||||
}
|
||||
}()
|
||||
|
||||
close(r.recvChan)
|
||||
close(r.errorChan)
|
||||
}
|
||||
|
||||
// receiveLoop continuously receives incoming datagrams
|
||||
func (r *DatagramReader) receiveLoop() {
|
||||
logger := log.WithField("session_id", r.session.ID())
|
||||
logger.Debug("Starting receive loop")
|
||||
|
||||
// Signal completion when this loop exits - doneChan must be initialized
|
||||
// before this goroutine starts to avoid race conditions with Close()
|
||||
// Ensure doneChan is properly signaled when loop exits
|
||||
defer func() {
|
||||
if r.doneChan != nil {
|
||||
close(r.doneChan)
|
||||
// Use non-blocking send to avoid deadlock if Close() isn't waiting
|
||||
select {
|
||||
case r.doneChan <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
logger.Debug("Receive loop goroutine terminated")
|
||||
}()
|
||||
|
||||
for {
|
||||
// Check for closure in a non-blocking way first
|
||||
// Check for closure signal before any blocking operations
|
||||
select {
|
||||
case <-r.closeChan:
|
||||
logger.Debug("Receive loop terminated - reader closed")
|
||||
@ -90,15 +99,16 @@ func (r *DatagramReader) receiveLoop() {
|
||||
default:
|
||||
}
|
||||
|
||||
// Now perform the blocking read operation
|
||||
// Perform the blocking read operation
|
||||
datagram, err := r.receiveDatagram()
|
||||
if err != nil {
|
||||
// Use atomic check and send pattern to avoid race
|
||||
// Use atomic send pattern with close check to prevent panic
|
||||
select {
|
||||
case r.errorChan <- err:
|
||||
logger.WithError(err).Error("Failed to receive datagram")
|
||||
case <-r.closeChan:
|
||||
// Reader was closed during error handling
|
||||
// Reader was closed during error handling - exit gracefully
|
||||
logger.Debug("Receive loop terminated during error handling")
|
||||
return
|
||||
}
|
||||
continue
|
||||
@ -109,7 +119,8 @@ func (r *DatagramReader) receiveLoop() {
|
||||
case r.recvChan <- datagram:
|
||||
logger.Debug("Successfully received datagram")
|
||||
case <-r.closeChan:
|
||||
// Reader was closed during datagram send
|
||||
// Reader was closed during datagram send - exit gracefully
|
||||
logger.Debug("Receive loop terminated during datagram send")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -49,7 +49,7 @@ func (s *DatagramSession) NewReader() *DatagramReader {
|
||||
recvChan: make(chan *Datagram, 10), // Buffer for incoming datagrams
|
||||
errorChan: make(chan error, 1),
|
||||
closeChan: make(chan struct{}),
|
||||
doneChan: make(chan struct{}),
|
||||
doneChan: make(chan struct{}, 1),
|
||||
closed: false,
|
||||
mu: sync.RWMutex{},
|
||||
}
|
||||
|
Reference in New Issue
Block a user