diff --git a/example/main.go b/example/main.go index 8b8049c..7f9616e 100644 --- a/example/main.go +++ b/example/main.go @@ -27,7 +27,7 @@ func main() { if err := metaListener.AddListener("tcp", tcpListener); err != nil { log.Fatalf("Failed to add TCP listener: %v", err) } - log.Println("Added TCP listener on 127.0.0.1:8080") + log.Println("Added TCP listener on 127.0.0.1:8082") // Create and add a Unix socket listener (on Unix systems) socketPath := "/tmp/example.sock" @@ -42,6 +42,7 @@ func main() { log.Println("Added Unix socket listener on", socketPath) } } + log.Println("Starting http server...") // Create a simple HTTP server using the meta listener server := &http.Server{ @@ -49,6 +50,7 @@ func main() { fmt.Fprintf(w, "Hello from MetaListener! You connected via: %s\n", r.Proto) }), } + log.Println("Server is ready to accept connections...") // Handle server shutdown gracefully stop := make(chan os.Signal, 1) diff --git a/metalistener.go b/metalistener.go index 09c6d5e..bfdef13 100644 --- a/metalistener.go +++ b/metalistener.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "log" "net" "sync" "time" @@ -14,8 +15,6 @@ var ( ErrListenerClosed = errors.New("listener is closed") // ErrNoListeners is returned when the meta listener has no active listeners ErrNoListeners = errors.New("no active listeners") - // ErrInternalListenerFailure is returned when an internal listener fails - ErrInternalListenerFailure = errors.New("Internal listener error, shutting down metalistener for restart") ) // MetaListener implements the net.Listener interface and manages multiple @@ -27,8 +26,6 @@ type MetaListener struct { listenerWg sync.WaitGroup // connCh is used to receive connections from all managed listeners connCh chan ConnResult - // errCh is used to receive errors from all managed listeners - errCh chan error // closeCh signals all goroutines to stop closeCh chan struct{} // isClosed indicates whether the meta listener has been closed @@ -47,8 +44,7 @@ type ConnResult struct { func NewMetaListener() *MetaListener { return &MetaListener{ listeners: make(map[string]net.Listener), - connCh: make(chan ConnResult), - errCh: make(chan error, 1), // Buffered to prevent blocking + connCh: make(chan ConnResult, 100), // Larger buffer for high connection volume closeCh: make(chan struct{}), } } @@ -102,58 +98,61 @@ func (ml *MetaListener) RemoveListener(id string) error { // handleListener runs in a separate goroutine for each added listener // and forwards accepted connections to the connCh channel. func (ml *MetaListener) handleListener(id string, listener net.Listener) { - defer ml.listenerWg.Done() + defer func() { + log.Printf("Listener goroutine for %s exiting", id) + ml.listenerWg.Done() + }() for { - conn, err := listener.Accept() - + // First check if the MetaListener is closed select { case <-ml.closeCh: - // Meta listener is being closed, exit + log.Printf("MetaListener closed, stopping %s listener", id) return default: - // Continue processing } + // Set a deadline for Accept to prevent blocking indefinitely + if deadline, ok := listener.(interface{ SetDeadline(time.Time) error }); ok { + deadline.SetDeadline(time.Now().Add(1 * time.Second)) + } + + conn, err := listener.Accept() if err != nil { - // Check if this is a temporary error + // Check if this is a timeout error (which we expect due to our deadline) + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + continue + } + + // Check if this is any other temporary error if netErr, ok := err.(net.Error); ok && netErr.Temporary() { - // For temporary errors, wait a bit and try again + log.Printf("Temporary error in %s listener: %v, retrying in 100ms", id, err) time.Sleep(100 * time.Millisecond) continue } - // For non-temporary errors, check if listener was closed - ml.mu.RLock() - _, stillExists := ml.listeners[id] - ml.mu.RUnlock() - - if stillExists { - // Create a combined error with both the standard message and original error details - combinedErr := fmt.Errorf("%w: listener %s error - %v", - ErrInternalListenerFailure, id, err) - - // Send the combined error to notify Accept() calls - select { - case ml.errCh <- combinedErr: - default: - // Don't block if no one is reading errors - } - - // Then close all listeners - go ml.Close() - } + log.Printf("Permanent error in %s listener: %v, stopping", id, err) + ml.mu.Lock() + delete(ml.listeners, id) + ml.mu.Unlock() return } - // Send the accepted connection to the connection channel + // If we reach here, we have a valid connection + log.Printf("Listener %s accepted connection from %s", id, conn.RemoteAddr()) + + // Try to forward the connection, but don't block indefinitely select { case ml.connCh <- ConnResult{Conn: conn, src: id}: - // Connection forwarded successfully + log.Printf("Connection from %s successfully forwarded via %s", conn.RemoteAddr(), id) case <-ml.closeCh: - // If we're closing and got a connection, close it + log.Printf("MetaListener closing while forwarding connection, closing connection") conn.Close() return + case <-time.After(5 * time.Second): + // If we can't forward within 5 seconds, something is seriously wrong + log.Printf("WARNING: Connection forwarding timed out, closing connection from %s", conn.RemoteAddr()) + conn.Close() } } } @@ -161,26 +160,31 @@ func (ml *MetaListener) handleListener(id string, listener net.Listener) { // Accept implements the net.Listener Accept method. // It waits for and returns the next connection from any of the managed listeners. func (ml *MetaListener) Accept() (net.Conn, error) { - ml.mu.RLock() - if ml.isClosed { - ml.mu.RUnlock() - return nil, ErrListenerClosed - } + for { + ml.mu.RLock() + if ml.isClosed { + ml.mu.RUnlock() + return nil, ErrListenerClosed + } - if len(ml.listeners) == 0 { + if len(ml.listeners) == 0 { + ml.mu.RUnlock() + return nil, ErrNoListeners + } ml.mu.RUnlock() - return nil, ErrNoListeners - } - ml.mu.RUnlock() - // Wait for either a connection, an error, or close signal - select { - case result := <-ml.connCh: - return result.Conn, nil - case err := <-ml.errCh: - return nil, err - case <-ml.closeCh: - return nil, ErrListenerClosed + // Wait for either a connection or close signal + select { + case result, ok := <-ml.connCh: + if !ok { + return nil, ErrListenerClosed + } + log.Printf("Accept returning connection from %s via %s", + result.RemoteAddr(), result.src) + return result.Conn, nil + case <-ml.closeCh: + return nil, ErrListenerClosed + } } } @@ -194,6 +198,7 @@ func (ml *MetaListener) Close() error { return nil } + log.Printf("Closing MetaListener with %d listeners", len(ml.listeners)) ml.isClosed = true // Signal all goroutines to stop @@ -203,7 +208,8 @@ func (ml *MetaListener) Close() error { var errs []error for id, listener := range ml.listeners { if err := listener.Close(); err != nil { - errs = append(errs, fmt.Errorf("failed to close listener %s: %w", id, err)) + log.Printf("Error closing %s listener: %v", id, err) + errs = append(errs, err) } } @@ -211,6 +217,7 @@ func (ml *MetaListener) Close() error { // Wait for all listener goroutines to exit ml.listenerWg.Wait() + log.Printf("All listener goroutines have exited") // Return combined errors if any if len(errs) > 0 { diff --git a/mirror/header.go b/mirror/header.go index aa75a83..88d3828 100644 --- a/mirror/header.go +++ b/mirror/header.go @@ -85,7 +85,7 @@ func (ml *Mirror) Accept() (net.Conn, error) { return nil, err } // If the handshake is successful, get the underlying connection - conn = tlsConn.NetConn() + //conn = tlsConn.NetConn() } host := map[string]string{