diff --git a/.gitignore b/.gitignore index 044caea..1eb1149 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ wg-configs/*.conf *.env service/weircon-random-proxy +service/random-proxy diff --git a/README.md b/README.md index 18a3897..fab777f 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,39 @@ Use it whenever you need an HTTP-facing API that returns results from rotating e Tunnel count is configurable. Default deploy uses 10 tunnels; the WireGuard configs themselves can come from any provider that gives you a standard `[Interface] / [Peer]` config (ProtonVPN, Mullvad, AzireVPN, self-hosted, …). Drop the configs into `/etc/weircon-random-proxy/wg/proxy.conf` and the rest is identical. +### Rotating over more configs than tunnels + +A slot (netns `proxy`, fixed IP, fixed SOCKS port) is decoupled from the WireGuard config it carries. `netns-up.sh` just reads `wg/proxy.conf`, and that path can be a **symlink** into a larger pool. So if your provider caps concurrent connections (ProtonVPN allows 10) but you hold 50–100 configs, keep the 10 always-on slots and rotate *which config each slot points at*: + +``` +/etc/weircon-random-proxy/ + wg-pool/ ← all your configs (proton-001.conf … proton-100.conf) + wg/ + proxy0.conf -> ../wg-pool/proton-047.conf ← symlink, repointed over time + … + proxy9.conf -> ../wg-pool/proton-088.conf +``` + +`weircon-rotate.timer` fires `rotate.sh`, which advances **one slot at a time** to the next unused config in the pool and restarts only that slot. The other 9 stay up, so there is always egress; over a full cycle every config gets used. Because `weircon-proxy@.service` tears the old tunnel down before raising the new one, a slot is never doubly-connected — you never exceed the provider's concurrent limit, even mid-rotation. + +**Graceful drain.** So a request is never handed to a tunnel that's about to disappear, `rotate.sh` coordinates with the service over a localhost-only admin API before touching a slot: + +1. 
`POST /drain?id=N` — the service stops routing **new** requests to slot N and blocks until that slot's in-flight requests finish (bounded by `WEIRCON_DRAIN_TIMEOUT_SEC`), plus a short `WEIRCON_DRAIN_SETTLE_SEC` quiet period. +2. The symlink is repointed and the tunnel unit restarted. +3. `POST /undrain?id=N` — slot N is returned to service, but held out of the candidate set for `WEIRCON_DRAIN_WARMUP_SEC` so the fresh WireGuard handshake can settle before traffic resumes. + +Random picks skip draining/warming slots automatically; a request that *pins* a slot mid-rotation gets `503` + `Retry-After`. If `rotate.sh` ever dies between drain and undrain, the slot self-heals after `WEIRCON_DRAIN_MAX_HOLD_SEC` so the pool can't shrink permanently. The admin server binds `127.0.0.1:8081` by default — keep it off your reverse proxy. `GET /status` returns a per-slot snapshot (in-flight / draining / warming). + +```sh +cp proton-*.conf /etc/weircon-random-proxy/wg-pool/ # the pool (not in git) +weircon-rotate init # seed 10 distinct symlinks +systemctl restart weircon-proxy@{0..9}.service +systemctl enable --now weircon-rotate.timer # auto-rotate (default every 5 min) +systemctl edit weircon-rotate.timer # change interval (OnUnitActiveSec) +``` + +The fetch service is unaware of any of this — it still sees 10 fixed SOCKS endpoints. The only observable change is that the egress IP behind a given `X-WEIRCON-PROXY-ID` drifts over time. (Trade-off: restarting a slot drops requests in flight on that one slot for the ~1–2 s WireGuard handshake; one-at-a-time rotation keeps the blast radius to a single slot.) + ## Client API Fetch a URL through a random tunnel: @@ -97,6 +130,37 @@ curl -H "X-WEIRCON-RANDOM-IP: $API_KEY" \ Status, headers, and body of the upstream response are returned directly. The service adds `X-WEIRCON-EGRESS-PROXY: ` so the caller can see which tunnel was used. 
+### Live status (`/status`) + +`GET /status` returns a poll-friendly JSON snapshot for scraper-side dashboards — cheap to produce (no network I/O in the request path; egress IPs come from a background prober) and safe to hit every 1–5 s. It's behind the same API-key gate as `/`. + +```bash +curl -H "X-WEIRCON-RANDOM-IP: $API_KEY" https://random-proxy.example.com/status +``` + +```jsonc +{ + "now": "2026-06-01T08:47:18Z", + "proxies_total": 10, + "available": 9, // slots currently selectable (not draining/warming) + "proxies": [ + { "id": 0, "state": "live", "inflight": 2, "egress_ip": "185.x.x.10", "reachable": true, "latency_ms": 42, "checked_age_sec": 7 }, + { "id": 1, "state": "draining", "inflight": 0, "egress_ip": "185.x.x.11", "reachable": true, "latency_ms": 51, "checked_age_sec": 9 }, + // … + ], + "rotation": { + "next_slot": 2, // which slot rotates next (round-robin) + "next_egress_ip": "185.x.x.12", // the egress IP about to be replaced + "eta_sec": 137, // seconds until the next rotation + "interval_sec": 300, + "last_slot": 1, // slot that rotated most recently + "last_rotated_age_sec": 163 + } +} +``` + +Per-slot `state` is `live` (selectable), `draining` (in-flight finishing, about to be torn down), or `warming` (new tunnel up, settling before traffic resumes) — drive your visual "rotating now" indicator off the latter two. The `rotation` block tells you *when* the next rotation lands and *which* egress IP it will replace, so you can show a countdown and flag the outgoing IP ahead of time. The `rotation` fields stay absent until enough rotations have happened to infer the cadence (≈ two timer ticks after boot); egress fields are absent when probing is disabled. + ## Built-in API tester (`/ui`) The fetch service ships with an embedded HTML page at `/ui` that documents the API and lets you exercise every endpoint interactively: @@ -119,10 +183,14 @@ The UI is enabled by default. 
Set `WEIRCON_UI_ENABLED=false` in `fetch.env` to d | `lxc/setup-container.sh` | Runs **inside** the LXC: apt deps, microsocks, helper scripts, systemd units. | | `lxc/netns-up.sh` | Brings one tunnel namespace up (`proxy` with wg0 + veth + bridge). | | `lxc/netns-down.sh` | Tears it back down (invoked by systemd ExecStopPost). | +| `lxc/rotate.sh` | Rotates one slot to the next pool config; `init` seeds the slots. | | `lxc/systemd/weircon-proxies.target`| Grouping target for all tunnels. | | `lxc/systemd/weircon-proxy@.service`| Templated unit. Enable with `weircon-proxy@{0..N-1}.service`. | +| `lxc/systemd/weircon-rotate.service`| Oneshot that runs `rotate.sh` (one slot per invocation). | +| `lxc/systemd/weircon-rotate.timer` | Fires the rotation on an interval (default 5 min; `systemctl edit` to change).| | `lxc/systemd/weircon-fetch.service` | The fetch service itself. Hardened (DynamicUser, no capabilities). | | `lxc/fetch.env.example` | Default env. Copy to `/etc/weircon-random-proxy/fetch.env`. | +| `lxc/rotate.env.example` | Rotation env (`SLOTS`). Copy to `/etc/weircon-random-proxy/rotate.env`. | | `npm/advanced.conf` | Reverse-proxy snippet: header check, strip, forward. | | `wg-configs/` | Local placeholder. Actual `.conf` files live on the LXC, **not** in git. | @@ -137,6 +205,14 @@ The UI is enabled by default. Set `WEIRCON_UI_ENABLED=false` in `fetch.env` to d | `WEIRCON_PROXY_COUNT` | `10` | Number of fallback endpoints. | | `WEIRCON_REQUEST_TIMEOUT_SEC` | `30` | Per-upstream-fetch ceiling. | | `WEIRCON_UI_ENABLED` | `true` | Toggle the embedded `/ui` page. | +| `WEIRCON_ADMIN_LISTEN` | `127.0.0.1:8081` | Localhost drain/undrain API for rotation. Empty disables it. | +| `WEIRCON_DRAIN_TIMEOUT_SEC` | `25` | Max wait for a slot's in-flight requests to clear during drain. 
| +| `WEIRCON_DRAIN_SETTLE_SEC` | `2` | Quiet period after in-flight hits zero (the "couple secs before").| +| `WEIRCON_DRAIN_WARMUP_SEC` | `3` | Slot held out of rotation after undrain (the "couple secs after").| +| `WEIRCON_DRAIN_MAX_HOLD_SEC` | `60` | Auto-undrain safety if `rotate.sh` dies between drain and undrain.| +| `WEIRCON_PROBE_URL` | `https://api.ipify.org` | IP-echo URL the prober fetches through each tunnel for `/status`. | +| `WEIRCON_PROBE_INTERVAL_SEC` | `60` | Egress-IP probe cadence. `0` disables probing (`egress_ip` empty).| +| `WEIRCON_PROBE_TIMEOUT_SEC` | `10` | Per-probe ceiling. | If `WEIRCON_PROXY_ADDRS` is set it wins; otherwise the service computes `WEIRCON_PROXY_HOST:WEIRCON_PROXY_BASE_PORT + i` for `i` in `[0, WEIRCON_PROXY_COUNT)`. diff --git a/lxc/fetch.env.example b/lxc/fetch.env.example index 3b1db60..c5cf94d 100644 --- a/lxc/fetch.env.example +++ b/lxc/fetch.env.example @@ -7,3 +7,28 @@ WEIRCON_PROXY_ADDRS=10.99.0.10:1080,10.99.0.11:1080,10.99.0.12:1080,10.99.0.13:1 # Maks tid pr. upstream-fetch (sekunder). WEIRCON_REQUEST_TIMEOUT_SEC=30 + +# Admin-endpoint til rotation (drain/undrain). Kun localhost — må ALDRIG +# eksponeres via reverse-proxy. Tom streng slår admin-serveren helt fra. +WEIRCON_ADMIN_LISTEN=127.0.0.1:8081 + +# Rotation-drænings-tuning (sekunder): +# TIMEOUT = maks ventetid på at in-flight requests på en slot tømmes +# SETTLE = ekstra ro-periode efter in-flight = 0 ("et par sek. før") +# WARMUP = slot holdes ude af rotation efter undrain, så WG-handshaket +# kan sætte sig ("et par sek. efter") +# MAX_HOLD= sikkerhed: slot self-healer og kommer tilbage selv hvis +# rotate.sh dør mellem drain og undrain +WEIRCON_DRAIN_TIMEOUT_SEC=25 +WEIRCON_DRAIN_SETTLE_SEC=2 +WEIRCON_DRAIN_WARMUP_SEC=3 +WEIRCON_DRAIN_MAX_HOLD_SEC=60 + +# Egress-IP prober: henter den aktuelle udgangs-IP gennem hver tunnel og cacher +# den, så /status kan vise hvilken IP der sidder på hver slot (og hvilken der +# snart roteres ud). 
INTERVAL=0 slår probing helt fra (så er egress_ip tom i +# /status, og klienten må selv probe). URL skal returnere klientens IP som ren +# tekst (ipify, ifconfig.me/ip, egen endpoint, …). +WEIRCON_PROBE_URL=https://api.ipify.org +WEIRCON_PROBE_INTERVAL_SEC=60 +WEIRCON_PROBE_TIMEOUT_SEC=10 diff --git a/lxc/rotate.env.example b/lxc/rotate.env.example new file mode 100644 index 0000000..5c859ab --- /dev/null +++ b/lxc/rotate.env.example @@ -0,0 +1,6 @@ +# Kopiér til /etc/weircon-random-proxy/rotate.env (valgfrit). +# Læses af weircon-rotate.service. + +# Antal aktive tunnel-slots. SKAL matche det weircon-proxy@N-interval du har +# enablet (fx weircon-proxy@{0..9} => SLOTS=10). Default hvis usat: 10. +SLOTS=10 diff --git a/lxc/rotate.sh b/lxc/rotate.sh new file mode 100755 index 0000000..5b3a804 --- /dev/null +++ b/lxc/rotate.sh @@ -0,0 +1,133 @@ +#!/bin/bash +# Roterer ÉN tunnel-slot ad gangen til den næste WG-config i puljen. +# +# - Slots (netns proxy0..proxy) er faste: faste IP'er, faste +# SOCKS-porte. Fetch-servicen kender kun de faste endpoints. +# - Configs ligger i en pulje (wg-pool/) der kan være større end SLOTS. +# - Hver slot's aktive config er et symlink: +# wg/proxy.conf -> wg-pool/.conf +# - Hvert kald her flytter præcis én slot videre til næste ledige config +# i puljen og genstarter kun den slot. De øvrige 9 bliver oppe, så der +# altid er egress ud — og over tid cykler alle configs igennem. +# +# Hvorfor én ad gangen: holder os trygt inden for udbyderens samtidigheds- +# grænse (fx Proton: max 10 samtidige WG-sessioner) og dropper aldrig al +# egress på én gang. weircon-proxy@.service river desuden den gamle tunnel +# ned (ExecStopPost) FØR den nye rejses (ExecStartPre), så en slot er aldrig +# dobbelt-forbundet midt i en rotation. 
#
# Usage:
#   rotate.sh        # rotate one slot (the cursor in the state file advances)
#   rotate.sh init   # seed every slot with the first SLOTS distinct configs
set -euo pipefail

POOL="/etc/weircon-random-proxy/wg-pool"
ACTIVE="/etc/weircon-random-proxy/wg"
STATE="/var/lib/weircon-random-proxy/rotate.state"
SLOTS="${SLOTS:-10}"
# SLOTS feeds every arithmetic expression below; reject anything that is not a
# plain positive decimal integer up front instead of failing mid-rotation.
if ! [[ "$SLOTS" =~ ^[1-9][0-9]*$ ]]; then
    echo "SLOTS skal være et positivt heltal (fik '$SLOTS')" >&2
    exit 1
fi
# Admin endpoint of the fetch service (local, not exposed via the reverse proxy).
# Empty => skip drain/undrain (falls back to a plain restart).
# NOTE: "${VAR-default}" (no colon) is deliberate: the default applies only when
# the variable is UNSET. With ":-" an explicitly empty value would be replaced
# by the default and the documented "empty disables" switch could never work.
ADMIN_URL="${WEIRCON_ADMIN_URL-http://127.0.0.1:8081}"

# Pool configs, sorted for a stable order (basenames only).
mapfile -t POOL_CONFIGS < <(find "$POOL" -maxdepth 1 -type f -name '*.conf' -printf '%f\n' 2>/dev/null | sort)
N="${#POOL_CONFIGS[@]}"
if (( N == 0 )); then
    echo "ingen configs i $POOL — læg proton-*.conf filer der" >&2
    exit 1
fi
if (( N < SLOTS )); then
    echo "WARN: kun $N configs i puljen til $SLOTS slots — der vil være gengangere" >&2
fi

# Print the basename of every config currently linked by a slot OTHER than
# slot $1, one per line. Slots whose proxy<i>.conf is not a symlink are ignored.
configs_in_use() {
    local skip="$1" i link
    for (( i = 0; i < SLOTS; i++ )); do
        (( i == skip )) && continue
        link="$ACTIVE/proxy${i}.conf"
        if [[ -L "$link" ]]; then
            basename "$(readlink -f "$link")"
        fi
    done
}

# Pick the next pool config, scanning from pool position $1, that no slot other
# than $2 is using. Sets the globals $CHOSEN and $NEXT_POS.
#
# When every config is already taken (only possible when the pool is smaller
# than the slot count) the config at the cursor is reused, which is the
# behaviour the "gengangere" warning above announces. Always returns 0.
pick_free_config() {
    local start="$1" skip_slot="$2" k cand name
    # Associative-array lookup instead of `printf | grep -q`: under `pipefail`
    # an early grep exit can SIGPIPE printf and make a busy config look free.
    local -A busy=()
    while IFS= read -r name; do
        if [[ -n "$name" ]]; then
            busy["$name"]=1
        fi
    done < <(configs_in_use "$skip_slot")

    for (( k = 0; k < N; k++ )); do
        cand="${POOL_CONFIGS[(start + k) % N]}"
        if [[ -z "${busy[$cand]:-}" ]]; then
            CHOSEN="$cand"
            NEXT_POS=$(( (start + k + 1) % N ))
            return 0
        fi
    done

    # Pool exhausted: fall back to round-robin reuse so seeding/rotation still
    # makes progress with a small pool.
    CHOSEN="${POOL_CONFIGS[start % N]}"
    NEXT_POS=$(( (start + 1) % N ))
    echo "WARN: alle $N configs er i brug af andre slots — genbruger $CHOSEN" >&2
    return 0
}

# Ask the fetch service to stop sending new traffic to a slot and wait until
# its in-flight requests have drained (the service handles the wait + settle
# itself). Non-fatal: if the admin endpoint does not answer, a warning is
# printed and the rotation continues.
drain_slot() {
    local slot="$1"
    [[ -z "$ADMIN_URL" ]] && return 0
    # -m 60 bounds the call; a failure is logged and rotation proceeds anyway.
    curl -fsS -m 60 -X POST "${ADMIN_URL}/drain?id=${slot}" >/dev/null 2>&1 || \
        echo "WARN: drain af slot $slot fejlede (fortsætter)" >&2
}

# Hand the slot back to the service. The service itself keeps it out of the
# candidate set for a few more seconds (warmup) so the fresh WireGuard
# handshake can settle.
undrain_slot() {
    local slot="$1"
    [[ -z "$ADMIN_URL" ]] && return 0
    curl -fsS -m 10 -X POST "${ADMIN_URL}/undrain?id=${slot}" >/dev/null 2>&1 || \
        echo "WARN: undrain af slot $slot fejlede (slot self-healer)" >&2
}

# Point slot $1 at pool config $2 and restart only that slot's tunnel unit,
# bracketed by drain (before teardown) and undrain (after the new tunnel is up).
# If the restart fails, `set -e` aborts before undrain; the comments in this
# script say the service then returns the slot on its own (self-heal).
assign_slot() {
    local slot="$1" cfg="$2"
    drain_slot "$slot"                                  # stop traffic BEFORE teardown
    ln -sfn "$POOL/$cfg" "$ACTIVE/proxy${slot}.conf"
    systemctl restart "weircon-proxy@${slot}.service"   # tears the old tunnel down, raises the new one
    undrain_slot "$slot"                                # resume traffic AFTER (with warmup)
}

mkdir -p "$(dirname "$STATE")"

# --- init: seed every slot with a distinct config ------------------------
if [[ "${1:-}" == "init" ]]; then
    pos=0
    for (( s = 0; s < SLOTS; s++ )); do
        if ! pick_free_config "$pos" "$s"; then
            echo "kunne ikke finde ledig config til slot $s" >&2
            exit 1
        fi
        ln -sfn "$POOL/$CHOSEN" "$ACTIVE/proxy${s}.conf"
        pos="$NEXT_POS"
        echo "slot $s -> $CHOSEN"
    done
    printf 'slot=0\npos=%s\n' "$pos" > "$STATE"
    echo "seedet $SLOTS slots; start/genstart tunnelerne for at aktivere"
    exit 0
fi

# --- normal: rotate exactly one slot --------------------------------------
slot=0
pos=0
# Read the cursors as data instead of `source`-ing the file: a truncated or
# tampered state file must not be executed as root, and non-numeric values must
# not reach the arithmetic below. Unknown keys and malformed values are skipped
# and the defaults above are kept.
if [[ -f "$STATE" ]]; then
    while IFS='=' read -r key val || [[ -n "$key" ]]; do
        [[ "$val" =~ ^[0-9]+$ ]] || continue
        case "$key" in
            slot) slot=$(( 10#$val )) ;;   # 10# => never parsed as octal
            pos)  pos=$(( 10#$val )) ;;
        esac
    done < "$STATE"
fi
(( slot >= SLOTS )) && slot=0

if ! pick_free_config "$pos" "$slot"; then
    echo "kunne ikke finde ledig config til slot $slot" >&2
    exit 1
fi

assign_slot "$slot" "$CHOSEN"
echo "roterede slot $slot -> $CHOSEN ($((slot + 1))/$SLOTS denne runde)"

# Advance the cursors: next slot; the pool position was already set by pick_free_config.
+slot=$(( (slot + 1) % SLOTS )) +printf 'slot=%s\npos=%s\n' "$slot" "$NEXT_POS" > "$STATE" diff --git a/lxc/setup-container.sh b/lxc/setup-container.sh index e26d6c7..a82abf1 100644 --- a/lxc/setup-container.sh +++ b/lxc/setup-container.sh @@ -54,18 +54,26 @@ chmod +x "$BIN" # 4. Helper-scripts install -Dm755 "$SCRIPT_DIR/netns-up.sh" /usr/local/sbin/weircon-netns-up install -Dm755 "$SCRIPT_DIR/netns-down.sh" /usr/local/sbin/weircon-netns-down +install -Dm755 "$SCRIPT_DIR/rotate.sh" /usr/local/sbin/weircon-rotate # 5. systemd units install -Dm644 "$SCRIPT_DIR/weircon-proxies.target" /etc/systemd/system/weircon-proxies.target install -Dm644 "$SCRIPT_DIR/weircon-proxy@.service" /etc/systemd/system/weircon-proxy@.service install -Dm644 "$SCRIPT_DIR/weircon-fetch.service" /etc/systemd/system/weircon-fetch.service +install -Dm644 "$SCRIPT_DIR/weircon-rotate.service" /etc/systemd/system/weircon-rotate.service +install -Dm644 "$SCRIPT_DIR/weircon-rotate.timer" /etc/systemd/system/weircon-rotate.timer # 6. Config-dir + default env -mkdir -p /etc/weircon-random-proxy/wg -chmod 700 /etc/weircon-random-proxy/wg +# wg/ = aktive slot-configs (proxy0.conf..proxyN.conf, evt. symlinks) +# wg-pool/ = valgfri pulje af ekstra configs som rotationen cykler igennem +mkdir -p /etc/weircon-random-proxy/wg /etc/weircon-random-proxy/wg-pool +chmod 700 /etc/weircon-random-proxy/wg /etc/weircon-random-proxy/wg-pool if [[ ! -f /etc/weircon-random-proxy/fetch.env ]]; then install -m640 "$SCRIPT_DIR/fetch.env.example" /etc/weircon-random-proxy/fetch.env fi +if [[ ! 
-f /etc/weircon-random-proxy/rotate.env ]]; then + install -m640 "$SCRIPT_DIR/rotate.env.example" /etc/weircon-random-proxy/rotate.env +fi systemctl daemon-reload @@ -89,6 +97,16 @@ Start stakken: systemctl enable --now weircon-proxy@{0..9}.service systemctl enable --now weircon-fetch.service +(Valgfrit) Roter over en STØRRE pulje af configs end de 10 slots: + # læg fx 50-100 configs i puljen + cp proton-*.conf /etc/weircon-random-proxy/wg-pool/ + # seed de 10 aktive slots som symlinks ind i puljen + weircon-rotate init + systemctl restart weircon-proxy@{0..9}.service + # roter automatisk én slot ad gangen (interval i weircon-rotate.timer) + systemctl enable --now weircon-rotate.timer + # juster intervallet: systemctl edit weircon-rotate.timer (default 5min) + Verificér: systemctl status 'weircon-proxy@*' curl http://127.0.0.1:8080/health diff --git a/lxc/systemd/weircon-rotate.service b/lxc/systemd/weircon-rotate.service new file mode 100644 index 0000000..21c33d1 --- /dev/null +++ b/lxc/systemd/weircon-rotate.service @@ -0,0 +1,9 @@ +[Unit] +Description=weircon-random-proxy: roter én tunnel-slot til næste WG-config i puljen +After=weircon-proxies.target +Wants=weircon-proxies.target + +[Service] +Type=oneshot +EnvironmentFile=-/etc/weircon-random-proxy/rotate.env +ExecStart=/usr/local/sbin/weircon-rotate diff --git a/lxc/systemd/weircon-rotate.timer b/lxc/systemd/weircon-rotate.timer new file mode 100644 index 0000000..7b30337 --- /dev/null +++ b/lxc/systemd/weircon-rotate.timer @@ -0,0 +1,21 @@ +[Unit] +Description=weircon-random-proxy: periodisk slot-rotation +PartOf=weircon-proxies.target + +[Timer] +# Standard: roter én slot hvert 5. minut. Med 10 slots betyder det at hele +# slot-sættet er cyklet igennem ca. hvert 50. minut, og hver enkelt config i +# en pulje på N filer er brugt mindst én gang efter ca. (N * 5) minutter. 
+# +# Skift interval uden at redigere denne fil: +# systemctl edit weircon-rotate.timer +# og indsæt fx: +# [Timer] +# OnUnitActiveSec= # (tom linje nulstiller arvet værdi) +# OnUnitActiveSec=15min +OnBootSec=5min +OnUnitActiveSec=5min +AccuracySec=15s + +[Install] +WantedBy=timers.target diff --git a/service/admin.go b/service/admin.go new file mode 100644 index 0000000..2c24ab8 --- /dev/null +++ b/service/admin.go @@ -0,0 +1,67 @@ +package main + +import ( + "encoding/json" + "fmt" + "net/http" + "strconv" +) + +// newAdminHandler exposes the rotation control surface. It is intended to bind +// to localhost only (WEIRCON_ADMIN_LISTEN) and is called by rotate.sh: +// +// POST /drain?id=N take slot N out of service; blocks until in-flight +// drains (bounded by WEIRCON_DRAIN_TIMEOUT_SEC) + settle. +// 200 {"id":N,"drained":true,"inflight":0} on a clean drain, +// "drained":false with the residual count on timeout. +// POST /undrain?id=N return slot N to service after the warmup window. +// GET /status snapshot of every slot. 
+func newAdminHandler(cfg Config, mgr *proxyManager) http.Handler { + mux := http.NewServeMux() + + slotID := func(w http.ResponseWriter, r *http.Request) (int, bool) { + raw := r.URL.Query().Get("id") + id, err := strconv.Atoi(raw) + if err != nil || id < 0 || id >= mgr.n { + http.Error(w, fmt.Sprintf("id must be 0-%d", mgr.n-1), http.StatusBadRequest) + return 0, false + } + return id, true + } + writeJSON := func(w http.ResponseWriter, v any) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(v) + } + + mux.HandleFunc("/drain", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "POST only", http.StatusMethodNotAllowed) + return + } + id, ok := slotID(w, r) + if !ok { + return + } + residual := mgr.drain(id, cfg.DrainTimeout) + writeJSON(w, map[string]any{"id": id, "drained": residual == 0, "inflight": residual}) + }) + + mux.HandleFunc("/undrain", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "POST only", http.StatusMethodNotAllowed) + return + } + id, ok := slotID(w, r) + if !ok { + return + } + mgr.undrain(id) + writeJSON(w, map[string]any{"id": id, "warmup_sec": int(cfg.DrainWarmup.Seconds())}) + }) + + mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, mgr.snapshot()) + }) + + return mux +} diff --git a/service/main.go b/service/main.go index 4cb15c0..2825122 100644 --- a/service/main.go +++ b/service/main.go @@ -7,12 +7,12 @@ package main import ( - _ "embed" "context" + _ "embed" + "encoding/json" "errors" "fmt" "log" - "math/rand/v2" "net" "net/http" "net/http/httputil" @@ -29,17 +29,37 @@ import ( var uiHTML []byte type Config struct { - Listen string - ProxyAddrs []string - Timeout time.Duration - UIEnabled bool + Listen string + AdminListen string + ProxyAddrs []string + Timeout time.Duration + UIEnabled bool + + // Rotation drain tuning. 
+ DrainTimeout time.Duration // max wait for in-flight to clear during drain + DrainSettle time.Duration // quiet period after in-flight hits zero (the "before") + DrainWarmup time.Duration // post-undrain cool-in before re-selection (the "after") + DrainMaxHold time.Duration // safety: auto-undrain a slot if undrain never arrives + + // Egress-IP prober (feeds /status). + ProbeURL string // IP-echo endpoint fetched through each tunnel + ProbeInterval time.Duration // 0 disables probing + ProbeTimeout time.Duration // per-probe ceiling } func loadConfig() (Config, error) { cfg := Config{ - Listen: envStr("WEIRCON_LISTEN", ":8080"), - Timeout: time.Duration(envInt("WEIRCON_REQUEST_TIMEOUT_SEC", 30)) * time.Second, - UIEnabled: envBool("WEIRCON_UI_ENABLED", true), + Listen: envStr("WEIRCON_LISTEN", ":8080"), + AdminListen: envStr("WEIRCON_ADMIN_LISTEN", "127.0.0.1:8081"), + Timeout: time.Duration(envInt("WEIRCON_REQUEST_TIMEOUT_SEC", 30)) * time.Second, + UIEnabled: envBool("WEIRCON_UI_ENABLED", true), + DrainTimeout: time.Duration(envInt("WEIRCON_DRAIN_TIMEOUT_SEC", 25)) * time.Second, + DrainSettle: time.Duration(envInt("WEIRCON_DRAIN_SETTLE_SEC", 2)) * time.Second, + DrainWarmup: time.Duration(envInt("WEIRCON_DRAIN_WARMUP_SEC", 3)) * time.Second, + DrainMaxHold: time.Duration(envInt("WEIRCON_DRAIN_MAX_HOLD_SEC", 60)) * time.Second, + ProbeURL: envStr("WEIRCON_PROBE_URL", "https://api.ipify.org"), + ProbeInterval: time.Duration(envInt("WEIRCON_PROBE_INTERVAL_SEC", 60)) * time.Second, + ProbeTimeout: time.Duration(envInt("WEIRCON_PROBE_TIMEOUT_SEC", 10)) * time.Second, } if raw := envStr("WEIRCON_PROXY_ADDRS", ""); raw != "" { for _, a := range strings.Split(raw, ",") { @@ -192,10 +212,13 @@ func stripIdentifying(h http.Header) { } } -func pickProxyID(req *http.Request, n int) (int, error) { +// parsePin reads the optional X-WEIRCON-PROXY-ID header. It returns (-1, nil) +// when no pin is requested (caller should pick at random). 
The actual slot +// reservation — and the draining check — happens in the manager, not here. +func parsePin(req *http.Request, n int) (int, error) { raw := req.Header.Get("X-Weircon-Proxy-Id") if raw == "" { - return rand.IntN(n), nil + return -1, nil } id, err := strconv.Atoi(raw) if err != nil { @@ -235,7 +258,7 @@ func resolveMethod(req *http.Request) string { return req.Method } -func newHandler(cfg Config, pool []*http.Transport) http.Handler { +func newHandler(cfg Config, pool []*http.Transport, mgr *proxyManager) http.Handler { rp := &httputil.ReverseProxy{ Rewrite: func(pr *httputil.ProxyRequest) { rc, ok := pr.In.Context().Value(ctxKey{}).(reqCtx) @@ -271,6 +294,13 @@ func newHandler(cfg Config, pool []*http.Transport) http.Handler { w.Header().Set("Content-Type", "application/json") fmt.Fprintf(w, `{"ok":true,"proxies":%d}`, count) }) + // Poll-friendly status for scraper-side dashboards: per-slot egress IP + + // state, plus when the next rotation is due and which egress it replaces. + mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.Header().Set("Cache-Control", "no-store") + _ = json.NewEncoder(w).Encode(mgr.snapshot()) + }) if cfg.UIEnabled { mux.HandleFunc("/ui", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/html; charset=utf-8") @@ -296,11 +326,34 @@ func newHandler(cfg Config, pool []*http.Transport) http.Handler { http.Error(w, err.Error(), http.StatusBadRequest) return } - id, err := pickProxyID(r, count) + pin, err := parsePin(r, count) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } + + // Reserve a slot. A pinned slot that is mid-rotation is rejected (the + // caller asked for that specific egress); a random pick just skips + // draining/warming slots. release() runs after the response finishes + // streaming so drain() can observe a true in-flight count. 
+ var id int + if pin >= 0 { + id = pin + if err := mgr.acquirePinned(id); err != nil { + w.Header().Set("Retry-After", "5") + http.Error(w, err.Error(), http.StatusServiceUnavailable) + return + } + } else { + id, err = mgr.acquireAny() + if err != nil { + w.Header().Set("Retry-After", "5") + http.Error(w, err.Error(), http.StatusServiceUnavailable) + return + } + } + defer mgr.release(id) + ctx := context.WithValue(r.Context(), ctxKey{}, reqCtx{ proxyID: id, target: target, @@ -320,9 +373,40 @@ func main() { if err != nil { log.Fatalf("transports: %v", err) } + mgr := newProxyManager(len(pool), cfg.DrainSettle, cfg.DrainWarmup, cfg.DrainMaxHold) + + // Background egress-IP prober feeds /status. A slot is re-probed shortly + // after it rotates back in so its new egress IP appears promptly rather + // than waiting for the next periodic pass. + if cfg.ProbeInterval > 0 { + pr := &prober{mgr: mgr, pool: pool, url: cfg.ProbeURL, timeout: cfg.ProbeTimeout} + mgr.afterUndrain = func(id int) { + time.AfterFunc(cfg.DrainWarmup+500*time.Millisecond, func() { pr.probe(id) }) + } + go pr.run(context.Background(), cfg.ProbeInterval) + log.Printf("egress prober: %s every %s", cfg.ProbeURL, cfg.ProbeInterval) + } + + // Admin server: localhost-only by default so drain/undrain is reachable by + // the rotation tooling running in the same LXC but never from the public + // reverse proxy. Empty WEIRCON_ADMIN_LISTEN disables it entirely. 
+ if cfg.AdminListen != "" { + admin := &http.Server{ + Addr: cfg.AdminListen, + Handler: newAdminHandler(cfg, mgr), + ReadHeaderTimeout: 10 * time.Second, + } + go func() { + log.Printf("admin listening on %s", cfg.AdminListen) + if err := admin.ListenAndServe(); err != nil { + log.Fatalf("admin server: %v", err) + } + }() + } + srv := &http.Server{ Addr: cfg.Listen, - Handler: newHandler(cfg, pool), + Handler: newHandler(cfg, pool, mgr), ReadHeaderTimeout: 10 * time.Second, IdleTimeout: 120 * time.Second, } diff --git a/service/manager.go b/service/manager.go new file mode 100644 index 0000000..4f3dc39 --- /dev/null +++ b/service/manager.go @@ -0,0 +1,194 @@ +package main + +import ( + "errors" + "math/rand/v2" + "sync" + "time" +) + +// errDraining is returned when a request pins a slot that is currently being +// rotated out. errNoProxies is returned when no slot is selectable at all. +var ( + errDraining = errors.New("proxy is draining for rotation") + errNoProxies = errors.New("no proxy currently available") +) + +// proxyManager tracks per-slot availability so the rotation tooling can take a +// slot out of service a moment before its WireGuard tunnel is torn down and +// bring it back a moment after the new tunnel is up — without ever handing an +// in-flight request to a slot that is mid-rotation. +// +// Lifecycle of a rotation (driven by rotate.sh via the admin server): +// +// drain(id) -> stop routing NEW requests to id, block until in-flight +// drains to zero (bounded by timeout) + a short settle. +// (restart the tunnel unit) +// undrain(id) -> mark id available again, but keep it out of the candidate +// set for `warmup` so the fresh handshake can settle first. +// +// A drained slot self-heals: if undrain never arrives (e.g. rotate.sh died +// mid-run) the slot returns to service after maxHold so the pool can't shrink +// permanently. 
+type proxyManager struct { + mu sync.Mutex + n int + inflight []int + draining []bool + warmupAt []time.Time // slot not selectable until now >= warmupAt[id] + healTmr []*time.Timer + egress []egressInfo // last observed egress IP per slot (from the prober) + + // Rotation cadence, inferred from drain() calls. rotate.sh rotates exactly + // one slot per timer tick in round-robin order, so the gap between drains is + // the rotation interval and the next slot to rotate is (lastSlot+1) % n. + lastSlot int + lastRotated time.Time + interval time.Duration + + // afterUndrain, if set, is called (outside the lock) when a slot returns to + // service — used by the prober to refresh that slot's egress IP promptly. + afterUndrain func(id int) + + settle time.Duration // quiet period after in-flight hits zero (the "before") + warmup time.Duration // post-undrain cool-in before re-selection (the "after") + maxHold time.Duration // safety: auto-undrain if rotate.sh never calls undrain +} + +// egressInfo is the prober's last view of a slot's public egress. +type egressInfo struct { + ip string + latency time.Duration + ok bool + checkedAt time.Time +} + +func newProxyManager(n int, settle, warmup, maxHold time.Duration) *proxyManager { + return &proxyManager{ + n: n, + inflight: make([]int, n), + draining: make([]bool, n), + warmupAt: make([]time.Time, n), + healTmr: make([]*time.Timer, n), + egress: make([]egressInfo, n), + lastSlot: -1, + settle: settle, + warmup: warmup, + maxHold: maxHold, + } +} + +// setEgress records the prober's latest observation for a slot. +func (m *proxyManager) setEgress(id int, ip string, latency time.Duration, ok bool) { + m.mu.Lock() + m.egress[id] = egressInfo{ip: ip, latency: latency, ok: ok, checkedAt: time.Now()} + m.mu.Unlock() +} + +// selectableLocked reports whether slot id may take a new request right now. +// Caller must hold m.mu. 
+func (m *proxyManager) selectableLocked(id int) bool { + if m.draining[id] { + return false + } + if w := m.warmupAt[id]; !w.IsZero() && time.Now().Before(w) { + return false + } + return true +} + +// acquirePinned reserves a specific slot for one request. It combines the +// availability check and the in-flight increment under the lock so a drain +// can't slip in between. Call release(id) when the request finishes. +func (m *proxyManager) acquirePinned(id int) error { + m.mu.Lock() + defer m.mu.Unlock() + if !m.selectableLocked(id) { + return errDraining + } + m.inflight[id]++ + return nil +} + +// acquireAny picks a random selectable slot and reserves it. Returns errNoProxies +// if every slot is draining/warming. Call release(id) when the request finishes. +func (m *proxyManager) acquireAny() (int, error) { + m.mu.Lock() + defer m.mu.Unlock() + cands := make([]int, 0, m.n) + for i := 0; i < m.n; i++ { + if m.selectableLocked(i) { + cands = append(cands, i) + } + } + if len(cands) == 0 { + return 0, errNoProxies + } + id := cands[rand.IntN(len(cands))] + m.inflight[id]++ + return id, nil +} + +// release marks one in-flight request on id as finished. +func (m *proxyManager) release(id int) { + m.mu.Lock() + if m.inflight[id] > 0 { + m.inflight[id]-- + } + m.mu.Unlock() +} + +// drain stops new traffic to id and waits until in-flight reaches zero (bounded +// by timeout) followed by a settle period. Returns the number of requests still +// in flight when it gave up (0 on a clean drain). A self-heal timer is armed so +// the slot recovers even if undrain is never called. +func (m *proxyManager) drain(id int, timeout time.Duration) int { + m.mu.Lock() + m.draining[id] = true + // Infer rotation cadence: the gap since the previous drain is the interval. 
+ now := time.Now() + if !m.lastRotated.IsZero() { + if d := now.Sub(m.lastRotated); d > 0 { + m.interval = d + } + } + m.lastSlot = id + m.lastRotated = now + if m.healTmr[id] != nil { + m.healTmr[id].Stop() + } + m.healTmr[id] = time.AfterFunc(m.maxHold, func() { m.undrain(id) }) + m.mu.Unlock() + + deadline := time.Now().Add(timeout) + for { + m.mu.Lock() + inf := m.inflight[id] + m.mu.Unlock() + if inf == 0 { + time.Sleep(m.settle) // let the last connection's FIN flush + return 0 + } + if time.Now().After(deadline) { + return inf + } + time.Sleep(50 * time.Millisecond) + } +} + +// undrain returns id to service after a warmup window during which it stays out +// of the candidate set (so the just-rebuilt tunnel's handshake can complete). +func (m *proxyManager) undrain(id int) { + m.mu.Lock() + if m.healTmr[id] != nil { + m.healTmr[id].Stop() + m.healTmr[id] = nil + } + m.draining[id] = false + m.warmupAt[id] = time.Now().Add(m.warmup) + hook := m.afterUndrain + m.mu.Unlock() + if hook != nil { + hook(id) + } +} diff --git a/service/manager_test.go b/service/manager_test.go new file mode 100644 index 0000000..15fab3a --- /dev/null +++ b/service/manager_test.go @@ -0,0 +1,176 @@ +package main + +import ( + "sync" + "testing" + "time" +) + +func newTestMgr(n int) *proxyManager { + // Zero settle/warmup so tests don't sleep; tiny maxHold disabled by being long. + return newProxyManager(n, 0, 0, time.Hour) +} + +func TestAcquireAnySkipsDraining(t *testing.T) { + m := newTestMgr(3) + m.drain(0, time.Second) // slot 0 out + m.drain(1, time.Second) // slot 1 out + + // Only slot 2 should ever come back from acquireAny. 
+ for i := 0; i < 50; i++ { + id, err := m.acquireAny() + if err != nil { + t.Fatalf("acquireAny: %v", err) + } + if id != 2 { + t.Fatalf("expected only slot 2 selectable, got %d", id) + } + m.release(id) + } +} + +func TestAcquireAnyNoneAvailable(t *testing.T) { + m := newTestMgr(2) + m.drain(0, time.Second) + m.drain(1, time.Second) + if _, err := m.acquireAny(); err != errNoProxies { + t.Fatalf("expected errNoProxies, got %v", err) + } +} + +func TestAcquirePinnedDraining(t *testing.T) { + m := newTestMgr(2) + if err := m.acquirePinned(1); err != nil { + t.Fatalf("healthy slot should acquire: %v", err) + } + m.release(1) + + m.drain(1, time.Second) + if err := m.acquirePinned(1); err != errDraining { + t.Fatalf("expected errDraining for pinned drained slot, got %v", err) + } +} + +func TestDrainWaitsForInflight(t *testing.T) { + m := newTestMgr(1) + if _, err := m.acquireAny(); err != nil { // hold slot 0 in flight + t.Fatal(err) + } + + done := make(chan int, 1) + go func() { done <- m.drain(0, 2*time.Second) }() + + // drain must not return while the request is still in flight. 
+ select { + case <-done: + t.Fatal("drain returned before in-flight request released") + case <-time.After(150 * time.Millisecond): + } + + m.release(0) + select { + case residual := <-done: + if residual != 0 { + t.Fatalf("expected clean drain (0 residual), got %d", residual) + } + case <-time.After(time.Second): + t.Fatal("drain did not return after release") + } +} + +func TestDrainTimeoutReportsResidual(t *testing.T) { + m := newTestMgr(1) + if _, err := m.acquireAny(); err != nil { // never released + t.Fatal(err) + } + if residual := m.drain(0, 100*time.Millisecond); residual != 1 { + t.Fatalf("expected residual 1 on timeout, got %d", residual) + } +} + +func TestUndrainRestoresSelection(t *testing.T) { + m := newTestMgr(1) + m.drain(0, time.Second) + if _, err := m.acquireAny(); err != errNoProxies { + t.Fatalf("expected slot unavailable while draining, got %v", err) + } + m.undrain(0) // warmup is zero in test mgr → immediately selectable + if _, err := m.acquireAny(); err != nil { + t.Fatalf("expected slot available after undrain, got %v", err) + } +} + +func TestWarmupHoldsSlotOut(t *testing.T) { + m := newProxyManager(1, 0, 200*time.Millisecond, time.Hour) + m.undrain(0) // sets warmupAt = now + 200ms + if _, err := m.acquireAny(); err != errNoProxies { + t.Fatalf("expected slot held out during warmup, got %v", err) + } + time.Sleep(250 * time.Millisecond) + if _, err := m.acquireAny(); err != nil { + t.Fatalf("expected slot available after warmup, got %v", err) + } +} + +func TestSelfHealOnMissedUndrain(t *testing.T) { + m := newProxyManager(1, 0, 0, 100*time.Millisecond) // maxHold 100ms + m.drain(0, time.Second) + if _, err := m.acquireAny(); err != errNoProxies { + t.Fatalf("expected unavailable right after drain, got %v", err) + } + time.Sleep(200 * time.Millisecond) // self-heal timer should fire + if _, err := m.acquireAny(); err != nil { + t.Fatalf("expected self-heal to restore slot, got %v", err) + } +} + +func TestSnapshotEgressAndRotation(t 
*testing.T) { + m := newProxyManager(3, 0, 0, time.Hour) + m.setEgress(2, "9.9.9.9", 12*time.Millisecond, true) + + m.drain(0, time.Second) // first rotation: establishes lastRotated only + m.undrain(0) + time.Sleep(5 * time.Millisecond) + m.drain(1, time.Second) // second rotation: gap becomes the interval; slot 1 left draining + + snap := m.snapshot() + + if got := snap.Proxies[2]; got.EgressIP != "9.9.9.9" || !got.Reachable || got.CheckedAgeSec == nil { + t.Fatalf("egress not reflected on slot 2: %+v", got) + } + if snap.Proxies[1].State != "draining" { + t.Fatalf("slot 1 should be draining, got %q", snap.Proxies[1].State) + } + if snap.Rotation.LastSlot == nil || *snap.Rotation.LastSlot != 1 { + t.Fatalf("last_slot want 1, got %v", snap.Rotation.LastSlot) + } + if snap.Rotation.NextSlot == nil || *snap.Rotation.NextSlot != 2 { + t.Fatalf("next_slot want 2 (round-robin), got %v", snap.Rotation.NextSlot) + } + if snap.Rotation.NextEgressIP != "9.9.9.9" { + t.Fatalf("next_egress_ip want 9.9.9.9 (slot 2's IP), got %q", snap.Rotation.NextEgressIP) + } + if snap.Rotation.ETASec == nil { + t.Fatal("expected an ETA once cadence is known") + } +} + +func TestConcurrentAcquireRelease(t *testing.T) { + m := newTestMgr(4) + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + if id, err := m.acquireAny(); err == nil { + m.release(id) + } + }() + } + wg.Wait() + for i := 0; i < 4; i++ { + if m.inflight[i] != 0 { + t.Fatalf("slot %d leaked in-flight count: %d", i, m.inflight[i]) + } + } +} diff --git a/service/status.go b/service/status.go new file mode 100644 index 0000000..ba498b5 --- /dev/null +++ b/service/status.go @@ -0,0 +1,167 @@ +package main + +import ( + "context" + "io" + "net" + "net/http" + "sync" + "time" +) + +// ProxyStatus is one slot's entry in the /status snapshot. 
type ProxyStatus struct {
	ID       int    `json:"id"`    // slot index, 0..ProxiesTotal-1
	State    string `json:"state"` // "live" | "draining" | "warming"
	Inflight int    `json:"inflight"`
	// Egress fields are copied from the slot's cached egress observation and
	// are only filled in once an observation has been recorded for the slot,
	// whether it succeeded or not. A failed observation leaves EgressIP empty
	// and Reachable false. Reachable carries no omitempty, so it is always
	// serialized and reads false before the first observation.
	EgressIP      string `json:"egress_ip,omitempty"`
	Reachable     bool   `json:"reachable"`
	LatencyMS     int64  `json:"latency_ms,omitempty"`
	CheckedAgeSec *int64 `json:"checked_age_sec,omitempty"` // whole seconds since the observation was recorded
}

// RotationStatus describes when the next rotation is due and which egress it
// will replace. Everything is inferred from observed drain timing: LastSlot
// and LastRotatedAgeSec are set once one drain has been seen, while NextSlot,
// NextEgressIP, ETASec and IntervalSec additionally need a second drain so
// that an interval is known. Until then the fields stay nil/empty and are
// omitted from the JSON.
type RotationStatus struct {
	NextSlot          *int   `json:"next_slot,omitempty"`      // (LastSlot + 1) modulo the slot count
	NextEgressIP      string `json:"next_egress_ip,omitempty"` // cached egress IP of NextSlot; empty if none recorded
	ETASec            *int64 `json:"eta_sec,omitempty"`        // seconds until last rotation + interval, never negative
	IntervalSec       *int64 `json:"interval_sec,omitempty"`   // gap between the two most recent drains
	LastSlot          *int   `json:"last_slot,omitempty"`
	LastRotatedAgeSec *int64 `json:"last_rotated_age_sec,omitempty"`
}

// StatusSnapshot is the JSON returned by /status — cheap to produce, safe to
// poll every second.
type StatusSnapshot struct {
	Now          string         `json:"now"` // snapshot time, UTC, RFC 3339
	ProxiesTotal int            `json:"proxies_total"`
	Available    int            `json:"available"` // slots that are neither draining nor warming
	Proxies      []ProxyStatus  `json:"proxies"`   // one entry per slot, indexed by slot ID
	Rotation     RotationStatus `json:"rotation"`
}

// snapshot builds the current status under the lock. No network I/O happens
// here — egress data is whatever the prober last cached.
+func (m *proxyManager) snapshot() StatusSnapshot { + m.mu.Lock() + defer m.mu.Unlock() + now := time.Now() + + snap := StatusSnapshot{ + Now: now.UTC().Format(time.RFC3339), + ProxiesTotal: m.n, + Proxies: make([]ProxyStatus, m.n), + } + for i := 0; i < m.n; i++ { + state := "live" + switch { + case m.draining[i]: + state = "draining" + case !m.warmupAt[i].IsZero() && now.Before(m.warmupAt[i]): + state = "warming" + default: + snap.Available++ + } + ps := ProxyStatus{ID: i, State: state, Inflight: m.inflight[i]} + if e := m.egress[i]; !e.checkedAt.IsZero() { + ps.EgressIP = e.ip + ps.Reachable = e.ok + ps.LatencyMS = e.latency.Milliseconds() + age := int64(now.Sub(e.checkedAt).Seconds()) + ps.CheckedAgeSec = &age + } + snap.Proxies[i] = ps + } + + if !m.lastRotated.IsZero() { + last := m.lastSlot + snap.Rotation.LastSlot = &last + lra := int64(now.Sub(m.lastRotated).Seconds()) + snap.Rotation.LastRotatedAgeSec = &lra + if m.interval > 0 { + next := (m.lastSlot + 1) % m.n + isec := int64(m.interval.Seconds()) + eta := int64(m.lastRotated.Add(m.interval).Sub(now).Seconds()) + if eta < 0 { + eta = 0 + } + snap.Rotation.NextSlot = &next + snap.Rotation.NextEgressIP = m.egress[next].ip + snap.Rotation.IntervalSec = &isec + snap.Rotation.ETASec = &eta + } + } + return snap +} + +// prober periodically fetches each live slot's public egress IP through its own +// SOCKS transport and caches it on the manager for /status to report. +type prober struct { + mgr *proxyManager + pool []*http.Transport + url string + timeout time.Duration +} + +// probe fetches the egress IP for a single slot. A non-2xx response or a body +// that isn't a valid IP is recorded as unreachable. 
+func (p *prober) probe(id int) { + client := &http.Client{Transport: p.pool[id], Timeout: p.timeout} + req, err := http.NewRequest(http.MethodGet, p.url, nil) + if err != nil { + p.mgr.setEgress(id, "", 0, false) + return + } + start := time.Now() + resp, err := client.Do(req) + if err != nil { + p.mgr.setEgress(id, "", 0, false) + return + } + defer resp.Body.Close() + body, _ := io.ReadAll(io.LimitReader(resp.Body, 64)) + ip := string(body) + for len(ip) > 0 && (ip[len(ip)-1] == '\n' || ip[len(ip)-1] == '\r' || ip[len(ip)-1] == ' ') { + ip = ip[:len(ip)-1] + } + ok := resp.StatusCode == http.StatusOK && net.ParseIP(ip) != nil + if !ok { + ip = "" + } + p.mgr.setEgress(id, ip, time.Since(start), ok) +} + +// probeAll probes every currently selectable slot concurrently (draining and +// warming slots are skipped — their egress is mid-change). +func (p *prober) probeAll() { + var wg sync.WaitGroup + for i := 0; i < p.mgr.n; i++ { + p.mgr.mu.Lock() + sel := p.mgr.selectableLocked(i) + p.mgr.mu.Unlock() + if !sel { + continue + } + wg.Add(1) + go func(id int) { defer wg.Done(); p.probe(id) }(i) + } + wg.Wait() +} + +func (p *prober) run(ctx context.Context, interval time.Duration) { + p.probeAll() + t := time.NewTicker(interval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + p.probeAll() + } + } +}