rotate egress over a larger WG pool, with graceful drain and live status
Keep the N always-on tunnel slots fixed but let each slot's WireGuard config
rotate through a larger pool, so a 10-concurrent provider cap (e.g. Proton) can
still cycle 50-100 profiles.
- lxc/rotate.sh + weircon-rotate.{service,timer}: round-robin one slot at a
time through wg-pool/, repointing a symlink and restarting only that slot.
- service: proxyManager tracks per-slot in-flight + drain/undrain state; a
localhost admin server (WEIRCON_ADMIN_LISTEN) lets rotate.sh drain a slot
before teardown and warm it back in after, so no request is routed to a
tunnel mid-rotation. Slots self-heal if undrain never arrives.
- GET /status: poll-friendly JSON of per-slot egress IP/state plus inferred
next-rotation slot + ETA, fed by a background egress-IP prober.
- docs + env examples for all new knobs.
@@ -1,3 +1,4 @@
wg-configs/*.conf
*.env
service/weircon-random-proxy
service/random-proxy

@@ -56,6 +56,39 @@ Use it whenever you need an HTTP-facing API that returns results from rotating e

Tunnel count is configurable. Default deploy uses 10 tunnels; the WireGuard configs themselves can come from any provider that gives you a standard `[Interface] / [Peer]` config (ProtonVPN, Mullvad, AzireVPN, self-hosted, …). Drop the configs into `/etc/weircon-random-proxy/wg/proxy<N>.conf` and the rest is identical.

### Rotating over more configs than tunnels

A slot (netns `proxy<N>`, fixed IP, fixed SOCKS port) is decoupled from the WireGuard config it carries. `netns-up.sh` just reads `wg/proxy<N>.conf`, and that path can be a **symlink** into a larger pool. So if your provider caps concurrent connections (ProtonVPN allows 10) but you hold 50–100 configs, keep the 10 always-on slots and rotate *which config each slot points at*:

```
/etc/weircon-random-proxy/
  wg-pool/                                      ← all your configs (proton-001.conf … proton-100.conf)
  wg/
    proxy0.conf -> ../wg-pool/proton-047.conf   ← symlink, repointed over time
    …
    proxy9.conf -> ../wg-pool/proton-088.conf
```

`weircon-rotate.timer` fires `rotate.sh`, which advances **one slot at a time** to the next unused config in the pool and restarts only that slot. The other 9 stay up, so there is always egress; over a full cycle every config gets used. Because `weircon-proxy@.service` tears the old tunnel down before raising the new one, a slot is never doubly connected, so you never exceed the provider's concurrent limit, even mid-rotation.
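
The "next unused config" step can be sketched in Go as follows. This is an illustrative sketch only: the real logic lives in `rotate.sh` as a bash function, and `pickFree`, its parameters, and the sample file names are hypothetical names chosen for this example.

```go
package main

import "fmt"

// pickFree walks the pool round-robin from position start and returns the
// first config that no other slot is using, plus the position to resume
// from on the next rotation. ok is false when every config is busy.
// (Hypothetical helper; it mirrors the idea, not the script's exact code.)
func pickFree(pool []string, inUse map[string]bool, start int) (chosen string, next int, ok bool) {
	n := len(pool)
	for k := 0; k < n; k++ {
		cand := pool[(start+k)%n]
		if !inUse[cand] {
			return cand, (start + k + 1) % n, true
		}
	}
	return "", start, false
}

func main() {
	pool := []string{"a.conf", "b.conf", "c.conf", "d.conf"}
	// Configs currently held by the slots that are NOT being rotated.
	inUse := map[string]bool{"b.conf": true, "c.conf": true}

	chosen, next, ok := pickFree(pool, inUse, 1)
	fmt.Println(chosen, next, ok)
}
```

Skipping configs held by other slots is what keeps two slots from ever sharing one config, and carrying `next` forward between runs is what makes the whole pool cycle over time.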

**Graceful drain.** So a request is never handed to a tunnel that's about to disappear, `rotate.sh` coordinates with the service over a localhost-only admin API before touching a slot:

1. `POST /drain?id=N`: the service stops routing **new** requests to slot N and blocks until that slot's in-flight requests finish (bounded by `WEIRCON_DRAIN_TIMEOUT_SEC`), plus a short `WEIRCON_DRAIN_SETTLE_SEC` quiet period.
2. The symlink is repointed and the tunnel unit restarted.
3. `POST /undrain?id=N`: slot N is returned to service, but held out of the candidate set for `WEIRCON_DRAIN_WARMUP_SEC` so the fresh WireGuard handshake can settle before traffic resumes.

Random picks skip draining/warming slots automatically; a request that *pins* a slot mid-rotation gets `503` + `Retry-After`. If `rotate.sh` ever dies between drain and undrain, the slot self-heals after `WEIRCON_DRAIN_MAX_HOLD_SEC` so the pool can't shrink permanently. The admin server binds `127.0.0.1:8081` by default; keep it off your reverse proxy. The admin server's `GET /status` returns a per-slot snapshot (in-flight / draining / warming).

```sh
cp proton-*.conf /etc/weircon-random-proxy/wg-pool/   # the pool (not in git)
weircon-rotate init                                    # seed 10 distinct symlinks
systemctl restart weircon-proxy@{0..9}.service
systemctl enable --now weircon-rotate.timer            # auto-rotate (default every 5 min)
systemctl edit weircon-rotate.timer                    # change interval (OnUnitActiveSec)
```

The fetch service's view of the tunnels does not change: it still sees 10 fixed SOCKS endpoints, and only the drain state tells it that a slot is mid-rotation. The only change visible to clients is that the egress IP behind a given `X-WEIRCON-PROXY-ID` drifts over time. (Trade-off: restarting a slot drops any request still in flight on it once the drain timeout has expired, and the slot is out of service for the ~1–2 s WireGuard handshake; one-at-a-time rotation keeps the blast radius to a single slot.)

## Client API

Fetch a URL through a random tunnel:

@@ -97,6 +130,37 @@ curl -H "X-WEIRCON-RANDOM-IP: $API_KEY" \

Status, headers, and body of the upstream response are returned directly. The service adds `X-WEIRCON-EGRESS-PROXY: <id>` so the caller can see which tunnel was used.

### Live status (`/status`)

`GET /status` returns a poll-friendly JSON snapshot for scraper-side dashboards. It is cheap to produce (no network I/O in the request path; egress IPs come from a background prober) and safe to hit every 1–5 s. It's behind the same API-key gate as `/`.

```bash
curl -H "X-WEIRCON-RANDOM-IP: $API_KEY" https://random-proxy.example.com/status
```

```jsonc
{
  "now": "2026-06-01T08:47:18Z",
  "proxies_total": 10,
  "available": 9,                        // slots currently selectable (not draining/warming)
  "proxies": [
    { "id": 0, "state": "live",     "inflight": 2, "egress_ip": "185.x.x.10", "reachable": true, "latency_ms": 42, "checked_age_sec": 7 },
    { "id": 1, "state": "draining", "inflight": 0, "egress_ip": "185.x.x.11", "reachable": true, "latency_ms": 51, "checked_age_sec": 9 },
    // …
  ],
  "rotation": {
    "next_slot": 2,                      // which slot rotates next (round-robin)
    "next_egress_ip": "185.x.x.12",      // the egress IP about to be replaced
    "eta_sec": 137,                      // seconds until the next rotation
    "interval_sec": 300,
    "last_slot": 1,                      // slot that rotated most recently
    "last_rotated_age_sec": 163
  }
}
```

Per-slot `state` is `live` (selectable), `draining` (in-flight finishing, about to be torn down), or `warming` (new tunnel up, settling before traffic resumes). Drive your visual "rotating now" indicator off the latter two. The `rotation` block tells you *when* the next rotation lands and *which* egress IP it will replace, so you can show a countdown and flag the outgoing IP ahead of time. The `rotation` fields stay absent until enough rotations have happened to infer the cadence (≈ two timer ticks after boot); egress fields are absent when probing is disabled.
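
A dashboard-side consumer of this snapshot might look like the sketch below. The struct and function names (`statusDoc`, `parseStatus`, `rotatingSlots`) are hypothetical; only the JSON field names come from the example above. It also assumes that the whole `rotation` object is omitted until the cadence is known. If the service instead omits individual fields, they would simply decode as zero values here.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Only the fields this sketch needs are declared; unknown JSON keys are ignored.
type proxyStatus struct {
	ID       int    `json:"id"`
	State    string `json:"state"`
	Inflight int    `json:"inflight"`
	EgressIP string `json:"egress_ip"`
}

type rotationStatus struct {
	NextSlot     int    `json:"next_slot"`
	NextEgressIP string `json:"next_egress_ip"`
	EtaSec       int    `json:"eta_sec"`
}

type statusDoc struct {
	Available int             `json:"available"`
	Proxies   []proxyStatus   `json:"proxies"`
	Rotation  *rotationStatus `json:"rotation"` // nil when the block is absent
}

// parseStatus decodes one /status response body.
func parseStatus(body []byte) (statusDoc, error) {
	var s statusDoc
	err := json.Unmarshal(body, &s)
	return s, err
}

// rotatingSlots returns the IDs of slots that are draining or warming.
func rotatingSlots(s statusDoc) []int {
	var ids []int
	for _, p := range s.Proxies {
		if p.State == "draining" || p.State == "warming" {
			ids = append(ids, p.ID)
		}
	}
	return ids
}

// sample is a trimmed copy of the documented response (comments removed,
// because encoding/json does not accept them).
const sample = `{
  "available": 9,
  "proxies": [
    {"id": 0, "state": "live", "inflight": 2, "egress_ip": "185.x.x.10"},
    {"id": 1, "state": "draining", "inflight": 0, "egress_ip": "185.x.x.11"}
  ],
  "rotation": {"next_slot": 2, "next_egress_ip": "185.x.x.12", "eta_sec": 137}
}`

func main() {
	s, err := parseStatus([]byte(sample))
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println("rotating now:", rotatingSlots(s))
	if s.Rotation != nil {
		fmt.Printf("slot %d (%s) rotates in %d s\n",
			s.Rotation.NextSlot, s.Rotation.NextEgressIP, s.Rotation.EtaSec)
	}
}
```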

## Built-in API tester (`/ui`)

The fetch service ships with an embedded HTML page at `/ui` that documents the API and lets you exercise every endpoint interactively:

@@ -119,10 +183,14 @@ The UI is enabled by default. Set `WEIRCON_UI_ENABLED=false` in `fetch.env` to d
| `lxc/setup-container.sh` | Runs **inside** the LXC: apt deps, microsocks, helper scripts, systemd units. |
| `lxc/netns-up.sh` | Brings one tunnel namespace up (`proxy<N>` with wg0 + veth + bridge). |
| `lxc/netns-down.sh` | Tears it back down (invoked by systemd ExecStopPost). |
| `lxc/rotate.sh` | Rotates one slot to the next pool config; `init` seeds the slots. |
| `lxc/systemd/weircon-proxies.target` | Grouping target for all tunnels. |
| `lxc/systemd/weircon-proxy@.service` | Templated unit. Enable with `weircon-proxy@{0..N-1}.service`. |
| `lxc/systemd/weircon-rotate.service` | Oneshot that runs `rotate.sh` (one slot per invocation). |
| `lxc/systemd/weircon-rotate.timer` | Fires the rotation on an interval (default 5 min; `systemctl edit` to change). |
| `lxc/systemd/weircon-fetch.service` | The fetch service itself. Hardened (DynamicUser, no capabilities). |
| `lxc/fetch.env.example` | Default env. Copy to `/etc/weircon-random-proxy/fetch.env`. |
| `lxc/rotate.env.example` | Rotation env (`SLOTS`). Copy to `/etc/weircon-random-proxy/rotate.env`. |
| `npm/advanced.conf` | Reverse-proxy snippet: header check, strip, forward. |
| `wg-configs/` | Local placeholder. Actual `.conf` files live on the LXC, **not** in git. |

@@ -137,6 +205,14 @@ The UI is enabled by default. Set `WEIRCON_UI_ENABLED=false` in `fetch.env` to d
| `WEIRCON_PROXY_COUNT` | `10` | Number of fallback endpoints. |
| `WEIRCON_REQUEST_TIMEOUT_SEC` | `30` | Per-upstream-fetch ceiling. |
| `WEIRCON_UI_ENABLED` | `true` | Toggle the embedded `/ui` page. |
| `WEIRCON_ADMIN_LISTEN` | `127.0.0.1:8081` | Localhost drain/undrain API for rotation. Empty disables it. |
| `WEIRCON_DRAIN_TIMEOUT_SEC` | `25` | Max wait for a slot's in-flight requests to clear during drain. |
| `WEIRCON_DRAIN_SETTLE_SEC` | `2` | Quiet period after in-flight hits zero (the "couple secs before"). |
| `WEIRCON_DRAIN_WARMUP_SEC` | `3` | Slot held out of rotation after undrain (the "couple secs after"). |
| `WEIRCON_DRAIN_MAX_HOLD_SEC` | `60` | Auto-undrain safety if `rotate.sh` dies between drain and undrain. |
| `WEIRCON_PROBE_URL` | `https://api.ipify.org` | IP-echo URL the prober fetches through each tunnel for `/status`. |
| `WEIRCON_PROBE_INTERVAL_SEC` | `60` | Egress-IP probe cadence. `0` disables probing (`egress_ip` empty). |
| `WEIRCON_PROBE_TIMEOUT_SEC` | `10` | Per-probe ceiling. |

If `WEIRCON_PROXY_ADDRS` is set it wins; otherwise the service computes `WEIRCON_PROXY_HOST:WEIRCON_PROXY_BASE_PORT + i` for `i` in `[0, WEIRCON_PROXY_COUNT)`.
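
The precedence rule can be sketched as follows. `proxyAddrs` is a hypothetical helper, not the service's actual function, and trimming whitespace around list entries is an assumption made for this example.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
	"strings"
)

// proxyAddrs returns the SOCKS endpoints to use. An explicit comma-separated
// list (the WEIRCON_PROXY_ADDRS value) wins; otherwise the addresses are
// host:basePort+i for i in [0, count).
func proxyAddrs(explicit, host string, basePort, count int) []string {
	if explicit != "" {
		var out []string
		for _, a := range strings.Split(explicit, ",") {
			// Assumption: surrounding spaces and empty entries are ignored.
			if a = strings.TrimSpace(a); a != "" {
				out = append(out, a)
			}
		}
		return out
	}
	out := make([]string, 0, count)
	for i := 0; i < count; i++ {
		out = append(out, net.JoinHostPort(host, strconv.Itoa(basePort+i)))
	}
	return out
}

func main() {
	fmt.Println(proxyAddrs("", "127.0.0.1", 1080, 3))
	fmt.Println(proxyAddrs("10.99.0.10:1080,10.99.0.11:1080", "127.0.0.1", 1080, 3))
}
```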
@@ -7,3 +7,28 @@ WEIRCON_PROXY_ADDRS=10.99.0.10:1080,10.99.0.11:1080,10.99.0.12:1080,10.99.0.13:1

# Max time per upstream fetch (seconds).
WEIRCON_REQUEST_TIMEOUT_SEC=30

# Admin endpoint for rotation (drain/undrain). Localhost only; it must NEVER
# be exposed via the reverse proxy. An empty string disables the admin server.
WEIRCON_ADMIN_LISTEN=127.0.0.1:8081

# Rotation drain tuning (seconds):
#   TIMEOUT  = max wait for a slot's in-flight requests to clear
#   SETTLE   = extra quiet period after in-flight = 0 (the "couple of secs before")
#   WARMUP   = slot is held out of rotation after undrain so the WG handshake
#              can settle (the "couple of secs after")
#   MAX_HOLD = safety: the slot self-heals and returns to service even if
#              rotate.sh dies between drain and undrain
WEIRCON_DRAIN_TIMEOUT_SEC=25
WEIRCON_DRAIN_SETTLE_SEC=2
WEIRCON_DRAIN_WARMUP_SEC=3
WEIRCON_DRAIN_MAX_HOLD_SEC=60

# Egress-IP prober: fetches the current egress IP through each tunnel and caches
# it, so /status can show which IP sits on each slot (and which one is about to
# be rotated out). INTERVAL=0 disables probing entirely (egress_ip is then empty
# in /status, and the client has to probe itself). URL must return the client's
# IP as plain text (ipify, ifconfig.me/ip, your own endpoint, …).
WEIRCON_PROBE_URL=https://api.ipify.org
WEIRCON_PROBE_INTERVAL_SEC=60
WEIRCON_PROBE_TIMEOUT_SEC=10

@@ -0,0 +1,6 @@
# Copy to /etc/weircon-random-proxy/rotate.env (optional).
# Read by weircon-rotate.service.

# Number of active tunnel slots. MUST match the weircon-proxy@N range you have
# enabled (e.g. weircon-proxy@{0..9} => SLOTS=10). Default if unset: 10.
SLOTS=10
Executable
+133
@@ -0,0 +1,133 @@
#!/bin/bash
# Rotates ONE tunnel slot at a time to the next WG config in the pool.
#
#  - Slots (netns proxy0..proxy<SLOTS-1>) are fixed: fixed IPs, fixed
#    SOCKS ports. The fetch service only knows the fixed endpoints.
#  - Configs live in a pool (wg-pool/) that can be larger than SLOTS.
#  - Each slot's active config is a symlink:
#        wg/proxy<ID>.conf -> wg-pool/<name>.conf
#  - Each invocation moves exactly one slot on to the next free config
#    in the pool and restarts only that slot. The other 9 stay up, so
#    there is always egress, and over time all configs cycle through.
#
# Why one at a time: it keeps us safely within the provider's concurrency
# limit (e.g. Proton: max 10 concurrent WG sessions) and never drops all
# egress at once. weircon-proxy@.service also tears the old tunnel down
# (ExecStopPost) BEFORE the new one is raised (ExecStartPre), so a slot is
# never doubly connected in the middle of a rotation.
#
# Usage:
#   rotate.sh          # rotate one slot (the cursor in the state file advances)
#   rotate.sh init     # seed all slots with the first SLOTS distinct configs
set -euo pipefail

POOL="/etc/weircon-random-proxy/wg-pool"
ACTIVE="/etc/weircon-random-proxy/wg"
STATE="/var/lib/weircon-random-proxy/rotate.state"
SLOTS="${SLOTS:-10}"
# The fetch service's admin endpoint (local, not exposed via the reverse proxy).
# Empty => skip drain/undrain (falls back to a plain restart). Note the use of
# ${VAR-default} rather than ${VAR:-default}: the latter would replace an
# explicitly empty value with the default, so drain could never be disabled.
ADMIN_URL="${WEIRCON_ADMIN_URL-http://127.0.0.1:8081}"

# Pool configs, sorted into a stable order (basenames only).
mapfile -t POOL_CONFIGS < <(find "$POOL" -maxdepth 1 -type f -name '*.conf' -printf '%f\n' 2>/dev/null | sort)
N="${#POOL_CONFIGS[@]}"
if (( N == 0 )); then
    echo "no configs in $POOL; put proton-*.conf files there" >&2
    exit 1
fi
if (( N < SLOTS )); then
    echo "WARN: only $N configs in the pool for $SLOTS slots; configs will repeat" >&2
fi

# Which configs are already in use by a slot OTHER than $skip?
configs_in_use() {
    local skip="$1" i link
    for (( i = 0; i < SLOTS; i++ )); do
        (( i == skip )) && continue
        link="$ACTIVE/proxy${i}.conf"
        [[ -L "$link" ]] && basename "$(readlink -f "$link")"
    done
}

# Find the next config in the pool, starting at position $1, that is not in
# use by any slot other than $2. Sets the globals $CHOSEN and $NEXT_POS.
pick_free_config() {
    local start="$1" skip_slot="$2" k cand
    local -a busy
    mapfile -t busy < <(configs_in_use "$skip_slot")
    for (( k = 0; k < N; k++ )); do
        cand="${POOL_CONFIGS[(start + k) % N]}"
        if ! printf '%s\n' "${busy[@]}" | grep -qxF "$cand"; then
            CHOSEN="$cand"
            NEXT_POS=$(( (start + k + 1) % N ))
            return 0
        fi
    done
    return 1
}

# Ask the fetch service to stop new traffic to a slot and wait until in-flight
# requests have drained (the service handles the wait + settle itself).
# Non-fatal if the admin endpoint does not respond (e.g. the service is
# stopped): a warning is logged and the rotation continues.
drain_slot() {
    local slot="$1"
    [[ -z "$ADMIN_URL" ]] && return 0
    curl -fsS -m 60 -X POST "${ADMIN_URL}/drain?id=${slot}" >/dev/null 2>&1 || \
        echo "WARN: drain of slot $slot failed (continuing)" >&2
}

# Hand the slot back to the service. The service itself keeps it out of rotation
# for a few more seconds (warmup) so the fresh WG handshake can settle.
undrain_slot() {
    local slot="$1"
    [[ -z "$ADMIN_URL" ]] && return 0
    curl -fsS -m 10 -X POST "${ADMIN_URL}/undrain?id=${slot}" >/dev/null 2>&1 || \
        echo "WARN: undrain of slot $slot failed (slot will self-heal)" >&2
}

assign_slot() {
    local slot="$1" cfg="$2"
    drain_slot "$slot"                                   # stop traffic BEFORE teardown
    ln -sfn "$POOL/$cfg" "$ACTIVE/proxy${slot}.conf"
    systemctl restart "weircon-proxy@${slot}.service"    # tears down the old tunnel, raises the new one
    undrain_slot "$slot"                                 # resume traffic AFTER (with warmup)
}

mkdir -p "$(dirname "$STATE")"

# --- init: seed all slots with distinct configs --------------------------
if [[ "${1:-}" == "init" ]]; then
    pos=0
    for (( s = 0; s < SLOTS; s++ )); do
        if ! pick_free_config "$pos" "$s"; then
            echo "could not find a free config for slot $s" >&2
            exit 1
        fi
        ln -sfn "$POOL/$CHOSEN" "$ACTIVE/proxy${s}.conf"
        pos="$NEXT_POS"
        echo "slot $s -> $CHOSEN"
    done
    printf 'slot=0\npos=%s\n' "$pos" > "$STATE"
    echo "seeded $SLOTS slots; start/restart the tunnels to activate"
    exit 0
fi

# --- normal: rotate exactly one slot --------------------------------------
slot=0
pos=0
# shellcheck disable=SC1090
[[ -f "$STATE" ]] && source "$STATE"
(( slot >= SLOTS )) && slot=0

if ! pick_free_config "$pos" "$slot"; then
    echo "could not find a free config for slot $slot" >&2
    exit 1
fi

assign_slot "$slot" "$CHOSEN"
echo "rotated slot $slot -> $CHOSEN ($((slot + 1))/$SLOTS this round)"

# Advance the cursors: next slot; the pool position was already set by pick_free_config.
slot=$(( (slot + 1) % SLOTS ))
printf 'slot=%s\npos=%s\n' "$slot" "$NEXT_POS" > "$STATE"
+20
-2
@@ -54,18 +54,26 @@ chmod +x "$BIN"
 # 4. Helper-scripts
 install -Dm755 "$SCRIPT_DIR/netns-up.sh" /usr/local/sbin/weircon-netns-up
 install -Dm755 "$SCRIPT_DIR/netns-down.sh" /usr/local/sbin/weircon-netns-down
+install -Dm755 "$SCRIPT_DIR/rotate.sh" /usr/local/sbin/weircon-rotate

 # 5. systemd units
 install -Dm644 "$SCRIPT_DIR/weircon-proxies.target" /etc/systemd/system/weircon-proxies.target
 install -Dm644 "$SCRIPT_DIR/weircon-proxy@.service" /etc/systemd/system/weircon-proxy@.service
 install -Dm644 "$SCRIPT_DIR/weircon-fetch.service" /etc/systemd/system/weircon-fetch.service
+install -Dm644 "$SCRIPT_DIR/weircon-rotate.service" /etc/systemd/system/weircon-rotate.service
+install -Dm644 "$SCRIPT_DIR/weircon-rotate.timer" /etc/systemd/system/weircon-rotate.timer

 # 6. Config-dir + default env
-mkdir -p /etc/weircon-random-proxy/wg
-chmod 700 /etc/weircon-random-proxy/wg
+# wg/      = active slot configs (proxy0.conf..proxyN.conf, possibly symlinks)
+# wg-pool/ = optional pool of extra configs that the rotation cycles through
+mkdir -p /etc/weircon-random-proxy/wg /etc/weircon-random-proxy/wg-pool
+chmod 700 /etc/weircon-random-proxy/wg /etc/weircon-random-proxy/wg-pool
 if [[ ! -f /etc/weircon-random-proxy/fetch.env ]]; then
     install -m640 "$SCRIPT_DIR/fetch.env.example" /etc/weircon-random-proxy/fetch.env
 fi
+if [[ ! -f /etc/weircon-random-proxy/rotate.env ]]; then
+    install -m640 "$SCRIPT_DIR/rotate.env.example" /etc/weircon-random-proxy/rotate.env
+fi

 systemctl daemon-reload

@@ -89,6 +97,16 @@ Start stakken:
 systemctl enable --now weircon-proxy@{0..9}.service
 systemctl enable --now weircon-fetch.service

+(Optional) Rotate over a LARGER pool of configs than the 10 slots:
+# put e.g. 50-100 configs in the pool
+cp proton-*.conf /etc/weircon-random-proxy/wg-pool/
+# seed the 10 active slots as symlinks into the pool
+weircon-rotate init
+systemctl restart weircon-proxy@{0..9}.service
+# rotate automatically, one slot at a time (interval in weircon-rotate.timer)
+systemctl enable --now weircon-rotate.timer
+# adjust the interval: systemctl edit weircon-rotate.timer (default 5min)
+
 Verify:
 systemctl status 'weircon-proxy@*'
 curl http://127.0.0.1:8080/health

@@ -0,0 +1,9 @@
[Unit]
Description=weircon-random-proxy: rotate one tunnel slot to the next WG config in the pool
After=weircon-proxies.target
Wants=weircon-proxies.target

[Service]
Type=oneshot
EnvironmentFile=-/etc/weircon-random-proxy/rotate.env
ExecStart=/usr/local/sbin/weircon-rotate
@@ -0,0 +1,21 @@
[Unit]
Description=weircon-random-proxy: periodic slot rotation
PartOf=weircon-proxies.target

[Timer]
# Default: rotate one slot every 5 minutes. With 10 slots, the whole slot set
# is cycled roughly every 50 minutes, and every config in a pool of N files
# has been used at least once after about (N * 5) minutes.
#
# Change the interval without editing this file:
#   systemctl edit weircon-rotate.timer
# and insert e.g. (the empty assignment resets the inherited value):
#   [Timer]
#   OnUnitActiveSec=
#   OnUnitActiveSec=15min
OnBootSec=5min
OnUnitActiveSec=5min
AccuracySec=15s

[Install]
WantedBy=timers.target
@@ -0,0 +1,67 @@
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"strconv"
)

// newAdminHandler exposes the rotation control surface. It is intended to bind
// to localhost only (WEIRCON_ADMIN_LISTEN) and is called by rotate.sh:
//
//	POST /drain?id=N    take slot N out of service; blocks until in-flight
//	                    drains (bounded by WEIRCON_DRAIN_TIMEOUT_SEC) + settle.
//	                    200 {"id":N,"drained":true,"inflight":0} on a clean drain,
//	                    "drained":false with the residual count on timeout.
//	POST /undrain?id=N  return slot N to service after the warmup window.
//	GET  /status        snapshot of every slot.
func newAdminHandler(cfg Config, mgr *proxyManager) http.Handler {
	mux := http.NewServeMux()

	slotID := func(w http.ResponseWriter, r *http.Request) (int, bool) {
		raw := r.URL.Query().Get("id")
		id, err := strconv.Atoi(raw)
		if err != nil || id < 0 || id >= mgr.n {
			http.Error(w, fmt.Sprintf("id must be 0-%d", mgr.n-1), http.StatusBadRequest)
			return 0, false
		}
		return id, true
	}
	writeJSON := func(w http.ResponseWriter, v any) {
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(v)
	}

	mux.HandleFunc("/drain", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "POST only", http.StatusMethodNotAllowed)
			return
		}
		id, ok := slotID(w, r)
		if !ok {
			return
		}
		residual := mgr.drain(id, cfg.DrainTimeout)
		writeJSON(w, map[string]any{"id": id, "drained": residual == 0, "inflight": residual})
	})

	mux.HandleFunc("/undrain", func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "POST only", http.StatusMethodNotAllowed)
			return
		}
		id, ok := slotID(w, r)
		if !ok {
			return
		}
		mgr.undrain(id)
		writeJSON(w, map[string]any{"id": id, "warmup_sec": int(cfg.DrainWarmup.Seconds())})
	})

	mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
		writeJSON(w, mgr.snapshot())
	})

	return mux
}
+98
-14
@@ -7,12 +7,12 @@
 package main

 import (
-	_ "embed"
 	"context"
+	_ "embed"
+	"encoding/json"
 	"errors"
 	"fmt"
 	"log"
-	"math/rand/v2"
 	"net"
 	"net/http"
 	"net/http/httputil"
@@ -29,17 +29,37 @@ import (
 var uiHTML []byte

 type Config struct {
-	Listen     string
-	ProxyAddrs []string
-	Timeout    time.Duration
-	UIEnabled  bool
+	Listen      string
+	AdminListen string
+	ProxyAddrs  []string
+	Timeout     time.Duration
+	UIEnabled   bool
+
+	// Rotation drain tuning.
+	DrainTimeout time.Duration // max wait for in-flight to clear during drain
+	DrainSettle  time.Duration // quiet period after in-flight hits zero (the "before")
+	DrainWarmup  time.Duration // post-undrain cool-in before re-selection (the "after")
+	DrainMaxHold time.Duration // safety: auto-undrain a slot if undrain never arrives
+
+	// Egress-IP prober (feeds /status).
+	ProbeURL      string        // IP-echo endpoint fetched through each tunnel
+	ProbeInterval time.Duration // 0 disables probing
+	ProbeTimeout  time.Duration // per-probe ceiling
 }

 func loadConfig() (Config, error) {
 	cfg := Config{
-		Listen:    envStr("WEIRCON_LISTEN", ":8080"),
-		Timeout:   time.Duration(envInt("WEIRCON_REQUEST_TIMEOUT_SEC", 30)) * time.Second,
-		UIEnabled: envBool("WEIRCON_UI_ENABLED", true),
+		Listen:        envStr("WEIRCON_LISTEN", ":8080"),
+		AdminListen:   envStr("WEIRCON_ADMIN_LISTEN", "127.0.0.1:8081"),
+		Timeout:       time.Duration(envInt("WEIRCON_REQUEST_TIMEOUT_SEC", 30)) * time.Second,
+		UIEnabled:     envBool("WEIRCON_UI_ENABLED", true),
+		DrainTimeout:  time.Duration(envInt("WEIRCON_DRAIN_TIMEOUT_SEC", 25)) * time.Second,
+		DrainSettle:   time.Duration(envInt("WEIRCON_DRAIN_SETTLE_SEC", 2)) * time.Second,
+		DrainWarmup:   time.Duration(envInt("WEIRCON_DRAIN_WARMUP_SEC", 3)) * time.Second,
+		DrainMaxHold:  time.Duration(envInt("WEIRCON_DRAIN_MAX_HOLD_SEC", 60)) * time.Second,
+		ProbeURL:      envStr("WEIRCON_PROBE_URL", "https://api.ipify.org"),
+		ProbeInterval: time.Duration(envInt("WEIRCON_PROBE_INTERVAL_SEC", 60)) * time.Second,
+		ProbeTimeout:  time.Duration(envInt("WEIRCON_PROBE_TIMEOUT_SEC", 10)) * time.Second,
 	}
 	if raw := envStr("WEIRCON_PROXY_ADDRS", ""); raw != "" {
 		for _, a := range strings.Split(raw, ",") {
@@ -192,10 +212,13 @@ func stripIdentifying(h http.Header) {
 	}
 }

-func pickProxyID(req *http.Request, n int) (int, error) {
+// parsePin reads the optional X-WEIRCON-PROXY-ID header. It returns (-1, nil)
+// when no pin is requested (caller should pick at random). The actual slot
+// reservation, including the draining check, happens in the manager, not here.
+func parsePin(req *http.Request, n int) (int, error) {
 	raw := req.Header.Get("X-Weircon-Proxy-Id")
 	if raw == "" {
-		return rand.IntN(n), nil
+		return -1, nil
 	}
 	id, err := strconv.Atoi(raw)
 	if err != nil {
@@ -235,7 +258,7 @@ func resolveMethod(req *http.Request) string {
 	return req.Method
 }

-func newHandler(cfg Config, pool []*http.Transport) http.Handler {
+func newHandler(cfg Config, pool []*http.Transport, mgr *proxyManager) http.Handler {
 	rp := &httputil.ReverseProxy{
 		Rewrite: func(pr *httputil.ProxyRequest) {
 			rc, ok := pr.In.Context().Value(ctxKey{}).(reqCtx)
@@ -271,6 +294,13 @@ func newHandler(cfg Config, pool []*http.Transport) http.Handler {
 		w.Header().Set("Content-Type", "application/json")
 		fmt.Fprintf(w, `{"ok":true,"proxies":%d}`, count)
 	})
+	// Poll-friendly status for scraper-side dashboards: per-slot egress IP +
+	// state, plus when the next rotation is due and which egress it replaces.
+	mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
+		w.Header().Set("Content-Type", "application/json")
+		w.Header().Set("Cache-Control", "no-store")
+		_ = json.NewEncoder(w).Encode(mgr.snapshot())
+	})
 	if cfg.UIEnabled {
 		mux.HandleFunc("/ui", func(w http.ResponseWriter, r *http.Request) {
 			w.Header().Set("Content-Type", "text/html; charset=utf-8")
@@ -296,11 +326,34 @@ func newHandler(cfg Config, pool []*http.Transport) http.Handler {
 			http.Error(w, err.Error(), http.StatusBadRequest)
 			return
 		}
-		id, err := pickProxyID(r, count)
+		pin, err := parsePin(r, count)
 		if err != nil {
 			http.Error(w, err.Error(), http.StatusBadRequest)
 			return
 		}

+		// Reserve a slot. A pinned slot that is mid-rotation is rejected (the
+		// caller asked for that specific egress); a random pick just skips
+		// draining/warming slots. release() runs after the response finishes
+		// streaming so drain() can observe a true in-flight count.
+		var id int
+		if pin >= 0 {
+			id = pin
+			if err := mgr.acquirePinned(id); err != nil {
+				w.Header().Set("Retry-After", "5")
+				http.Error(w, err.Error(), http.StatusServiceUnavailable)
+				return
+			}
+		} else {
+			id, err = mgr.acquireAny()
+			if err != nil {
+				w.Header().Set("Retry-After", "5")
+				http.Error(w, err.Error(), http.StatusServiceUnavailable)
+				return
+			}
+		}
+		defer mgr.release(id)
+
 		ctx := context.WithValue(r.Context(), ctxKey{}, reqCtx{
 			proxyID: id,
 			target:  target,
@@ -320,9 +373,40 @@ func main() {
 	if err != nil {
 		log.Fatalf("transports: %v", err)
 	}
+	mgr := newProxyManager(len(pool), cfg.DrainSettle, cfg.DrainWarmup, cfg.DrainMaxHold)
+
+	// Background egress-IP prober feeds /status. A slot is re-probed shortly
+	// after it rotates back in so its new egress IP appears promptly rather
+	// than waiting for the next periodic pass.
+	if cfg.ProbeInterval > 0 {
+		pr := &prober{mgr: mgr, pool: pool, url: cfg.ProbeURL, timeout: cfg.ProbeTimeout}
+		mgr.afterUndrain = func(id int) {
+			time.AfterFunc(cfg.DrainWarmup+500*time.Millisecond, func() { pr.probe(id) })
+		}
+		go pr.run(context.Background(), cfg.ProbeInterval)
+		log.Printf("egress prober: %s every %s", cfg.ProbeURL, cfg.ProbeInterval)
+	}
+
+	// Admin server: localhost-only by default so drain/undrain is reachable by
+	// the rotation tooling running in the same LXC but never from the public
+	// reverse proxy. Empty WEIRCON_ADMIN_LISTEN disables it entirely.
+	if cfg.AdminListen != "" {
+		admin := &http.Server{
+			Addr:              cfg.AdminListen,
+			Handler:           newAdminHandler(cfg, mgr),
+			ReadHeaderTimeout: 10 * time.Second,
+		}
+		go func() {
+			log.Printf("admin listening on %s", cfg.AdminListen)
+			if err := admin.ListenAndServe(); err != nil {
+				log.Fatalf("admin server: %v", err)
+			}
+		}()
+	}
+
 	srv := &http.Server{
 		Addr:              cfg.Listen,
-		Handler:           newHandler(cfg, pool),
+		Handler:           newHandler(cfg, pool, mgr),
 		ReadHeaderTimeout: 10 * time.Second,
 		IdleTimeout:       120 * time.Second,
 	}

@@ -0,0 +1,194 @@
package main

import (
	"errors"
	"math/rand/v2"
	"sync"
	"time"
)

// errDraining is returned when a request pins a slot that is currently being
// rotated out. errNoProxies is returned when no slot is selectable at all.
var (
	errDraining  = errors.New("proxy is draining for rotation")
	errNoProxies = errors.New("no proxy currently available")
)

// proxyManager tracks per-slot availability so the rotation tooling can take a
// slot out of service a moment before its WireGuard tunnel is torn down and
// bring it back a moment after the new tunnel is up, without ever routing a
// request to a slot that is mid-rotation.
//
// Lifecycle of a rotation (driven by rotate.sh via the admin server):
//
//	drain(id)    -> stop routing NEW requests to id, block until in-flight
//	                drains to zero (bounded by timeout) + a short settle.
//	(restart the tunnel unit)
//	undrain(id)  -> mark id available again, but keep it out of the candidate
//	                set for `warmup` so the fresh handshake can settle first.
//
// A drained slot self-heals: if undrain never arrives (e.g. rotate.sh died
// mid-run) the slot returns to service after maxHold so the pool can't shrink
// permanently.
type proxyManager struct {
	mu       sync.Mutex
	n        int
	inflight []int
	draining []bool
	warmupAt []time.Time // slot not selectable until now >= warmupAt[id]
	healTmr  []*time.Timer
	egress   []egressInfo // last observed egress IP per slot (from the prober)

	// Rotation cadence, inferred from drain() calls. rotate.sh rotates exactly
	// one slot per timer tick in round-robin order, so the gap between drains is
	// the rotation interval and the next slot to rotate is (lastSlot+1) % n.
	lastSlot    int
	lastRotated time.Time
	interval    time.Duration

	// afterUndrain, if set, is called (outside the lock) when a slot returns to
	// service. The prober uses it to refresh that slot's egress IP promptly.
	afterUndrain func(id int)

	settle  time.Duration // quiet period after in-flight hits zero (the "before")
	warmup  time.Duration // post-undrain cool-in before re-selection (the "after")
	maxHold time.Duration // safety: auto-undrain if rotate.sh never calls undrain
}

// egressInfo is the prober's last view of a slot's public egress.
type egressInfo struct {
	ip        string
	latency   time.Duration
	ok        bool
	checkedAt time.Time
}

func newProxyManager(n int, settle, warmup, maxHold time.Duration) *proxyManager {
	return &proxyManager{
		n:        n,
		inflight: make([]int, n),
		draining: make([]bool, n),
		warmupAt: make([]time.Time, n),
		healTmr:  make([]*time.Timer, n),
		egress:   make([]egressInfo, n),
		lastSlot: -1,
		settle:   settle,
		warmup:   warmup,
		maxHold:  maxHold,
	}
}

// setEgress records the prober's latest observation for a slot.
func (m *proxyManager) setEgress(id int, ip string, latency time.Duration, ok bool) {
	m.mu.Lock()
	m.egress[id] = egressInfo{ip: ip, latency: latency, ok: ok, checkedAt: time.Now()}
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
// selectableLocked reports whether slot id may take a new request right now.
|
||||
// Caller must hold m.mu.
|
||||
func (m *proxyManager) selectableLocked(id int) bool {
|
||||
if m.draining[id] {
|
||||
return false
|
||||
}
|
||||
if w := m.warmupAt[id]; !w.IsZero() && time.Now().Before(w) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// acquirePinned reserves a specific slot for one request. It combines the
|
||||
// availability check and the in-flight increment under the lock so a drain
|
||||
// can't slip in between. Call release(id) when the request finishes.
|
||||
func (m *proxyManager) acquirePinned(id int) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if !m.selectableLocked(id) {
|
||||
return errDraining
|
||||
}
|
||||
m.inflight[id]++
|
||||
return nil
|
||||
}
|
||||
|
||||
// acquireAny picks a random selectable slot and reserves it. Returns errNoProxies
|
||||
// if every slot is draining/warming. Call release(id) when the request finishes.
|
||||
func (m *proxyManager) acquireAny() (int, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
cands := make([]int, 0, m.n)
|
||||
for i := 0; i < m.n; i++ {
|
||||
if m.selectableLocked(i) {
|
||||
cands = append(cands, i)
|
||||
}
|
||||
}
|
||||
if len(cands) == 0 {
|
||||
return 0, errNoProxies
|
||||
}
|
||||
id := cands[rand.IntN(len(cands))]
|
||||
m.inflight[id]++
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// release marks one in-flight request on id as finished.
|
||||
func (m *proxyManager) release(id int) {
|
||||
m.mu.Lock()
|
||||
if m.inflight[id] > 0 {
|
||||
m.inflight[id]--
|
||||
}
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
// drain stops new traffic to id and waits until in-flight reaches zero (bounded
|
||||
// by timeout) followed by a settle period. Returns the number of requests still
|
||||
// in flight when it gave up (0 on a clean drain). A self-heal timer is armed so
|
||||
// the slot recovers even if undrain is never called.
|
||||
func (m *proxyManager) drain(id int, timeout time.Duration) int {
|
||||
m.mu.Lock()
|
||||
m.draining[id] = true
|
||||
// Infer rotation cadence: the gap since the previous drain is the interval.
|
||||
now := time.Now()
|
||||
if !m.lastRotated.IsZero() {
|
||||
if d := now.Sub(m.lastRotated); d > 0 {
|
||||
m.interval = d
|
||||
}
|
||||
}
|
||||
m.lastSlot = id
|
||||
m.lastRotated = now
|
||||
if m.healTmr[id] != nil {
|
||||
m.healTmr[id].Stop()
|
||||
}
|
||||
m.healTmr[id] = time.AfterFunc(m.maxHold, func() { m.undrain(id) })
|
||||
m.mu.Unlock()
|
||||
|
||||
deadline := time.Now().Add(timeout)
|
||||
for {
|
||||
m.mu.Lock()
|
||||
inf := m.inflight[id]
|
||||
m.mu.Unlock()
|
||||
if inf == 0 {
|
||||
time.Sleep(m.settle) // let the last connection's FIN flush
|
||||
return 0
|
||||
}
|
||||
if time.Now().After(deadline) {
|
||||
return inf
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
// undrain returns id to service after a warmup window during which it stays out
|
||||
// of the candidate set (so the just-rebuilt tunnel's handshake can complete).
|
||||
func (m *proxyManager) undrain(id int) {
|
||||
m.mu.Lock()
|
||||
if m.healTmr[id] != nil {
|
||||
m.healTmr[id].Stop()
|
||||
m.healTmr[id] = nil
|
||||
}
|
||||
m.draining[id] = false
|
||||
m.warmupAt[id] = time.Now().Add(m.warmup)
|
||||
hook := m.afterUndrain
|
||||
m.mu.Unlock()
|
||||
if hook != nil {
|
||||
hook(id)
|
||||
}
|
||||
}
|
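The acquire/release contract above is easiest to honor when the release is deferred right after a successful acquire. The following is a minimal sketch of that calling pattern, not the handler code from this commit (which is not shown here). `slotGate`, `withSlot`, `setDraining`, `inFlight` and `errNoSlot` are hypothetical names, and the gate is a reduced stand-in for `proxyManager`: it has no warmup, timers or egress tracking, and it picks the first free slot rather than a random one.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errNoSlot = errors.New("no slot available")

// slotGate is a hypothetical, reduced stand-in for proxyManager: it keeps only
// the draining flag and the in-flight counter for each slot.
type slotGate struct {
	mu       sync.Mutex
	inflight []int
	draining []bool
}

func newSlotGate(n int) *slotGate {
	return &slotGate{inflight: make([]int, n), draining: make([]bool, n)}
}

// acquire reserves the first non-draining slot. The availability check and the
// counter increment share one critical section, so a drain cannot land between
// them.
func (g *slotGate) acquire() (int, error) {
	g.mu.Lock()
	defer g.mu.Unlock()
	for id := range g.draining {
		if !g.draining[id] {
			g.inflight[id]++
			return id, nil
		}
	}
	return 0, errNoSlot
}

// release marks one request on id as finished.
func (g *slotGate) release(id int) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.inflight[id] > 0 {
		g.inflight[id]--
	}
}

func (g *slotGate) setDraining(id int, v bool) {
	g.mu.Lock()
	defer g.mu.Unlock()
	g.draining[id] = v
}

func (g *slotGate) inFlight(id int) int {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.inflight[id]
}

// withSlot runs fn on a reserved slot. The deferred release runs on every
// return path, so the in-flight count cannot leak.
func (g *slotGate) withSlot(fn func(id int) error) error {
	id, err := g.acquire()
	if err != nil {
		return err
	}
	defer g.release(id)
	return fn(id)
}

func main() {
	g := newSlotGate(2)
	g.setDraining(0, true) // slot 0 is being rotated

	err := g.withSlot(func(id int) error {
		fmt.Printf("serving via slot %d (in flight: %d)\n", id, g.inFlight(id))
		return nil
	})
	fmt.Println("err:", err, "in flight afterwards:", g.inFlight(1))
}
```

Because the counter only drops when the callback returns, a concurrent drain of the same slot would keep waiting until the request has actually finished, which is the property `drain` relies on.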
@@ -0,0 +1,176 @@
package main

import (
	"sync"
	"testing"
	"time"
)

func newTestMgr(n int) *proxyManager {
	// Zero settle/warmup so tests don't sleep; maxHold is long enough that self-heal never fires.
	return newProxyManager(n, 0, 0, time.Hour)
}

func TestAcquireAnySkipsDraining(t *testing.T) {
	m := newTestMgr(3)
	m.drain(0, time.Second) // slot 0 out
	m.drain(1, time.Second) // slot 1 out

	// Only slot 2 should ever come back from acquireAny.
	for i := 0; i < 50; i++ {
		id, err := m.acquireAny()
		if err != nil {
			t.Fatalf("acquireAny: %v", err)
		}
		if id != 2 {
			t.Fatalf("expected only slot 2 selectable, got %d", id)
		}
		m.release(id)
	}
}

func TestAcquireAnyNoneAvailable(t *testing.T) {
	m := newTestMgr(2)
	m.drain(0, time.Second)
	m.drain(1, time.Second)
	if _, err := m.acquireAny(); err != errNoProxies {
		t.Fatalf("expected errNoProxies, got %v", err)
	}
}

func TestAcquirePinnedDraining(t *testing.T) {
	m := newTestMgr(2)
	if err := m.acquirePinned(1); err != nil {
		t.Fatalf("healthy slot should acquire: %v", err)
	}
	m.release(1)

	m.drain(1, time.Second)
	if err := m.acquirePinned(1); err != errDraining {
		t.Fatalf("expected errDraining for pinned drained slot, got %v", err)
	}
}

func TestDrainWaitsForInflight(t *testing.T) {
	m := newTestMgr(1)
	if _, err := m.acquireAny(); err != nil { // hold slot 0 in flight
		t.Fatal(err)
	}

	done := make(chan int, 1)
	go func() { done <- m.drain(0, 2*time.Second) }()

	// drain must not return while the request is still in flight.
	select {
	case <-done:
		t.Fatal("drain returned before in-flight request released")
	case <-time.After(150 * time.Millisecond):
	}

	m.release(0)
	select {
	case residual := <-done:
		if residual != 0 {
			t.Fatalf("expected clean drain (0 residual), got %d", residual)
		}
	case <-time.After(time.Second):
		t.Fatal("drain did not return after release")
	}
}

func TestDrainTimeoutReportsResidual(t *testing.T) {
	m := newTestMgr(1)
	if _, err := m.acquireAny(); err != nil { // never released
		t.Fatal(err)
	}
	if residual := m.drain(0, 100*time.Millisecond); residual != 1 {
		t.Fatalf("expected residual 1 on timeout, got %d", residual)
	}
}

func TestUndrainRestoresSelection(t *testing.T) {
	m := newTestMgr(1)
	m.drain(0, time.Second)
	if _, err := m.acquireAny(); err != errNoProxies {
		t.Fatalf("expected slot unavailable while draining, got %v", err)
	}
	m.undrain(0) // warmup is zero in test mgr → immediately selectable
	if _, err := m.acquireAny(); err != nil {
		t.Fatalf("expected slot available after undrain, got %v", err)
	}
}

func TestWarmupHoldsSlotOut(t *testing.T) {
	m := newProxyManager(1, 0, 200*time.Millisecond, time.Hour)
	m.undrain(0) // sets warmupAt = now + 200ms
	if _, err := m.acquireAny(); err != errNoProxies {
		t.Fatalf("expected slot held out during warmup, got %v", err)
	}
	time.Sleep(250 * time.Millisecond)
	if _, err := m.acquireAny(); err != nil {
		t.Fatalf("expected slot available after warmup, got %v", err)
	}
}

func TestSelfHealOnMissedUndrain(t *testing.T) {
	m := newProxyManager(1, 0, 0, 100*time.Millisecond) // maxHold 100ms
	m.drain(0, time.Second)
	if _, err := m.acquireAny(); err != errNoProxies {
		t.Fatalf("expected unavailable right after drain, got %v", err)
	}
	time.Sleep(200 * time.Millisecond) // self-heal timer should fire
	if _, err := m.acquireAny(); err != nil {
		t.Fatalf("expected self-heal to restore slot, got %v", err)
	}
}

func TestSnapshotEgressAndRotation(t *testing.T) {
	m := newProxyManager(3, 0, 0, time.Hour)
	m.setEgress(2, "9.9.9.9", 12*time.Millisecond, true)

	m.drain(0, time.Second) // first rotation: establishes lastRotated only
	m.undrain(0)
	time.Sleep(5 * time.Millisecond)
	m.drain(1, time.Second) // second rotation: gap becomes the interval; slot 1 left draining

	snap := m.snapshot()

	if got := snap.Proxies[2]; got.EgressIP != "9.9.9.9" || !got.Reachable || got.CheckedAgeSec == nil {
		t.Fatalf("egress not reflected on slot 2: %+v", got)
	}
	if snap.Proxies[1].State != "draining" {
		t.Fatalf("slot 1 should be draining, got %q", snap.Proxies[1].State)
	}
	if snap.Rotation.LastSlot == nil || *snap.Rotation.LastSlot != 1 {
		t.Fatalf("last_slot want 1, got %v", snap.Rotation.LastSlot)
	}
	if snap.Rotation.NextSlot == nil || *snap.Rotation.NextSlot != 2 {
		t.Fatalf("next_slot want 2 (round-robin), got %v", snap.Rotation.NextSlot)
	}
	if snap.Rotation.NextEgressIP != "9.9.9.9" {
		t.Fatalf("next_egress_ip want 9.9.9.9 (slot 2's IP), got %q", snap.Rotation.NextEgressIP)
	}
	if snap.Rotation.ETASec == nil {
		t.Fatal("expected an ETA once cadence is known")
	}
}

func TestConcurrentAcquireRelease(t *testing.T) {
	m := newTestMgr(4)
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if id, err := m.acquireAny(); err == nil {
				m.release(id)
			}
		}()
	}
	wg.Wait()
	for i := 0; i < 4; i++ {
		if m.inflight[i] != 0 {
			t.Fatalf("slot %d leaked in-flight count: %d", i, m.inflight[i])
		}
	}
}
@@ -0,0 +1,167 @@
package main

import (
	"context"
	"io"
	"net"
	"net/http"
	"sync"
	"time"
)

// ProxyStatus is one slot's entry in the /status snapshot.
type ProxyStatus struct {
	ID       int    `json:"id"`
	State    string `json:"state"` // "live" | "draining" | "warming"
	Inflight int    `json:"inflight"`
	// Egress fields come from the background prober. Before the first probe (or
	// with probing disabled) the optional ones are omitted and Reachable is false.
	EgressIP      string `json:"egress_ip,omitempty"`
	Reachable     bool   `json:"reachable"`
	LatencyMS     int64  `json:"latency_ms,omitempty"`
	CheckedAgeSec *int64 `json:"checked_age_sec,omitempty"`
}

// RotationStatus describes when the next rotation is due and which egress it
// will replace. Fields are inferred from observed drain timing: the Last* fields
// appear after the first drain, the rest once a second drain sets the cadence.
type RotationStatus struct {
	NextSlot          *int   `json:"next_slot,omitempty"`
	NextEgressIP      string `json:"next_egress_ip,omitempty"`
	ETASec            *int64 `json:"eta_sec,omitempty"`
	IntervalSec       *int64 `json:"interval_sec,omitempty"`
	LastSlot          *int   `json:"last_slot,omitempty"`
	LastRotatedAgeSec *int64 `json:"last_rotated_age_sec,omitempty"`
}

// StatusSnapshot is the JSON returned by /status: cheap to produce, safe to
// poll every second.
type StatusSnapshot struct {
	Now          string         `json:"now"`
	ProxiesTotal int            `json:"proxies_total"`
	Available    int            `json:"available"`
	Proxies      []ProxyStatus  `json:"proxies"`
	Rotation     RotationStatus `json:"rotation"`
}

// snapshot builds the current status under the lock. No network I/O happens
// here; egress data is whatever the prober last cached.
func (m *proxyManager) snapshot() StatusSnapshot {
	m.mu.Lock()
	defer m.mu.Unlock()
	now := time.Now()

	snap := StatusSnapshot{
		Now:          now.UTC().Format(time.RFC3339),
		ProxiesTotal: m.n,
		Proxies:      make([]ProxyStatus, m.n),
	}
	for i := 0; i < m.n; i++ {
		state := "live"
		switch {
		case m.draining[i]:
			state = "draining"
		case !m.warmupAt[i].IsZero() && now.Before(m.warmupAt[i]):
			state = "warming"
		default:
			snap.Available++
		}
		ps := ProxyStatus{ID: i, State: state, Inflight: m.inflight[i]}
		if e := m.egress[i]; !e.checkedAt.IsZero() {
			ps.EgressIP = e.ip
			ps.Reachable = e.ok
			ps.LatencyMS = e.latency.Milliseconds()
			age := int64(now.Sub(e.checkedAt).Seconds())
			ps.CheckedAgeSec = &age
		}
		snap.Proxies[i] = ps
	}

	if !m.lastRotated.IsZero() {
		last := m.lastSlot
		snap.Rotation.LastSlot = &last
		lra := int64(now.Sub(m.lastRotated).Seconds())
		snap.Rotation.LastRotatedAgeSec = &lra
		if m.interval > 0 {
			next := (m.lastSlot + 1) % m.n
			isec := int64(m.interval.Seconds())
			eta := int64(m.lastRotated.Add(m.interval).Sub(now).Seconds())
			if eta < 0 {
				eta = 0
			}
			snap.Rotation.NextSlot = &next
			snap.Rotation.NextEgressIP = m.egress[next].ip
			snap.Rotation.IntervalSec = &isec
			snap.Rotation.ETASec = &eta
		}
	}
	return snap
}
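The rotation estimate in `snapshot` reduces to two pieces of arithmetic: the next slot is the round-robin successor of the last drained slot, and the ETA is whatever remains of the inferred interval, clamped at zero once the rotation is overdue. A minimal sketch of just that arithmetic follows. The function name `nextRotation` and its parameters are hypothetical, and unlike `snapshot` it returns a `time.Duration` rather than whole seconds.

```go
package main

import (
	"fmt"
	"time"
)

// nextRotation is a hypothetical helper that mirrors the arithmetic used for
// next_slot and eta_sec: round-robin successor, plus remaining time clamped at
// zero. sinceLast is the time elapsed since the last observed drain.
func nextRotation(lastSlot, n int, sinceLast, interval time.Duration) (next int, eta time.Duration) {
	next = (lastSlot + 1) % n
	eta = interval - sinceLast
	if eta < 0 {
		eta = 0 // rotation is overdue; report "due now" rather than a negative ETA
	}
	return next, eta
}

func main() {
	// 10 slots, slot 9 rotated 4 minutes ago, rotations observed 6 minutes apart.
	next, eta := nextRotation(9, 10, 4*time.Minute, 6*time.Minute)
	fmt.Println("next slot:", next, "eta:", eta)
}
```

Since the interval is only the gap between the two most recent drains, a single delayed or manual rotation shifts the estimate until the next regular tick.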

// prober periodically fetches each live slot's public egress IP through its own
// SOCKS transport and caches it on the manager for /status to report.
type prober struct {
	mgr     *proxyManager
	pool    []*http.Transport
	url     string
	timeout time.Duration
}

// probe fetches the egress IP for a single slot. A non-200 response or a body
// that isn't a valid IP is recorded as unreachable.
func (p *prober) probe(id int) {
	client := &http.Client{Transport: p.pool[id], Timeout: p.timeout}
	req, err := http.NewRequest(http.MethodGet, p.url, nil)
	if err != nil {
		p.mgr.setEgress(id, "", 0, false)
		return
	}
	start := time.Now()
	resp, err := client.Do(req)
	if err != nil {
		p.mgr.setEgress(id, "", 0, false)
		return
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(io.LimitReader(resp.Body, 64))
	ip := string(body)
	for len(ip) > 0 && (ip[len(ip)-1] == '\n' || ip[len(ip)-1] == '\r' || ip[len(ip)-1] == ' ') {
		ip = ip[:len(ip)-1]
	}
	ok := resp.StatusCode == http.StatusOK && net.ParseIP(ip) != nil
	if !ok {
		ip = ""
	}
	p.mgr.setEgress(id, ip, time.Since(start), ok)
}

// probeAll probes every currently selectable slot concurrently (draining and
// warming slots are skipped because their egress is mid-change).
func (p *prober) probeAll() {
	var wg sync.WaitGroup
	for i := 0; i < p.mgr.n; i++ {
		p.mgr.mu.Lock()
		sel := p.mgr.selectableLocked(i)
		p.mgr.mu.Unlock()
		if !sel {
			continue
		}
		wg.Add(1)
		go func(id int) { defer wg.Done(); p.probe(id) }(i)
	}
	wg.Wait()
}

func (p *prober) run(ctx context.Context, interval time.Duration) {
	p.probeAll()
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			p.probeAll()
		}
	}
}
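The body handling in `probe` (trim trailing whitespace, then accept the result only if it parses as an IP) can be isolated into a small pure function, which makes it easy to test without a network. The sketch below is one way to do that under stated assumptions: `parseEgressIP` is a hypothetical name, and it uses `strings.TrimSpace`, which also strips leading whitespace and tabs and is therefore slightly more permissive than the trailing-only loop in `probe`. The HTTP status check is left out.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// parseEgressIP is a hypothetical helper: it trims surrounding whitespace from
// a probe response body and reports whether what remains is a valid IPv4 or
// IPv6 address. On failure it returns an empty string and false.
func parseEgressIP(body []byte) (string, bool) {
	ip := strings.TrimSpace(string(body))
	if net.ParseIP(ip) == nil {
		return "", false
	}
	return ip, true
}

func main() {
	bodies := []string{"9.9.9.9\n", "2001:db8::1\r\n", "<html>blocked</html>"}
	for _, b := range bodies {
		ip, ok := parseEgressIP([]byte(b))
		fmt.Printf("%q -> ip=%q ok=%v\n", b, ip, ok)
	}
}
```

Rejecting anything that is not a bare address matters here because captive portals and provider error pages often answer with HTTP 200 and an HTML body.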