package main import ( "errors" "math/rand/v2" "sync" "time" ) // errDraining is returned when a request pins a slot that is currently being // rotated out. errNoProxies is returned when no slot is selectable at all. var ( errDraining = errors.New("proxy is draining for rotation") errNoProxies = errors.New("no proxy currently available") ) // proxyManager tracks per-slot availability so the rotation tooling can take a // slot out of service a moment before its WireGuard tunnel is torn down and // bring it back a moment after the new tunnel is up — without ever handing an // in-flight request to a slot that is mid-rotation. // // Lifecycle of a rotation (driven by rotate.sh via the admin server): // // drain(id) -> stop routing NEW requests to id, block until in-flight // drains to zero (bounded by timeout) + a short settle. // (restart the tunnel unit) // undrain(id) -> mark id available again, but keep it out of the candidate // set for `warmup` so the fresh handshake can settle first. // // A drained slot self-heals: if undrain never arrives (e.g. rotate.sh died // mid-run) the slot returns to service after maxHold so the pool can't shrink // permanently. type proxyManager struct { mu sync.Mutex n int inflight []int draining []bool warmupAt []time.Time // slot not selectable until now >= warmupAt[id] healTmr []*time.Timer egress []egressInfo // last observed egress IP per slot (from the prober) // Rotation cadence, inferred from drain() calls. rotate.sh rotates exactly // one slot per timer tick in round-robin order, so the gap between drains is // the rotation interval and the next slot to rotate is (lastSlot+1) % n. lastSlot int lastRotated time.Time interval time.Duration // afterUndrain, if set, is called (outside the lock) when a slot returns to // service — used by the prober to refresh that slot's egress IP promptly. afterUndrain func(id int) settle time.Duration // quiet period after in-flight hits zero (the "before") warmup time.Duration // post-undrain cool-in before re-selection (the "after") maxHold time.Duration // safety: auto-undrain if rotate.sh never calls undrain } // egressInfo is the prober's last view of a slot's public egress. type egressInfo struct { ip string latency time.Duration ok bool checkedAt time.Time } func newProxyManager(n int, settle, warmup, maxHold time.Duration) *proxyManager { return &proxyManager{ n: n, inflight: make([]int, n), draining: make([]bool, n), warmupAt: make([]time.Time, n), healTmr: make([]*time.Timer, n), egress: make([]egressInfo, n), lastSlot: -1, settle: settle, warmup: warmup, maxHold: maxHold, } } // setEgress records the prober's latest observation for a slot. func (m *proxyManager) setEgress(id int, ip string, latency time.Duration, ok bool) { m.mu.Lock() m.egress[id] = egressInfo{ip: ip, latency: latency, ok: ok, checkedAt: time.Now()} m.mu.Unlock() } // selectableLocked reports whether slot id may take a new request right now. // Caller must hold m.mu. func (m *proxyManager) selectableLocked(id int) bool { if m.draining[id] { return false } if w := m.warmupAt[id]; !w.IsZero() && time.Now().Before(w) { return false } return true } // acquirePinned reserves a specific slot for one request. It combines the // availability check and the in-flight increment under the lock so a drain // can't slip in between. Call release(id) when the request finishes. func (m *proxyManager) acquirePinned(id int) error { m.mu.Lock() defer m.mu.Unlock() if !m.selectableLocked(id) { return errDraining } m.inflight[id]++ return nil } // acquireAny picks a random selectable slot and reserves it. Returns errNoProxies // if every slot is draining/warming. Call release(id) when the request finishes. func (m *proxyManager) acquireAny() (int, error) { m.mu.Lock() defer m.mu.Unlock() cands := make([]int, 0, m.n) for i := 0; i < m.n; i++ { if m.selectableLocked(i) { cands = append(cands, i) } } if len(cands) == 0 { return 0, errNoProxies } id := cands[rand.IntN(len(cands))] m.inflight[id]++ return id, nil } // release marks one in-flight request on id as finished. func (m *proxyManager) release(id int) { m.mu.Lock() if m.inflight[id] > 0 { m.inflight[id]-- } m.mu.Unlock() } // drain stops new traffic to id and waits until in-flight reaches zero (bounded // by timeout) followed by a settle period. Returns the number of requests still // in flight when it gave up (0 on a clean drain). A self-heal timer is armed so // the slot recovers even if undrain is never called. func (m *proxyManager) drain(id int, timeout time.Duration) int { m.mu.Lock() m.draining[id] = true // Infer rotation cadence: the gap since the previous drain is the interval. now := time.Now() if !m.lastRotated.IsZero() { if d := now.Sub(m.lastRotated); d > 0 { m.interval = d } } m.lastSlot = id m.lastRotated = now if m.healTmr[id] != nil { m.healTmr[id].Stop() } m.healTmr[id] = time.AfterFunc(m.maxHold, func() { m.undrain(id) }) m.mu.Unlock() deadline := time.Now().Add(timeout) for { m.mu.Lock() inf := m.inflight[id] m.mu.Unlock() if inf == 0 { time.Sleep(m.settle) // let the last connection's FIN flush return 0 } if time.Now().After(deadline) { return inf } time.Sleep(50 * time.Millisecond) } } // undrain returns id to service after a warmup window during which it stays out // of the candidate set (so the just-rebuilt tunnel's handshake can complete). func (m *proxyManager) undrain(id int) { m.mu.Lock() if m.healTmr[id] != nil { m.healTmr[id].Stop() m.healTmr[id] = nil } m.draining[id] = false m.warmupAt[id] = time.Now().Add(m.warmup) hook := m.afterUndrain m.mu.Unlock() if hook != nil { hook(id) } }