Resolve TCP hanging but port is in use issue

This commit is contained in:
Owen Schwartz 2025-01-19 22:46:00 -05:00
parent 533886f2e4
commit 759780508a
No known key found for this signature in database
GPG key ID: 8271FDFFD9E0CCBD
4 changed files with 56 additions and 55 deletions

1
go.mod
View file

@ -10,6 +10,7 @@ require (
github.com/google/btree v1.1.2 // indirect github.com/google/btree v1.1.2 // indirect
github.com/gorilla/websocket v1.5.3 // indirect github.com/gorilla/websocket v1.5.3 // indirect
golang.org/x/crypto v0.28.0 // indirect golang.org/x/crypto v0.28.0 // indirect
golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8 // indirect
golang.org/x/net v0.30.0 // indirect golang.org/x/net v0.30.0 // indirect
golang.org/x/sys v0.26.0 // indirect golang.org/x/sys v0.26.0 // indirect
golang.org/x/time v0.7.0 // indirect golang.org/x/time v0.7.0 // indirect

2
go.sum
View file

@ -4,6 +4,8 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aN
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8 h1:yqrTHse8TCMW1M1ZCP+VAR/l0kKxwaAIqN/il7x4voA=
golang.org/x/exp v0.0.0-20250106191152-7588d65b2ba8/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU=
golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4=
golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=

View file

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/fosrl/newt/logger" "github.com/fosrl/newt/logger"
"gvisor.dev/gvisor/pkg/tcpip/adapters/gonet"
"golang.zx2c4.com/wireguard/tun/netstack" "golang.zx2c4.com/wireguard/tun/netstack"
) )
@ -19,13 +20,12 @@ func NewProxyManager(tnet *netstack.Net) *ProxyManager {
} }
} }
func (pm *ProxyManager) AddTarget(protocol, listen string, port int, target string) { func (pm *ProxyManager) AddTarget(protocol, listen string, port int, target string) error {
pm.Lock() pm.Lock()
defer pm.Unlock() defer pm.Unlock()
logger.Info("Adding target: %s://%s:%d -> %s", protocol, listen, port, target) logger.Info("Adding target: %s://%s:%d -> %s", protocol, listen, port, target)
newTarget := &ProxyTarget{
newTarget := ProxyTarget{
Protocol: protocol, Protocol: protocol,
Listen: listen, Listen: listen,
Port: port, Port: port,
@ -35,6 +35,7 @@ func (pm *ProxyManager) AddTarget(protocol, listen string, port int, target stri
} }
pm.targets = append(pm.targets, newTarget) pm.targets = append(pm.targets, newTarget)
return nil
} }
func (pm *ProxyManager) RemoveTarget(protocol, listen string, port int) error { func (pm *ProxyManager) RemoveTarget(protocol, listen string, port int) error {
@ -54,31 +55,21 @@ func (pm *ProxyManager) RemoveTarget(protocol, listen string, port int) error {
// Signal the serving goroutine to stop // Signal the serving goroutine to stop
select { select {
case <-target.cancel: case <-target.cancel:
// Channel is already closed, no need to close it again // Channel is already closed
default: default:
close(target.cancel) close(target.cancel)
} }
// Close the appropriate listener/connection based on protocol // Close the listener/connection
target.Lock() target.Lock()
switch protocol { switch protocol {
case "tcp": case "tcp":
if target.listener != nil { if target.listener != nil {
select { target.listener.Close()
case <-target.cancel:
// Listener was already closed by Stop()
default:
target.listener.Close()
}
} }
case "udp": case "udp":
if target.udpConn != nil { if target.udpConn != nil {
select { target.udpConn.Close()
case <-target.cancel:
// Connection was already closed by Stop()
default:
target.udpConn.Close()
}
} }
} }
target.Unlock() target.Unlock()
@ -86,7 +77,6 @@ func (pm *ProxyManager) RemoveTarget(protocol, listen string, port int) error {
// Wait for the target to fully stop // Wait for the target to fully stop
<-target.done <-target.done
// Remove the target from the slice
pm.targets = append(pm.targets[:i], pm.targets[i+1:]...) pm.targets = append(pm.targets[:i], pm.targets[i+1:]...)
return nil return nil
} }
@ -99,9 +89,7 @@ func (pm *ProxyManager) Start() error {
pm.RLock() pm.RLock()
defer pm.RUnlock() defer pm.RUnlock()
for i := range pm.targets { for _, target := range pm.targets {
target := &pm.targets[i]
target.Lock() target.Lock()
// If target is already running, skip it // If target is already running, skip it
if target.listener != nil || target.udpConn != nil { if target.listener != nil || target.udpConn != nil {
@ -110,7 +98,6 @@ func (pm *ProxyManager) Start() error {
} }
// Mark the target as starting by creating a nil listener/connection // Mark the target as starting by creating a nil listener/connection
// This prevents other goroutines from trying to start it
if strings.ToLower(target.Protocol) == "tcp" { if strings.ToLower(target.Protocol) == "tcp" {
target.listener = nil target.listener = nil
} else { } else {
@ -135,10 +122,11 @@ func (pm *ProxyManager) Stop() error {
defer pm.Unlock() defer pm.Unlock()
var wg sync.WaitGroup var wg sync.WaitGroup
for i := range pm.targets { for _, target := range pm.targets {
target := &pm.targets[i]
wg.Add(1) wg.Add(1)
go func(t *ProxyTarget) { // Create a new variable in the loop to avoid closure issues
t := target // Take a local copy
go func() {
defer wg.Done() defer wg.Done()
close(t.cancel) close(t.cancel)
t.Lock() t.Lock()
@ -151,7 +139,7 @@ func (pm *ProxyManager) Stop() error {
t.Unlock() t.Unlock()
// Wait for the target to fully stop // Wait for the target to fully stop
<-t.done <-t.done
}(target) }()
} }
wg.Wait() wg.Wait()
return nil return nil
@ -220,32 +208,41 @@ func (pm *ProxyManager) handleTCPConnection(clientConn net.Conn, target string,
} }
defer serverConn.Close() defer serverConn.Close()
var wg sync.WaitGroup // Create error channels for both copy operations
wg.Add(2) errc1 := make(chan error, 1)
errc2 := make(chan error, 1)
// Client -> Server // Copy from client to server
go func() { go func() {
defer wg.Done() _, err := io.Copy(serverConn, clientConn)
select { errc1 <- err
case <-done:
return
default:
io.Copy(serverConn, clientConn)
}
}() }()
// Server -> Client // Copy from server to client
go func() { go func() {
defer wg.Done() _, err := io.Copy(clientConn, serverConn)
select { errc2 <- err
case <-done:
return
default:
io.Copy(clientConn, serverConn)
}
}() }()
wg.Wait() // Wait for either copy to finish or done signal
select {
case <-done:
// Gracefully close connections without type assertions
if closer, ok := clientConn.(interface{ CloseRead() error }); ok {
closer.CloseRead()
}
if closer, ok := serverConn.(*gonet.TCPConn); ok {
closer.CloseRead()
}
case err := <-errc1:
if err != nil {
logger.Info("Error copying client->server: %v", err)
}
case err := <-errc2:
if err != nil {
logger.Info("Error copying server->client: %v", err)
}
}
} }
func (pm *ProxyManager) serveUDP(target *ProxyTarget) { func (pm *ProxyManager) serveUDP(target *ProxyTarget) {

View file

@ -9,19 +9,20 @@ import (
) )
type ProxyTarget struct { type ProxyTarget struct {
Protocol string Protocol string
Listen string Listen string
Port int Port int
Target string Target string
cancel chan struct{} // Channel to signal shutdown cancel chan struct{} // Channel to signal shutdown
done chan struct{} // Channel to signal completion done chan struct{} // Channel to signal completion
listener net.Listener // For TCP listener net.Listener // For TCP
udpConn net.PacketConn // For UDP udpConn net.PacketConn // For UDP
sync.Mutex // Protect access to connection sync.Mutex // Protect access to connection
activeConns sync.Map
} }
type ProxyManager struct { type ProxyManager struct {
targets []ProxyTarget targets []*ProxyTarget
tnet *netstack.Net tnet *netstack.Net
log *log.Logger log *log.Logger
sync.RWMutex // Protect access to targets slice sync.RWMutex // Protect access to targets slice