package main import ( "context" "io" "net" "net/http" "sync" "time" ) // ProxyStatus is one slot's entry in the /status snapshot. type ProxyStatus struct { ID int `json:"id"` State string `json:"state"` // "live" | "draining" | "warming" Inflight int `json:"inflight"` // Egress fields are populated by the background prober (omitted until the // first successful probe, or when probing is disabled). EgressIP string `json:"egress_ip,omitempty"` Reachable bool `json:"reachable"` LatencyMS int64 `json:"latency_ms,omitempty"` CheckedAgeSec *int64 `json:"checked_age_sec,omitempty"` } // RotationStatus describes when the next rotation is due and which egress it // will replace. All fields are inferred from observed drain timing, so they // are nil until enough rotations have happened to establish the cadence. type RotationStatus struct { NextSlot *int `json:"next_slot,omitempty"` NextEgressIP string `json:"next_egress_ip,omitempty"` ETASec *int64 `json:"eta_sec,omitempty"` IntervalSec *int64 `json:"interval_sec,omitempty"` LastSlot *int `json:"last_slot,omitempty"` LastRotatedAgeSec *int64 `json:"last_rotated_age_sec,omitempty"` } // StatusSnapshot is the JSON returned by /status — cheap to produce, safe to // poll every second. type StatusSnapshot struct { Now string `json:"now"` ProxiesTotal int `json:"proxies_total"` Available int `json:"available"` Proxies []ProxyStatus `json:"proxies"` Rotation RotationStatus `json:"rotation"` } // snapshot builds the current status under the lock. No network I/O happens // here — egress data is whatever the prober last cached. func (m *proxyManager) snapshot() StatusSnapshot { m.mu.Lock() defer m.mu.Unlock() now := time.Now() snap := StatusSnapshot{ Now: now.UTC().Format(time.RFC3339), ProxiesTotal: m.n, Proxies: make([]ProxyStatus, m.n), } for i := 0; i < m.n; i++ { state := "live" switch { case m.draining[i]: state = "draining" case !m.warmupAt[i].IsZero() && now.Before(m.warmupAt[i]): state = "warming" default: snap.Available++ } ps := ProxyStatus{ID: i, State: state, Inflight: m.inflight[i]} if e := m.egress[i]; !e.checkedAt.IsZero() { ps.EgressIP = e.ip ps.Reachable = e.ok ps.LatencyMS = e.latency.Milliseconds() age := int64(now.Sub(e.checkedAt).Seconds()) ps.CheckedAgeSec = &age } snap.Proxies[i] = ps } if !m.lastRotated.IsZero() { last := m.lastSlot snap.Rotation.LastSlot = &last lra := int64(now.Sub(m.lastRotated).Seconds()) snap.Rotation.LastRotatedAgeSec = &lra if m.interval > 0 { next := (m.lastSlot + 1) % m.n isec := int64(m.interval.Seconds()) eta := int64(m.lastRotated.Add(m.interval).Sub(now).Seconds()) if eta < 0 { eta = 0 } snap.Rotation.NextSlot = &next snap.Rotation.NextEgressIP = m.egress[next].ip snap.Rotation.IntervalSec = &isec snap.Rotation.ETASec = &eta } } return snap } // prober periodically fetches each live slot's public egress IP through its own // SOCKS transport and caches it on the manager for /status to report. type prober struct { mgr *proxyManager pool []*http.Transport url string timeout time.Duration } // probe fetches the egress IP for a single slot. A non-2xx response or a body // that isn't a valid IP is recorded as unreachable. func (p *prober) probe(id int) { client := &http.Client{Transport: p.pool[id], Timeout: p.timeout} req, err := http.NewRequest(http.MethodGet, p.url, nil) if err != nil { p.mgr.setEgress(id, "", 0, false) return } start := time.Now() resp, err := client.Do(req) if err != nil { p.mgr.setEgress(id, "", 0, false) return } defer resp.Body.Close() body, _ := io.ReadAll(io.LimitReader(resp.Body, 64)) ip := string(body) for len(ip) > 0 && (ip[len(ip)-1] == '\n' || ip[len(ip)-1] == '\r' || ip[len(ip)-1] == ' ') { ip = ip[:len(ip)-1] } ok := resp.StatusCode == http.StatusOK && net.ParseIP(ip) != nil if !ok { ip = "" } p.mgr.setEgress(id, ip, time.Since(start), ok) } // probeAll probes every currently selectable slot concurrently (draining and // warming slots are skipped — their egress is mid-change). func (p *prober) probeAll() { var wg sync.WaitGroup for i := 0; i < p.mgr.n; i++ { p.mgr.mu.Lock() sel := p.mgr.selectableLocked(i) p.mgr.mu.Unlock() if !sel { continue } wg.Add(1) go func(id int) { defer wg.Done(); p.probe(id) }(i) } wg.Wait() } func (p *prober) run(ctx context.Context, interval time.Duration) { p.probeAll() t := time.NewTicker(interval) defer t.Stop() for { select { case <-ctx.Done(): return case <-t.C: p.probeAll() } } }