mirror of
https://github.com/yu-i-i/overleaf-cep.git
synced 2026-05-23 17:19:37 +02:00
[web] apply runtime improvements to the finalization script (#31360)
* extract RateLimiter * remove unnecessary remapping and wrappers GitOrigin-RevId: fda1cdefa15f2f3fa9a042346a5ba4243897b90a
This commit is contained in:
@@ -91,7 +91,6 @@ import minimist from 'minimist'
|
||||
import PQueue from 'p-queue'
|
||||
import fs from 'node:fs'
|
||||
import * as csv from 'csv'
|
||||
import { setTimeout } from 'node:timers/promises'
|
||||
import { scriptRunner } from '../lib/ScriptRunner.mjs'
|
||||
|
||||
import {
|
||||
@@ -101,6 +100,15 @@ import {
|
||||
coalesceOrThrowVATNumber,
|
||||
getTaxIdType,
|
||||
} from '../helpers/migrate_recurly_customers_to_stripe.helpers.mjs'
|
||||
import {
|
||||
createRateLimitedApiWrappers,
|
||||
DEFAULT_RECURLY_RATE_LIMIT,
|
||||
DEFAULT_STRIPE_RATE_LIMIT,
|
||||
DEFAULT_RECURLY_API_RETRIES,
|
||||
DEFAULT_RECURLY_RETRY_DELAY_MS,
|
||||
DEFAULT_STRIPE_API_RETRIES,
|
||||
DEFAULT_STRIPE_RETRY_DELAY_MS,
|
||||
} from '../stripe/RateLimiter.mjs'
|
||||
|
||||
// =============================================================================
|
||||
// STRIPE CLIENT SETUP
|
||||
@@ -148,12 +156,16 @@ function getRegionClient(region) {
|
||||
)
|
||||
}
|
||||
|
||||
stripeClients[regionLower] = new Stripe(secretKey, {
|
||||
const client = new Stripe(secretKey, {
|
||||
httpClient: Stripe.createFetchHttpClient(),
|
||||
telemetry: false,
|
||||
})
|
||||
|
||||
return stripeClients[regionLower]
|
||||
// Add serviceName for rate limiter identification (stripe-us or stripe-uk)
|
||||
client.serviceName = `stripe-${regionLower}`
|
||||
|
||||
stripeClients[regionLower] = client
|
||||
return client
|
||||
}
|
||||
|
||||
// =============================================================================
|
||||
@@ -527,259 +539,8 @@ function createJsonArrayWriter(jsonPath) {
|
||||
// RATE LIMITING
|
||||
// =============================================================================
|
||||
|
||||
/**
|
||||
* Rate limiter using sliding window algorithm.
|
||||
*
|
||||
* Rate limits (conservative targets, leaving headroom):
|
||||
* - Recurly: 2000 requests per 5 minutes → target 1500/5min = 300/min = 5/sec
|
||||
* https://support.recurly.com/hc/en-us/articles/360034160731-What-Are-Recurly-s-API-Rate-Limits
|
||||
* - Stripe: 100 requests per second → target 50/sec (plenty of headroom)
|
||||
* https://docs.stripe.com/rate-limits
|
||||
*
|
||||
* Recurly is the bottleneck. With 2 Recurly calls per customer (getAccount, getBillingInfo),
|
||||
* we can process ~2.5 customers/second = ~150 customers/minute = ~9000 customers/hour.
|
||||
* For 150K customers, expect ~17 hours at full throughput.
|
||||
*/
|
||||
|
||||
class RateLimiter {
|
||||
/**
|
||||
* @param {string} name - Name for logging
|
||||
* @param {number} maxRequests - Maximum requests allowed in the window
|
||||
* @param {number} windowMs - Window size in milliseconds
|
||||
*/
|
||||
constructor(name, maxRequests, windowMs) {
|
||||
this.name = name
|
||||
this.maxRequests = maxRequests
|
||||
this.windowMs = windowMs
|
||||
this.requests = [] // timestamps of recent requests
|
||||
this.totalRequests = 0
|
||||
this._pending = Promise.resolve()
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait if necessary to stay within rate limits, then record the request.
|
||||
*/
|
||||
async throttle() {
|
||||
this._pending = this._pending
|
||||
.catch(error => {
|
||||
// this should never happen since setTimeout or logDebug are very unlikely to ever fail
|
||||
// but if it does, we log it and continue without blocking the queue (fail-open)
|
||||
logWarn(`Rate limiter chain error for ${this.name}`, {
|
||||
error: error?.message || String(error),
|
||||
})
|
||||
})
|
||||
.then(async () => {
|
||||
while (true) {
|
||||
const now = Date.now()
|
||||
|
||||
// Remove requests outside the window
|
||||
const windowStart = now - this.windowMs
|
||||
this.requests = this.requests.filter(ts => ts > windowStart)
|
||||
|
||||
// If at limit, wait until the oldest request exits the window
|
||||
if (this.requests.length >= this.maxRequests) {
|
||||
const oldestRequest = this.requests[0]
|
||||
const waitTime = oldestRequest - windowStart + 1
|
||||
if (waitTime > 0) {
|
||||
logDebug(
|
||||
`Rate limit throttle for ${this.name}`,
|
||||
{
|
||||
waitMs: waitTime,
|
||||
currentRequests: this.requests.length,
|
||||
maxRequests: this.maxRequests,
|
||||
},
|
||||
{ verboseOnly: true }
|
||||
)
|
||||
await setTimeout(waitTime)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Record this request
|
||||
this.requests.push(Date.now())
|
||||
this.totalRequests++
|
||||
break
|
||||
}
|
||||
})
|
||||
|
||||
return this._pending
|
||||
}
|
||||
|
||||
/**
|
||||
* Get current rate (requests per second over the last window)
|
||||
*/
|
||||
getCurrentRate() {
|
||||
const now = Date.now()
|
||||
const windowStart = now - this.windowMs
|
||||
const recentRequests = this.requests.filter(ts => ts > windowStart).length
|
||||
return (recentRequests / this.windowMs) * 1000 // requests per second
|
||||
}
|
||||
|
||||
getStats() {
|
||||
return {
|
||||
name: this.name,
|
||||
totalRequests: this.totalRequests,
|
||||
currentWindowRequests: this.requests.length,
|
||||
maxRequests: this.maxRequests,
|
||||
currentRate: this.getCurrentRate().toFixed(2) + '/sec',
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const DEFAULT_RECURLY_RATE_LIMIT = 10 // requests per second
|
||||
const DEFAULT_STRIPE_RATE_LIMIT = 50 // requests per second
|
||||
const DEFAULT_RECURLY_API_RETRIES = 5
|
||||
const DEFAULT_RECURLY_RETRY_DELAY_MS = 1000
|
||||
const DEFAULT_STRIPE_API_RETRIES = 5
|
||||
const DEFAULT_STRIPE_RETRY_DELAY_MS = 1000
|
||||
const RATE_LIMIT_WINDOW_MS = 1000
|
||||
|
||||
let recurlyRateLimiter
|
||||
let recurlyApiRetries = DEFAULT_RECURLY_API_RETRIES
|
||||
let recurlyRetryDelayMs = DEFAULT_RECURLY_RETRY_DELAY_MS
|
||||
let stripeRateLimitPerSecond = DEFAULT_STRIPE_RATE_LIMIT
|
||||
let stripeApiRetries = DEFAULT_STRIPE_API_RETRIES
|
||||
let stripeRetryDelayMs = DEFAULT_STRIPE_RETRY_DELAY_MS
|
||||
const stripeRateLimiters = new Map()
|
||||
|
||||
function getStripeRateLimiter(region) {
|
||||
const key = String(region || 'unknown').toLowerCase()
|
||||
if (stripeRateLimiters.has(key)) return stripeRateLimiters.get(key)
|
||||
|
||||
const limiter = new RateLimiter(
|
||||
`Stripe-${key}`,
|
||||
stripeRateLimitPerSecond,
|
||||
RATE_LIMIT_WINDOW_MS
|
||||
)
|
||||
stripeRateLimiters.set(key, limiter)
|
||||
return limiter
|
||||
}
|
||||
|
||||
/**
|
||||
* Throttle before making a Recurly API call
|
||||
*/
|
||||
async function throttleRecurly() {
|
||||
await recurlyRateLimiter.throttle()
|
||||
}
|
||||
|
||||
async function recurlyRequestWithRetries(operation, { context } = {}) {
|
||||
let attempt = 0
|
||||
while (true) {
|
||||
try {
|
||||
return await operation()
|
||||
} catch (error) {
|
||||
const statusCode =
|
||||
error?.statusCode ?? error?.status ?? error?.raw?.statusCode
|
||||
if (statusCode === 429) {
|
||||
logWarn('Recurly rate limited', {
|
||||
rowNumber: context?.rowNumber,
|
||||
recurlyAccountCode: context?.recurlyAccountCode,
|
||||
attempt: attempt + 1,
|
||||
maxRetries: recurlyApiRetries,
|
||||
})
|
||||
|
||||
if (attempt < recurlyApiRetries) {
|
||||
attempt++
|
||||
await setTimeout(recurlyRetryDelayMs)
|
||||
continue
|
||||
}
|
||||
}
|
||||
throw error
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function recurlyCall(operation, context) {
|
||||
return recurlyRequestWithRetries(
|
||||
async () => {
|
||||
await throttleRecurly()
|
||||
return operation()
|
||||
},
|
||||
{ context }
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Throttle before making a Stripe API call
|
||||
*/
|
||||
async function throttleStripe(region) {
|
||||
await getStripeRateLimiter(region).throttle()
|
||||
}
|
||||
|
||||
/**
|
||||
* Get rate limiter statistics for logging
|
||||
*/
|
||||
function getRateLimiterStats() {
|
||||
const stripeLimiters = [...stripeRateLimiters.values()]
|
||||
const stripeTotalRequests = stripeLimiters.reduce(
|
||||
(sum, limiter) => sum + limiter.totalRequests,
|
||||
0
|
||||
)
|
||||
const stripeCurrentRate = stripeLimiters.reduce(
|
||||
(sum, limiter) => sum + limiter.getCurrentRate(),
|
||||
0
|
||||
)
|
||||
|
||||
return {
|
||||
recurly: recurlyRateLimiter.getStats(),
|
||||
stripe: {
|
||||
totalRequests: stripeTotalRequests,
|
||||
currentRate: stripeCurrentRate.toFixed(2) + '/sec',
|
||||
},
|
||||
stripeByRegion: stripeLimiters.map(limiter => limiter.getStats()),
|
||||
}
|
||||
}
|
||||
|
||||
function getStripeRateLimitReason(error) {
|
||||
const headers =
|
||||
error?.headers || error?.raw?.headers || error?.response?.headers || {}
|
||||
return (
|
||||
headers['stripe-rate-limit-reason'] ||
|
||||
headers['Stripe-Rate-Limited-Reason'] ||
|
||||
headers['stripe-rate-limited-reason'] ||
|
||||
null
|
||||
)
|
||||
}
|
||||
|
||||
async function stripeRequestWithRetries(operation, { context } = {}) {
|
||||
let attempt = 0
|
||||
while (true) {
|
||||
try {
|
||||
return await operation()
|
||||
} catch (error) {
|
||||
const statusCode = error?.statusCode ?? error?.raw?.statusCode
|
||||
if (statusCode === 429) {
|
||||
const reason = getStripeRateLimitReason(error)
|
||||
logWarn('Stripe rate limited', {
|
||||
rowNumber: context?.rowNumber,
|
||||
stripeCustomerId: context?.stripeCustomerId,
|
||||
stripeAccount: context?.stripeAccount,
|
||||
reason,
|
||||
stripeApi: context?.stripeApi,
|
||||
attempt: attempt + 1,
|
||||
maxRetries: stripeApiRetries,
|
||||
})
|
||||
|
||||
if (attempt < stripeApiRetries) {
|
||||
attempt++
|
||||
await setTimeout(stripeRetryDelayMs)
|
||||
continue
|
||||
}
|
||||
}
|
||||
throw error
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function stripeCall(region, operation, context) {
|
||||
return stripeRequestWithRetries(
|
||||
async () => {
|
||||
await throttleStripe(region)
|
||||
return operation()
|
||||
},
|
||||
{ context }
|
||||
)
|
||||
}
|
||||
// rate limiters - initialized in main()
|
||||
let rateLimiters
|
||||
|
||||
// =============================================================================
|
||||
// DATA TRANSFORMATION
|
||||
@@ -792,14 +553,16 @@ async function stripeCall(region, operation, context) {
|
||||
* @returns {Promise<{account: object, billingInfo: object|null}>}
|
||||
*/
|
||||
async function fetchRecurlyData(accountCode, context) {
|
||||
const account = await recurlyCall(
|
||||
const account = await rateLimiters.requestWithRetries(
|
||||
'recurly',
|
||||
() => recurlyClient.getAccount(`code-${accountCode}`),
|
||||
context
|
||||
)
|
||||
|
||||
let billingInfo = null
|
||||
try {
|
||||
billingInfo = await recurlyCall(
|
||||
billingInfo = await rateLimiters.requestWithRetries(
|
||||
'recurly',
|
||||
() => recurlyClient.getBillingInfo(`code-${accountCode}`),
|
||||
context
|
||||
)
|
||||
@@ -826,12 +589,10 @@ async function fetchRecurlyData(accountCode, context) {
|
||||
async function fetchTargetStripeCustomer(
|
||||
stripeClient,
|
||||
stripeCustomerId,
|
||||
region,
|
||||
context
|
||||
) {
|
||||
// TODO: consider getting the region from stripeClient.serviceName
|
||||
const customer = await stripeCall(
|
||||
region,
|
||||
const customer = await rateLimiters.requestWithRetries(
|
||||
stripeClient.serviceName,
|
||||
() => stripeClient.customers.retrieve(stripeCustomerId),
|
||||
{ ...context, stripeApi: 'customers.retrieve' }
|
||||
)
|
||||
@@ -853,11 +614,10 @@ async function fetchTargetStripeCustomer(
|
||||
async function fetchTargetStripeCustomerPaymentMethods(
|
||||
stripeClient,
|
||||
stripeCustomerId,
|
||||
region,
|
||||
context
|
||||
) {
|
||||
const paymentMethods = await stripeCall(
|
||||
region,
|
||||
const paymentMethods = await rateLimiters.requestWithRetries(
|
||||
stripeClient.serviceName,
|
||||
() => stripeClient.customers.listPaymentMethods(stripeCustomerId),
|
||||
{ ...context, stripeApi: 'customers.listPaymentMethods' }
|
||||
)
|
||||
@@ -874,8 +634,7 @@ async function replaceCustomerTaxIds(
|
||||
stripeClient,
|
||||
stripeCustomerId,
|
||||
{ taxIdType, vatNumber },
|
||||
context,
|
||||
region
|
||||
context
|
||||
) {
|
||||
// Stripe customers can have multiple tax IDs. For this migration, we want a single
|
||||
// authoritative tax ID derived from Recurly, so we remove any existing ones first.
|
||||
@@ -883,8 +642,8 @@ async function replaceCustomerTaxIds(
|
||||
|
||||
let startingAfter
|
||||
while (true) {
|
||||
const page = await stripeCall(
|
||||
region,
|
||||
const page = await rateLimiters.requestWithRetries(
|
||||
stripeClient.serviceName,
|
||||
() =>
|
||||
stripeClient.customers.listTaxIds(stripeCustomerId, {
|
||||
limit: 100,
|
||||
@@ -910,16 +669,16 @@ async function replaceCustomerTaxIds(
|
||||
)
|
||||
|
||||
for (const taxId of existingTaxIds) {
|
||||
await stripeCall(
|
||||
region,
|
||||
await rateLimiters.requestWithRetries(
|
||||
stripeClient.serviceName,
|
||||
() => stripeClient.customers.deleteTaxId(stripeCustomerId, taxId.id),
|
||||
{ ...context, stripeApi: 'customers.deleteTaxId' }
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
return await stripeCall(
|
||||
region,
|
||||
return await rateLimiters.requestWithRetries(
|
||||
stripeClient.serviceName,
|
||||
() =>
|
||||
stripeClient.customers.createTaxId(stripeCustomerId, {
|
||||
type: taxIdType,
|
||||
@@ -1151,7 +910,6 @@ async function processCustomer(
|
||||
const existingCustomer = await fetchTargetStripeCustomer(
|
||||
stripeClient,
|
||||
stripeCustomerId,
|
||||
region,
|
||||
stripeContext
|
||||
)
|
||||
|
||||
@@ -1236,8 +994,7 @@ async function processCustomer(
|
||||
stripeClient,
|
||||
stripeCustomerId,
|
||||
{ taxIdType, vatNumber },
|
||||
context,
|
||||
region
|
||||
context
|
||||
)
|
||||
logDebug(
|
||||
'Successfully created tax ID',
|
||||
@@ -1426,8 +1183,8 @@ async function processCustomer(
|
||||
},
|
||||
{ verboseOnly: true }
|
||||
)
|
||||
await stripeCall(
|
||||
region,
|
||||
await rateLimiters.requestWithRetries(
|
||||
stripeClient.serviceName,
|
||||
() => stripeClient.customers.update(stripeCustomerId, customerParams),
|
||||
{ ...stripeContext, stripeApi: 'customers.update' }
|
||||
)
|
||||
@@ -1694,6 +1451,7 @@ async function main(trackProgress) {
|
||||
let recurlyRateLimit
|
||||
let recurlyApiRetriesValue
|
||||
let recurlyRetryDelayMsValue
|
||||
let stripeRateLimitPerSecond
|
||||
let stripeApiRetriesValue
|
||||
let stripeRetryDelayMsValue
|
||||
let limit
|
||||
@@ -1733,16 +1491,17 @@ async function main(trackProgress) {
|
||||
process.exit(1)
|
||||
}
|
||||
|
||||
recurlyRateLimiter = new RateLimiter(
|
||||
'Recurly',
|
||||
// initialize rate limiters
|
||||
rateLimiters = createRateLimitedApiWrappers({
|
||||
recurlyRateLimit,
|
||||
RATE_LIMIT_WINDOW_MS
|
||||
)
|
||||
recurlyApiRetries = recurlyApiRetriesValue
|
||||
recurlyRetryDelayMs = recurlyRetryDelayMsValue
|
||||
stripeApiRetries = stripeApiRetriesValue
|
||||
stripeRetryDelayMs = stripeRetryDelayMsValue
|
||||
stripeRateLimiters.clear()
|
||||
recurlyApiRetries: recurlyApiRetriesValue,
|
||||
recurlyRetryDelayMs: recurlyRetryDelayMsValue,
|
||||
stripeRateLimit: stripeRateLimitPerSecond,
|
||||
stripeApiRetries: stripeApiRetriesValue,
|
||||
stripeRetryDelayMs: stripeRetryDelayMsValue,
|
||||
logDebug,
|
||||
logWarn,
|
||||
})
|
||||
|
||||
// Set DEBUG_MODE only from CLI arg (--verbose/-v)
|
||||
DEBUG_MODE = !!verbose
|
||||
@@ -1768,11 +1527,11 @@ async function main(trackProgress) {
|
||||
stripeExistingFieldsJsonPath,
|
||||
concurrency,
|
||||
recurlyRateLimit,
|
||||
recurlyApiRetries,
|
||||
recurlyRetryDelayMs,
|
||||
recurlyApiRetries: recurlyApiRetriesValue,
|
||||
recurlyRetryDelayMs: recurlyRetryDelayMsValue,
|
||||
stripeRateLimit: stripeRateLimitPerSecond,
|
||||
stripeApiRetries,
|
||||
stripeRetryDelayMs,
|
||||
stripeApiRetries: stripeApiRetriesValue,
|
||||
stripeRetryDelayMs: stripeRetryDelayMsValue,
|
||||
forceInvalidTax,
|
||||
...(limit != null ? { limit } : {}),
|
||||
})
|
||||
@@ -1981,7 +1740,7 @@ async function main(trackProgress) {
|
||||
// Progress update every 1000 customers (or 100 in debug mode)
|
||||
const progressInterval = DEBUG_MODE ? 100 : 1000
|
||||
if (processedThisRun % progressInterval === 0) {
|
||||
const rateLimiterStats = getRateLimiterStats()
|
||||
const rateLimiterStats = rateLimiters.getRateLimiterStats()
|
||||
const progress = {
|
||||
rowNumber: lastCompletedRowNumber,
|
||||
processedThisRun,
|
||||
@@ -2039,7 +1798,7 @@ async function main(trackProgress) {
|
||||
const totalSuccessful = commit
|
||||
? previouslyProcessed.size + updatedCount
|
||||
: previouslyProcessed.size
|
||||
const finalRateLimiterStats = getRateLimiterStats()
|
||||
const finalRateLimiterStats = rateLimiters.getRateLimiterStats()
|
||||
|
||||
await trackProgress('=== FINAL SUMMARY ===')
|
||||
await trackProgress(`Start time: ${startTime.toISOString()}`)
|
||||
@@ -2054,11 +1813,13 @@ async function main(trackProgress) {
|
||||
await trackProgress(` - limit: ${limit != null ? limit : 'none'}`)
|
||||
await trackProgress(` - concurrency: ${concurrency}`)
|
||||
await trackProgress(` - recurly-rate-limit: ${recurlyRateLimit}`)
|
||||
await trackProgress(` - recurly-api-retries: ${recurlyApiRetries}`)
|
||||
await trackProgress(` - recurly-retry-delay-ms: ${recurlyRetryDelayMs}`)
|
||||
await trackProgress(` - recurly-api-retries: ${recurlyApiRetriesValue}`)
|
||||
await trackProgress(
|
||||
` - recurly-retry-delay-ms: ${recurlyRetryDelayMsValue}`
|
||||
)
|
||||
await trackProgress(` - stripe-rate-limit: ${stripeRateLimitPerSecond}`)
|
||||
await trackProgress(` - stripe-api-retries: ${stripeApiRetries}`)
|
||||
await trackProgress(` - stripe-retry-delay-ms: ${stripeRetryDelayMs}`)
|
||||
await trackProgress(` - stripe-api-retries: ${stripeApiRetriesValue}`)
|
||||
await trackProgress(` - stripe-retry-delay-ms: ${stripeRetryDelayMsValue}`)
|
||||
await trackProgress(` - force-invalid-tax: ${forceInvalidTax}`)
|
||||
await trackProgress(`Input file total rows: ${totalInInput}`)
|
||||
await trackProgress(
|
||||
|
||||
310
services/web/scripts/stripe/RateLimiter.mjs
Normal file
310
services/web/scripts/stripe/RateLimiter.mjs
Normal file
@@ -0,0 +1,310 @@
|
||||
/* eslint-disable @overleaf/require-script-runner */
|
||||
// This file contains helper functions used by other scripts.
|
||||
// The scripts that import these helpers should use Script Runner.
|
||||
|
||||
import { setTimeout } from 'node:timers/promises'
|
||||
|
||||
export const DEFAULT_RECURLY_RATE_LIMIT = 10
|
||||
export const DEFAULT_STRIPE_RATE_LIMIT = 50
|
||||
export const DEFAULT_RECURLY_API_RETRIES = 5
|
||||
export const DEFAULT_RECURLY_RETRY_DELAY_MS = 1000
|
||||
export const DEFAULT_STRIPE_API_RETRIES = 5
|
||||
export const DEFAULT_STRIPE_RETRY_DELAY_MS = 1000
|
||||
|
||||
/**
|
||||
* Rate limiter using sliding window algorithm.
|
||||
*
|
||||
* Rate limits (conservative targets, leaving headroom):
|
||||
* - Recurly: 2000 requests per 5 minutes → target 1500/5min = 300/min = 5/sec
|
||||
* https://support.recurly.com/hc/en-us/articles/360034160731-What-Are-Recurly-s-API-Rate-Limits
|
||||
* - Stripe: 100 requests per second → target 50/sec (plenty of headroom)
|
||||
* https://docs.stripe.com/rate-limits
|
||||
*
|
||||
* Recurly is the bottleneck. With 2 Recurly calls per customer (getAccount, getBillingInfo),
|
||||
* we can process ~2.5 customers/second = ~150 customers/minute = ~9000 customers/hour.
|
||||
* For 150K customers, expect ~17 hours at full throughput.
|
||||
*/
|
||||
|
||||
class RateLimiter {
|
||||
/**
|
||||
* @param {string} name - Name for logging
|
||||
* @param {number} maxRequests - Maximum requests allowed in the window
|
||||
* @param {number} windowMs - Window size in milliseconds
|
||||
* @param {Function} logDebug - Optional debug logging function
|
||||
* @param {Function} logWarn - Optional warning logging function
|
||||
*/
|
||||
constructor(
|
||||
name,
|
||||
maxRequests,
|
||||
windowMs,
|
||||
logDebug = () => null,
|
||||
logWarn = () => null
|
||||
) {
|
||||
this.name = name
|
||||
this.maxRequests = maxRequests
|
||||
this.windowMs = windowMs
|
||||
this.requests = [] // timestamps of recent requests
|
||||
this.totalRequests = 0
|
||||
this._pending = Promise.resolve()
|
||||
this.logDebug = logDebug
|
||||
this.logWarn = logWarn
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait if necessary to stay within rate limits, then record the request.
|
||||
*/
|
||||
async throttle() {
|
||||
this._pending = this._pending
|
||||
.catch(error => {
|
||||
// this should never happen since setTimeout or logDebug are very unlikely to ever fail
|
||||
// but if it does, we log it and continue without blocking the queue (fail-open)
|
||||
this.logWarn(`Rate limiter chain error for ${this.name}`, {
|
||||
error: error?.message || String(error),
|
||||
})
|
||||
})
|
||||
.then(async () => {
|
||||
while (true) {
|
||||
const now = Date.now()
|
||||
|
||||
// Remove requests outside the window
|
||||
const windowStart = now - this.windowMs
|
||||
this.requests = this.requests.filter(ts => ts > windowStart)
|
||||
|
||||
// If at limit, wait until the oldest request exits the window
|
||||
if (this.requests.length >= this.maxRequests) {
|
||||
const oldestRequest = this.requests[0]
|
||||
const waitTime = oldestRequest - windowStart + 1
|
||||
if (waitTime > 0) {
|
||||
this.logDebug(
|
||||
`Rate limit throttle for ${this.name}`,
|
||||
{
|
||||
waitMs: waitTime,
|
||||
currentRequests: this.requests.length,
|
||||
maxRequests: this.maxRequests,
|
||||
},
|
||||
{ verboseOnly: true }
|
||||
)
|
||||
await setTimeout(waitTime)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// Record this request
|
||||
this.requests.push(Date.now())
|
||||
this.totalRequests++
|
||||
break
|
||||
}
|
||||
})
|
||||
|
||||
return this._pending
|
||||
}
|
||||
|
||||
/**
|
||||
* Get current rate (requests per second over the last window)
|
||||
*/
|
||||
getCurrentRate() {
|
||||
const now = Date.now()
|
||||
const windowStart = now - this.windowMs
|
||||
const recentRequests = this.requests.filter(ts => ts > windowStart).length
|
||||
return (recentRequests / this.windowMs) * 1000 // requests per second
|
||||
}
|
||||
|
||||
getStats() {
|
||||
return {
|
||||
name: this.name,
|
||||
totalRequests: this.totalRequests,
|
||||
currentWindowRequests: this.requests.length,
|
||||
maxRequests: this.maxRequests,
|
||||
currentRate: this.getCurrentRate().toFixed(2) + '/sec',
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to extract Stripe rate limit reason from error headers
|
||||
*/
|
||||
function getStripeRateLimitReason(error) {
|
||||
const headers =
|
||||
error?.headers || error?.raw?.headers || error?.response?.headers || {}
|
||||
return (
|
||||
headers['stripe-rate-limit-reason'] ||
|
||||
headers['Stripe-Rate-Limited-Reason'] ||
|
||||
headers['stripe-rate-limited-reason'] ||
|
||||
null
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create rate-limited API wrapper with unified service routing.
|
||||
*
|
||||
* @param {object} config - Configuration options
|
||||
* @param {number} config.recurlyRateLimit - Requests per second for Recurly (default: 10)
|
||||
* @param {number} config.recurlyApiRetries - Number of retries on Recurly 429s (default: 5)
|
||||
* @param {number} config.recurlyRetryDelayMs - Delay between Recurly retries in ms (default: 1000)
|
||||
* @param {number} config.stripeRateLimit - Requests per second for Stripe (default: 50)
|
||||
* @param {number} config.stripeApiRetries - Number of retries on Stripe 429s (default: 5)
|
||||
* @param {number} config.stripeRetryDelayMs - Delay between Stripe retries in ms (default: 1000)
|
||||
* @param {Function} config.logDebug - Optional debug logging function
|
||||
* @param {Function} config.logWarn - Optional warning logging function
|
||||
*
|
||||
* @returns {object} Object with unified call function and stats getter
|
||||
* @returns {Function} returns.call - Unified wrapper for API calls (service, operation, context)
|
||||
* @returns {Function} returns.getRateLimiterStats - Get current rate limiter statistics
|
||||
*/
|
||||
export function createRateLimitedApiWrappers(config = {}) {
|
||||
const {
|
||||
recurlyRateLimit = 10,
|
||||
recurlyApiRetries = 5,
|
||||
recurlyRetryDelayMs = 1000,
|
||||
stripeRateLimit = 50,
|
||||
stripeApiRetries = 5,
|
||||
stripeRetryDelayMs = 1000,
|
||||
logDebug = () => null,
|
||||
logWarn = () => null,
|
||||
} = config
|
||||
|
||||
const RATE_LIMIT_WINDOW_MS = 1000
|
||||
|
||||
// Service configuration registry
|
||||
const serviceConfigs = {
|
||||
recurly: {
|
||||
rateLimit: recurlyRateLimit,
|
||||
apiRetries: recurlyApiRetries,
|
||||
retryDelayMs: recurlyRetryDelayMs,
|
||||
isStripe: false,
|
||||
},
|
||||
stripe: {
|
||||
rateLimit: stripeRateLimit,
|
||||
apiRetries: stripeApiRetries,
|
||||
retryDelayMs: stripeRetryDelayMs,
|
||||
isStripe: true,
|
||||
},
|
||||
}
|
||||
|
||||
// Rate limiter instances per service
|
||||
const rateLimiters = new Map()
|
||||
|
||||
function getRateLimiter(service) {
|
||||
const key = String(service || 'unknown').toLowerCase()
|
||||
if (rateLimiters.has(key)) {
|
||||
return rateLimiters.get(key)
|
||||
}
|
||||
|
||||
// Determine service config
|
||||
let serviceConfig
|
||||
if (key === 'recurly') {
|
||||
serviceConfig = serviceConfigs.recurly
|
||||
} else if (key.startsWith('stripe')) {
|
||||
serviceConfig = serviceConfigs.stripe
|
||||
} else {
|
||||
throw new Error(`Unknown service: ${service}`)
|
||||
}
|
||||
|
||||
const limiter = new RateLimiter(
|
||||
key,
|
||||
serviceConfig.rateLimit,
|
||||
RATE_LIMIT_WINDOW_MS,
|
||||
logDebug,
|
||||
logWarn
|
||||
)
|
||||
rateLimiters.set(key, limiter)
|
||||
return limiter
|
||||
}
|
||||
|
||||
function getServiceConfig(service) {
|
||||
const key = String(service || 'unknown').toLowerCase()
|
||||
if (key === 'recurly') {
|
||||
return serviceConfigs.recurly
|
||||
} else if (key.startsWith('stripe')) {
|
||||
return serviceConfigs.stripe
|
||||
} else {
|
||||
throw new Error(`Unknown service: ${service}`)
|
||||
}
|
||||
}
|
||||
|
||||
async function requestWithRetries(service, operation, { context } = {}) {
|
||||
const serviceConfig = getServiceConfig(service)
|
||||
const rateLimiter = getRateLimiter(service)
|
||||
let attempt = 0
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
await rateLimiter.throttle()
|
||||
return await operation()
|
||||
} catch (error) {
|
||||
const statusCode =
|
||||
error?.statusCode ?? error?.status ?? error?.raw?.statusCode
|
||||
if (statusCode === 429) {
|
||||
attempt++
|
||||
if (attempt > serviceConfig.apiRetries) {
|
||||
logWarn(
|
||||
`${service} rate limit exceeded after ${attempt - 1} retries`,
|
||||
{
|
||||
...context,
|
||||
service,
|
||||
attempt,
|
||||
...(serviceConfig.isStripe
|
||||
? { rateLimitReason: getStripeRateLimitReason(error) }
|
||||
: {}),
|
||||
}
|
||||
)
|
||||
throw error
|
||||
}
|
||||
logDebug(`${service} rate limited, retrying`, {
|
||||
...context,
|
||||
service,
|
||||
attempt,
|
||||
retryDelayMs: serviceConfig.retryDelayMs,
|
||||
...(serviceConfig.isStripe
|
||||
? { rateLimitReason: getStripeRateLimitReason(error) }
|
||||
: {}),
|
||||
})
|
||||
await setTimeout(serviceConfig.retryDelayMs)
|
||||
continue
|
||||
}
|
||||
throw error
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get rate limiter statistics for logging
|
||||
*/
|
||||
function getRateLimiterStats() {
|
||||
const allLimiters = [...rateLimiters.values()]
|
||||
|
||||
// Separate Recurly and Stripe limiters
|
||||
const recurlyLimiters = allLimiters.filter(
|
||||
limiter => limiter.name === 'recurly'
|
||||
)
|
||||
const stripeLimiters = allLimiters.filter(limiter =>
|
||||
limiter.name.startsWith('stripe')
|
||||
)
|
||||
|
||||
const stripeTotalRequests = stripeLimiters.reduce(
|
||||
(sum, limiter) => sum + limiter.totalRequests,
|
||||
0
|
||||
)
|
||||
const stripeCurrentRate = stripeLimiters.reduce(
|
||||
(sum, limiter) => sum + limiter.getCurrentRate(),
|
||||
0
|
||||
)
|
||||
|
||||
return {
|
||||
recurly:
|
||||
recurlyLimiters.length > 0
|
||||
? recurlyLimiters[0].getStats()
|
||||
: { totalRequests: 0, currentRate: '0.00/sec' },
|
||||
stripe: {
|
||||
totalRequests: stripeTotalRequests,
|
||||
currentRate: stripeCurrentRate.toFixed(2) + '/sec',
|
||||
},
|
||||
stripeByRegion: stripeLimiters.map(limiter => limiter.getStats()),
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
requestWithRetries,
|
||||
getRateLimiterStats,
|
||||
}
|
||||
}
|
||||
@@ -14,10 +14,16 @@
|
||||
* node scripts/stripe/finalize-stripe-subscription-migration.mjs [OPTS] [INPUT-FILE]
|
||||
*
|
||||
* Options:
|
||||
* --output PATH Output file path (default: /tmp/migrate_output_<timestamp>.csv)
|
||||
* --commit Apply changes (without this, runs in dry-run mode)
|
||||
* --throttle DURATION Minimum time between requests in ms (default: 40)
|
||||
* --help Show help message
|
||||
* --output PATH Output file path (default: /tmp/migrate_output_<timestamp>.csv)
|
||||
* --commit Apply changes (without this, runs in dry-run mode)
|
||||
* --concurrency, -c <n> Number of customers to process concurrently (default: 10)
|
||||
* --recurly-rate-limit N Requests per second for Recurly (default: 10)
|
||||
* --recurly-api-retries N Number of retries on Recurly 429s (default: 5)
|
||||
* --recurly-retry-delay-ms N Delay between Recurly retries in ms (default: 1000)
|
||||
* --stripe-rate-limit N Requests per second for Stripe (default: 50)
|
||||
* --stripe-api-retries N Number of retries on Stripe 429s (default: 5)
|
||||
* --stripe-retry-delay-ms N Delay between Stripe retries in ms (default: 1000)
|
||||
* --help Show help message
|
||||
*
|
||||
* CSV Input Format:
|
||||
* recurly_account_code,target_stripe_account,stripe_customer_id
|
||||
@@ -31,9 +37,9 @@
|
||||
|
||||
import fs from 'node:fs'
|
||||
import path from 'node:path'
|
||||
import { setTimeout } from 'node:timers/promises'
|
||||
import * as csv from 'csv'
|
||||
import minimist from 'minimist'
|
||||
import PQueue from 'p-queue'
|
||||
import { z } from '../../app/src/infrastructure/Validation.mjs'
|
||||
import { scriptRunner } from '../lib/ScriptRunner.mjs'
|
||||
import {
|
||||
@@ -49,19 +55,35 @@ import UserAnalyticsIdCache from '../../app/src/Features/Analytics/UserAnalytics
|
||||
import CustomerIoHandler from '../../modules/customer-io/app/src/CustomerIoHandler.mjs'
|
||||
import { ReportError } from './helpers.mjs'
|
||||
import isEqual from 'lodash/isEqual.js'
|
||||
|
||||
const DEFAULT_THROTTLE = 40
|
||||
import {
|
||||
createRateLimitedApiWrappers,
|
||||
DEFAULT_RECURLY_RATE_LIMIT,
|
||||
DEFAULT_STRIPE_RATE_LIMIT,
|
||||
DEFAULT_RECURLY_API_RETRIES,
|
||||
DEFAULT_RECURLY_RETRY_DELAY_MS,
|
||||
DEFAULT_STRIPE_API_RETRIES,
|
||||
DEFAULT_STRIPE_RETRY_DELAY_MS,
|
||||
} from './RateLimiter.mjs'
|
||||
|
||||
const preloadedProductMetadata = new Map()
|
||||
|
||||
// rate limiters - initialized in main()
|
||||
let rateLimiters
|
||||
|
||||
function usage() {
|
||||
console.error(`Usage: node scripts/stripe/finalize-stripe-subscription-migration.mjs [OPTS] [INPUT-FILE]
|
||||
|
||||
Options:
|
||||
--output PATH Output file path (default: /tmp/migrate_output_<timestamp>.csv)
|
||||
--commit Apply changes (without this, runs in dry-run mode)
|
||||
--throttle DURATION Minimum time between requests in ms (default: ${DEFAULT_THROTTLE})
|
||||
--help Show this help message
|
||||
--output PATH Output file path (default: /tmp/migrate_output_<timestamp>.csv)
|
||||
--commit Apply changes (without this, runs in dry-run mode)
|
||||
--concurrency N Number of customers to process concurrently (default: 10)
|
||||
--recurly-rate-limit N Requests per second for Recurly (default: ${DEFAULT_RECURLY_RATE_LIMIT})
|
||||
--recurly-api-retries N Number of retries on Recurly 429s (default: ${DEFAULT_RECURLY_API_RETRIES})
|
||||
--recurly-retry-delay-ms N Delay between Recurly retries in ms (default: ${DEFAULT_RECURLY_RETRY_DELAY_MS})
|
||||
--stripe-rate-limit N Requests per second for Stripe (default: ${DEFAULT_STRIPE_RATE_LIMIT})
|
||||
--stripe-api-retries N Number of retries on Stripe 429s (default: ${DEFAULT_STRIPE_API_RETRIES})
|
||||
--stripe-retry-delay-ms N Delay between Stripe retries in ms (default: ${DEFAULT_STRIPE_RETRY_DELAY_MS})
|
||||
--help Show this help message
|
||||
`)
|
||||
}
|
||||
|
||||
@@ -70,9 +92,22 @@ async function main(trackProgress) {
|
||||
const timestamp = new Date().toISOString().replace(/[:.]/g, '-')
|
||||
const outputFile = opts.output ?? `/tmp/migrate_output_${timestamp}.csv`
|
||||
|
||||
// initialize rate limiters
|
||||
rateLimiters = createRateLimitedApiWrappers({
|
||||
recurlyRateLimit: opts.recurlyRateLimit,
|
||||
recurlyApiRetries: opts.recurlyApiRetries,
|
||||
recurlyRetryDelayMs: opts.recurlyRetryDelayMs,
|
||||
stripeRateLimit: opts.stripeRateLimit,
|
||||
stripeApiRetries: opts.stripeApiRetries,
|
||||
stripeRetryDelayMs: opts.stripeRetryDelayMs,
|
||||
})
|
||||
|
||||
await trackProgress('Starting Recurly to Stripe migration cutover')
|
||||
await trackProgress(`Run mode: ${opts.commit ? 'COMMIT' : 'DRY RUN'}`)
|
||||
await trackProgress(`Throttle: ${opts.throttle}ms between requests`)
|
||||
await trackProgress(
|
||||
`Rate limits: Recurly ${opts.recurlyRateLimit}/s, Stripe ${opts.stripeRateLimit}/s`
|
||||
)
|
||||
await trackProgress(`Concurrency: ${opts.concurrency}`)
|
||||
|
||||
const inputStream = opts.inputFile
|
||||
? fs.createReadStream(opts.inputFile)
|
||||
@@ -91,74 +126,82 @@ async function main(trackProgress) {
|
||||
let successCount = 0
|
||||
let errorCount = 0
|
||||
|
||||
let lastLoopTimestamp = 0
|
||||
for await (const input of csvReader) {
|
||||
const timeSinceLastLoop = Date.now() - lastLoopTimestamp
|
||||
if (timeSinceLastLoop < opts.throttle) {
|
||||
await setTimeout(opts.throttle - timeSinceLastLoop)
|
||||
}
|
||||
lastLoopTimestamp = Date.now()
|
||||
const queue = new PQueue({ concurrency: opts.concurrency })
|
||||
const maxQueueSize = opts.concurrency
|
||||
|
||||
processedCount++
|
||||
try {
|
||||
for await (const input of csvReader) {
|
||||
// throttle input if queue is full
|
||||
if (queue.size >= maxQueueSize) {
|
||||
await queue.onSizeLessThan(maxQueueSize)
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await processMigration(input, opts.commit)
|
||||
queue.add(async () => {
|
||||
processedCount++
|
||||
|
||||
csvWriter.write({
|
||||
recurly_account_code: input.recurly_account_code,
|
||||
target_stripe_account: input.target_stripe_account,
|
||||
stripe_customer_id: input.stripe_customer_id,
|
||||
previous_recurly_status: result.previousRecurlyStatus || '',
|
||||
previous_recurly_subscription_id:
|
||||
result.previousRecurlySubscriptionId || '',
|
||||
email: result.email || '',
|
||||
analyticsId: result.analyticsId || '',
|
||||
status: result.status,
|
||||
note: result.note,
|
||||
try {
|
||||
const result = await processMigration(input, opts.commit)
|
||||
|
||||
csvWriter.write({
|
||||
recurly_account_code: input.recurly_account_code,
|
||||
target_stripe_account: input.target_stripe_account,
|
||||
stripe_customer_id: input.stripe_customer_id,
|
||||
previous_recurly_status: result.previousRecurlyStatus || '',
|
||||
previous_recurly_subscription_id:
|
||||
result.previousRecurlySubscriptionId || '',
|
||||
email: result.email || '',
|
||||
analyticsId: result.analyticsId || '',
|
||||
status: result.status,
|
||||
note: result.note,
|
||||
})
|
||||
|
||||
if (
|
||||
result.status.startsWith('migrated') ||
|
||||
result.status === 'validated'
|
||||
) {
|
||||
successCount++
|
||||
} else {
|
||||
errorCount++
|
||||
}
|
||||
|
||||
if (processedCount % 25 === 0) {
|
||||
await trackProgress(
|
||||
`Progress: ${processedCount} processed, ${successCount} successful, ${errorCount} errors`
|
||||
)
|
||||
}
|
||||
} catch (err) {
|
||||
errorCount++
|
||||
if (err instanceof ReportError) {
|
||||
csvWriter.write({
|
||||
recurly_account_code: input.recurly_account_code,
|
||||
target_stripe_account: input.target_stripe_account,
|
||||
stripe_customer_id: input.stripe_customer_id,
|
||||
previous_recurly_status: '',
|
||||
previous_recurly_subscription_id: '',
|
||||
email: '',
|
||||
analyticsId: '',
|
||||
status: err.status,
|
||||
note: err.message,
|
||||
})
|
||||
} else {
|
||||
csvWriter.write({
|
||||
recurly_account_code: input.recurly_account_code,
|
||||
target_stripe_account: input.target_stripe_account,
|
||||
stripe_customer_id: input.stripe_customer_id,
|
||||
previous_recurly_status: '',
|
||||
previous_recurly_subscription_id: '',
|
||||
email: '',
|
||||
analyticsId: '',
|
||||
status: 'error',
|
||||
note: err.message,
|
||||
})
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
if (
|
||||
result.status.startsWith('migrated') ||
|
||||
result.status === 'validated'
|
||||
) {
|
||||
successCount++
|
||||
} else {
|
||||
errorCount++
|
||||
}
|
||||
|
||||
if (processedCount % 25 === 0) {
|
||||
await trackProgress(
|
||||
`Progress: ${processedCount} processed, ${successCount} successful, ${errorCount} errors`
|
||||
)
|
||||
}
|
||||
} catch (err) {
|
||||
errorCount++
|
||||
if (err instanceof ReportError) {
|
||||
csvWriter.write({
|
||||
recurly_account_code: input.recurly_account_code,
|
||||
target_stripe_account: input.target_stripe_account,
|
||||
stripe_customer_id: input.stripe_customer_id,
|
||||
previous_recurly_status: '',
|
||||
previous_recurly_subscription_id: '',
|
||||
email: '',
|
||||
analyticsId: '',
|
||||
status: err.status,
|
||||
note: err.message,
|
||||
})
|
||||
} else {
|
||||
csvWriter.write({
|
||||
recurly_account_code: input.recurly_account_code,
|
||||
target_stripe_account: input.target_stripe_account,
|
||||
stripe_customer_id: input.stripe_customer_id,
|
||||
previous_recurly_status: '',
|
||||
previous_recurly_subscription_id: '',
|
||||
email: '',
|
||||
analyticsId: '',
|
||||
status: 'error',
|
||||
note: err.message,
|
||||
})
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
// wait for all queued tasks to complete
|
||||
await queue.onIdle()
|
||||
}
|
||||
|
||||
await trackProgress(`✅ Total processed: ${processedCount}`)
|
||||
@@ -213,10 +256,15 @@ async function preloadProductMetadata(region) {
|
||||
if (preloadedProductMetadata.has(region)) return
|
||||
|
||||
const stripeClient = getRegionClient(region)
|
||||
const products = await stripeClient.stripe.products.list({
|
||||
active: true,
|
||||
limit: 100,
|
||||
})
|
||||
const products = await rateLimiters.requestWithRetries(
|
||||
stripeClient.serviceName,
|
||||
() =>
|
||||
stripeClient.stripe.products.list({
|
||||
active: true,
|
||||
limit: 100,
|
||||
}),
|
||||
{ operation: 'products.list', region: stripeClient.serviceName }
|
||||
)
|
||||
|
||||
const results = new Map()
|
||||
for (const product of products.data) {
|
||||
@@ -264,9 +312,15 @@ async function processMigration(input, commit) {
|
||||
let stripeCustomer
|
||||
let stripeSubscription
|
||||
try {
|
||||
stripeCustomer = await stripeClient.getCustomerById(stripeCustomerId, [
|
||||
'subscriptions',
|
||||
])
|
||||
stripeCustomer = await rateLimiters.requestWithRetries(
|
||||
stripeClient.serviceName,
|
||||
() => stripeClient.getCustomerById(stripeCustomerId, ['subscriptions']),
|
||||
{
|
||||
operation: 'getCustomerById',
|
||||
stripeCustomerId,
|
||||
region: stripeClient.serviceName,
|
||||
}
|
||||
)
|
||||
if (
|
||||
!stripeCustomer.subscriptions ||
|
||||
stripeCustomer.subscriptions.data.length === 0
|
||||
@@ -297,9 +351,17 @@ async function processMigration(input, commit) {
|
||||
// 5. Fetch Recurly subscription
|
||||
let recurlySubscription
|
||||
try {
|
||||
recurlySubscription = await RecurlyWrapper.promises.getSubscription(
|
||||
previousRecurlySubscriptionId,
|
||||
{}
|
||||
recurlySubscription = await rateLimiters.requestWithRetries(
|
||||
'recurly',
|
||||
() =>
|
||||
RecurlyWrapper.promises.getSubscription(
|
||||
previousRecurlySubscriptionId,
|
||||
{}
|
||||
),
|
||||
{
|
||||
operation: 'getSubscription',
|
||||
recurlySubscriptionId: previousRecurlySubscriptionId,
|
||||
}
|
||||
)
|
||||
} catch (err) {
|
||||
throw new ReportError(
|
||||
@@ -493,11 +555,19 @@ async function performCutover(
|
||||
postponedDate.setFullYear(currentBillingDate.getFullYear() + 10)
|
||||
|
||||
try {
|
||||
await RecurlyWrapper.promises.apiRequest({
|
||||
url: `subscriptions/${recurlySubscription.uuid}/postpone`,
|
||||
qs: { bulk: true, next_bill_date: postponedDate },
|
||||
method: 'PUT',
|
||||
})
|
||||
await rateLimiters.requestWithRetries(
|
||||
'recurly',
|
||||
() =>
|
||||
RecurlyWrapper.promises.apiRequest({
|
||||
url: `subscriptions/${recurlySubscription.uuid}/postpone`,
|
||||
qs: { bulk: true, next_bill_date: postponedDate },
|
||||
method: 'PUT',
|
||||
}),
|
||||
{
|
||||
operation: 'postpone',
|
||||
recurlySubscriptionId: recurlySubscription.uuid,
|
||||
}
|
||||
)
|
||||
} catch (err) {
|
||||
throw new ReportError(
|
||||
'migrated-recurly-postpone-failed',
|
||||
@@ -508,9 +578,18 @@ async function performCutover(
|
||||
|
||||
// Step 4: Remove migration metadata from Stripe
|
||||
try {
|
||||
await stripeClient.updateSubscriptionMetadata(stripeSubscription.id, {
|
||||
recurly_to_stripe_migration_status: '',
|
||||
})
|
||||
await rateLimiters.requestWithRetries(
|
||||
stripeClient.serviceName,
|
||||
() =>
|
||||
stripeClient.updateSubscriptionMetadata(stripeSubscription.id, {
|
||||
recurly_to_stripe_migration_status: '',
|
||||
}),
|
||||
{
|
||||
operation: 'updateSubscriptionMetadata',
|
||||
stripeSubscriptionId: stripeSubscription.id,
|
||||
region: stripeClient.serviceName,
|
||||
}
|
||||
)
|
||||
} catch (err) {
|
||||
throw new ReportError(
|
||||
'migrated-metadata-removal-failed',
|
||||
@@ -534,26 +613,7 @@ async function performCutover(
|
||||
)
|
||||
}
|
||||
|
||||
// Step 6. Remap customer metadata (if needed) in Stripe
|
||||
if (
|
||||
stripeCustomer.metadata != null &&
|
||||
stripeCustomer.metadata.recurlyAccountCode != null &&
|
||||
stripeCustomer.metadata.userId == null
|
||||
) {
|
||||
try {
|
||||
await stripeClient.updateCustomerMetadata(stripeCustomer.id, {
|
||||
recurlyAccountCode: '',
|
||||
userId: adminUserId,
|
||||
})
|
||||
} catch (err) {
|
||||
throw new ReportError(
|
||||
'migrated-customer-metadata-removal-failed',
|
||||
`Successfully migrated to Stripe and registered analytics mapping but failed to remove customer metadata: ${err.message}`
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Step 7. Send data to customer.io
|
||||
// Step 6. Send data to customer.io
|
||||
if (analyticsId) {
|
||||
try {
|
||||
const migrationDate = new Date().toISOString().slice(0, 10)
|
||||
@@ -580,10 +640,27 @@ async function performCutover(
|
||||
|
||||
function parseArgs() {
|
||||
const args = minimist(process.argv.slice(2), {
|
||||
string: ['output'],
|
||||
number: ['throttle'],
|
||||
string: [
|
||||
'output',
|
||||
'concurrency',
|
||||
'recurly-rate-limit',
|
||||
'recurly-api-retries',
|
||||
'recurly-retry-delay-ms',
|
||||
'stripe-rate-limit',
|
||||
'stripe-api-retries',
|
||||
'stripe-retry-delay-ms',
|
||||
],
|
||||
boolean: ['commit', 'help'],
|
||||
default: { commit: false, throttle: DEFAULT_THROTTLE },
|
||||
default: {
|
||||
commit: false,
|
||||
concurrency: 10,
|
||||
'recurly-rate-limit': DEFAULT_RECURLY_RATE_LIMIT,
|
||||
'recurly-api-retries': DEFAULT_RECURLY_API_RETRIES,
|
||||
'recurly-retry-delay-ms': DEFAULT_RECURLY_RETRY_DELAY_MS,
|
||||
'stripe-rate-limit': DEFAULT_STRIPE_RATE_LIMIT,
|
||||
'stripe-api-retries': DEFAULT_STRIPE_API_RETRIES,
|
||||
'stripe-retry-delay-ms': DEFAULT_STRIPE_RETRY_DELAY_MS,
|
||||
},
|
||||
})
|
||||
|
||||
if (args.help) {
|
||||
@@ -595,7 +672,13 @@ function parseArgs() {
|
||||
const paramsSchema = z.object({
|
||||
output: z.string().optional(),
|
||||
commit: z.boolean(),
|
||||
throttle: z.number().int().positive(),
|
||||
concurrency: z.number().int().positive(),
|
||||
recurlyRateLimit: z.number().positive(),
|
||||
recurlyApiRetries: z.number().int().nonnegative(),
|
||||
recurlyRetryDelayMs: z.number().int().nonnegative(),
|
||||
stripeRateLimit: z.number().positive(),
|
||||
stripeApiRetries: z.number().int().nonnegative(),
|
||||
stripeRetryDelayMs: z.number().int().nonnegative(),
|
||||
inputFile: z.string().optional(),
|
||||
})
|
||||
|
||||
@@ -603,7 +686,13 @@ function parseArgs() {
|
||||
return paramsSchema.parse({
|
||||
output: args.output,
|
||||
commit: args.commit,
|
||||
throttle: args.throttle,
|
||||
concurrency: Number(args.concurrency),
|
||||
recurlyRateLimit: Number(args['recurly-rate-limit']),
|
||||
recurlyApiRetries: Number(args['recurly-api-retries']),
|
||||
recurlyRetryDelayMs: Number(args['recurly-retry-delay-ms']),
|
||||
stripeRateLimit: Number(args['stripe-rate-limit']),
|
||||
stripeApiRetries: Number(args['stripe-api-retries']),
|
||||
stripeRetryDelayMs: Number(args['stripe-retry-delay-ms']),
|
||||
inputFile,
|
||||
})
|
||||
} catch (err) {
|
||||
|
||||
@@ -29,9 +29,9 @@
|
||||
|
||||
import fs from 'node:fs'
|
||||
import path from 'node:path'
|
||||
import { setTimeout } from 'node:timers/promises'
|
||||
import * as csv from 'csv'
|
||||
import minimist from 'minimist'
|
||||
import PQueue from 'p-queue'
|
||||
import { z } from '../../app/src/infrastructure/Validation.mjs'
|
||||
import { scriptRunner } from '../lib/ScriptRunner.mjs'
|
||||
import { getRegionClient } from '../../modules/subscriptions/app/src/StripeClient.mjs'
|
||||
@@ -42,17 +42,33 @@ import UserAnalyticsIdCache from '../../app/src/Features/Analytics/UserAnalytics
|
||||
import CustomerIoHandler from '../../modules/customer-io/app/src/CustomerIoHandler.mjs'
|
||||
import { ReportError } from './helpers.mjs'
|
||||
import AccountMappingHelper from '../../app/src/Features/Analytics/AccountMappingHelper.mjs'
|
||||
import {
|
||||
createRateLimitedApiWrappers,
|
||||
DEFAULT_RECURLY_RATE_LIMIT,
|
||||
DEFAULT_STRIPE_RATE_LIMIT,
|
||||
DEFAULT_RECURLY_API_RETRIES,
|
||||
DEFAULT_RECURLY_RETRY_DELAY_MS,
|
||||
DEFAULT_STRIPE_API_RETRIES,
|
||||
DEFAULT_STRIPE_RETRY_DELAY_MS,
|
||||
} from './RateLimiter.mjs'
|
||||
|
||||
const DEFAULT_THROTTLE = 40
|
||||
// rate limiters - initialized in main()
|
||||
let rateLimiters
|
||||
|
||||
function usage() {
|
||||
console.error(`Usage: node scripts/stripe/rollback-finalized-stripe-migration.mjs [OPTS] [INPUT-FILE]
|
||||
|
||||
Options:
|
||||
--output PATH Output file path (default: /tmp/rollback_output_<timestamp>.csv)
|
||||
--commit Apply changes (without this, runs in dry-run mode)
|
||||
--throttle DURATION Minimum time between requests in ms (default: ${DEFAULT_THROTTLE})
|
||||
--help Show this help message
|
||||
--output PATH Output file path (default: /tmp/rollback_output_<timestamp>.csv)
|
||||
--commit Apply changes (without this, runs in dry-run mode)
|
||||
--concurrency N Number of rollbacks to process concurrently (default: 10)
|
||||
--recurly-rate-limit N Requests per second for Recurly (default: ${DEFAULT_RECURLY_RATE_LIMIT})
|
||||
--recurly-api-retries N Number of retries on Recurly 429s (default: ${DEFAULT_RECURLY_API_RETRIES})
|
||||
--recurly-retry-delay-ms N Delay between Recurly retries in ms (default: ${DEFAULT_RECURLY_RETRY_DELAY_MS})
|
||||
--stripe-rate-limit N Requests per second for Stripe (default: ${DEFAULT_STRIPE_RATE_LIMIT})
|
||||
--stripe-api-retries N Number of retries on Stripe 429s (default: ${DEFAULT_STRIPE_API_RETRIES})
|
||||
--stripe-retry-delay-ms N Delay between Stripe retries in ms (default: ${DEFAULT_STRIPE_RETRY_DELAY_MS})
|
||||
--help Show this help message
|
||||
|
||||
Note: This script does NOT cancel Stripe subscriptions. Use scripts/stripe/bulk-cancel-subscriptions.mjs separately.
|
||||
`)
|
||||
@@ -63,12 +79,25 @@ async function main(trackProgress) {
|
||||
const timestamp = new Date().toISOString().replace(/[:.]/g, '-')
|
||||
const outputFile = opts.output ?? `/tmp/rollback_output_${timestamp}.csv`
|
||||
|
||||
// initialize rate limiters
|
||||
rateLimiters = createRateLimitedApiWrappers({
|
||||
recurlyRateLimit: opts.recurlyRateLimit,
|
||||
recurlyApiRetries: opts.recurlyApiRetries,
|
||||
recurlyRetryDelayMs: opts.recurlyRetryDelayMs,
|
||||
stripeRateLimit: opts.stripeRateLimit,
|
||||
stripeApiRetries: opts.stripeApiRetries,
|
||||
stripeRetryDelayMs: opts.stripeRetryDelayMs,
|
||||
})
|
||||
|
||||
await trackProgress('Starting Stripe to Recurly rollback')
|
||||
await trackProgress(`Run mode: ${opts.commit ? 'COMMIT' : 'DRY RUN'}`)
|
||||
await trackProgress(
|
||||
'Note: Stripe subscriptions are NOT cancelled by this script'
|
||||
)
|
||||
await trackProgress(`Throttle: ${opts.throttle}ms between requests`)
|
||||
await trackProgress(
|
||||
`Rate limits: Recurly ${opts.recurlyRateLimit}/s, Stripe ${opts.stripeRateLimit}/s`
|
||||
)
|
||||
await trackProgress(`Concurrency: ${opts.concurrency}`)
|
||||
|
||||
const inputStream = opts.inputFile
|
||||
? fs.createReadStream(opts.inputFile)
|
||||
@@ -82,62 +111,70 @@ async function main(trackProgress) {
|
||||
let successCount = 0
|
||||
let errorCount = 0
|
||||
|
||||
let lastLoopTimestamp = 0
|
||||
for await (const input of csvReader) {
|
||||
const timeSinceLastLoop = Date.now() - lastLoopTimestamp
|
||||
if (timeSinceLastLoop < opts.throttle) {
|
||||
await setTimeout(opts.throttle - timeSinceLastLoop)
|
||||
}
|
||||
lastLoopTimestamp = Date.now()
|
||||
const queue = new PQueue({ concurrency: opts.concurrency })
|
||||
const maxQueueSize = opts.concurrency
|
||||
|
||||
processedCount++
|
||||
try {
|
||||
for await (const input of csvReader) {
|
||||
// throttle input if queue is full
|
||||
if (queue.size >= maxQueueSize) {
|
||||
await queue.onSizeLessThan(maxQueueSize)
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await processRollback(input, opts.commit)
|
||||
queue.add(async () => {
|
||||
processedCount++
|
||||
|
||||
csvWriter.write({
|
||||
recurly_account_code: input.recurly_account_code,
|
||||
target_stripe_account: input.target_stripe_account,
|
||||
stripe_customer_id: input.stripe_customer_id,
|
||||
status: result.status,
|
||||
note: result.note,
|
||||
try {
|
||||
const result = await processRollback(input, opts.commit)
|
||||
|
||||
csvWriter.write({
|
||||
recurly_account_code: input.recurly_account_code,
|
||||
target_stripe_account: input.target_stripe_account,
|
||||
stripe_customer_id: input.stripe_customer_id,
|
||||
status: result.status,
|
||||
note: result.note,
|
||||
})
|
||||
|
||||
if (
|
||||
result.status === 'rolled-back' ||
|
||||
result.status === 'validated' ||
|
||||
result.status === 'already-recurly'
|
||||
) {
|
||||
successCount++
|
||||
} else {
|
||||
errorCount++
|
||||
}
|
||||
|
||||
if (processedCount % 25 === 0) {
|
||||
await trackProgress(
|
||||
`Progress: ${processedCount} processed, ${successCount} successful, ${errorCount} errors`
|
||||
)
|
||||
}
|
||||
} catch (err) {
|
||||
errorCount++
|
||||
if (err instanceof ReportError) {
|
||||
csvWriter.write({
|
||||
recurly_account_code: input.recurly_account_code,
|
||||
target_stripe_account: input.target_stripe_account,
|
||||
stripe_customer_id: input.stripe_customer_id,
|
||||
status: err.status,
|
||||
note: err.message,
|
||||
})
|
||||
} else {
|
||||
csvWriter.write({
|
||||
recurly_account_code: input.recurly_account_code,
|
||||
target_stripe_account: input.target_stripe_account,
|
||||
stripe_customer_id: input.stripe_customer_id,
|
||||
status: 'error',
|
||||
note: err.message,
|
||||
})
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
if (
|
||||
result.status === 'rolled-back' ||
|
||||
result.status === 'validated' ||
|
||||
result.status === 'already-recurly'
|
||||
) {
|
||||
successCount++
|
||||
} else {
|
||||
errorCount++
|
||||
}
|
||||
|
||||
if (processedCount % 25 === 0) {
|
||||
await trackProgress(
|
||||
`Progress: ${processedCount} processed, ${successCount} successful, ${errorCount} errors`
|
||||
)
|
||||
}
|
||||
} catch (err) {
|
||||
errorCount++
|
||||
if (err instanceof ReportError) {
|
||||
csvWriter.write({
|
||||
recurly_account_code: input.recurly_account_code,
|
||||
target_stripe_account: input.target_stripe_account,
|
||||
stripe_customer_id: input.stripe_customer_id,
|
||||
status: err.status,
|
||||
note: err.message,
|
||||
})
|
||||
} else {
|
||||
csvWriter.write({
|
||||
recurly_account_code: input.recurly_account_code,
|
||||
target_stripe_account: input.target_stripe_account,
|
||||
stripe_customer_id: input.stripe_customer_id,
|
||||
status: 'error',
|
||||
note: err.message,
|
||||
})
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
// wait for all queued tasks to complete
|
||||
await queue.onIdle()
|
||||
}
|
||||
|
||||
await trackProgress(`✅ Total processed: ${processedCount}`)
|
||||
@@ -229,8 +266,15 @@ async function processRollback(input, commit) {
|
||||
// 4. Find Recurly subscription ID from Stripe metadata
|
||||
let recurlySubscriptionId
|
||||
try {
|
||||
const stripeSubData =
|
||||
await stripeClient.stripe.subscriptions.retrieve(stripeSubscriptionId)
|
||||
const stripeSubData = await rateLimiters.requestWithRetries(
|
||||
stripeClient.serviceName,
|
||||
() => stripeClient.stripe.subscriptions.retrieve(stripeSubscriptionId),
|
||||
{
|
||||
operation: 'subscriptions.retrieve',
|
||||
stripeSubscriptionId,
|
||||
region: stripeClient.serviceName,
|
||||
}
|
||||
)
|
||||
recurlySubscriptionId = stripeSubData.metadata?.recurly_subscription_id
|
||||
if (!recurlySubscriptionId) {
|
||||
throw new ReportError(
|
||||
@@ -249,9 +293,13 @@ async function processRollback(input, commit) {
|
||||
// 5. Fetch Recurly subscription to get original billing date
|
||||
let recurlySubscription
|
||||
try {
|
||||
recurlySubscription = await RecurlyWrapper.promises.getSubscription(
|
||||
recurlySubscriptionId,
|
||||
{}
|
||||
recurlySubscription = await rateLimiters.requestWithRetries(
|
||||
'recurly',
|
||||
() => RecurlyWrapper.promises.getSubscription(recurlySubscriptionId, {}),
|
||||
{
|
||||
operation: 'getSubscription',
|
||||
recurlySubscriptionId,
|
||||
}
|
||||
)
|
||||
} catch (err) {
|
||||
throw new ReportError(
|
||||
@@ -316,11 +364,19 @@ async function performRollback(
|
||||
|
||||
if (targetBillingDateIsInFuture) {
|
||||
try {
|
||||
await RecurlyWrapper.promises.apiRequest({
|
||||
url: `subscriptions/${recurlySubscriptionId}/postpone`,
|
||||
qs: { bulk: true, next_bill_date: nextBillingDate },
|
||||
method: 'PUT',
|
||||
})
|
||||
await rateLimiters.requestWithRetries(
|
||||
'recurly',
|
||||
() =>
|
||||
RecurlyWrapper.promises.apiRequest({
|
||||
url: `subscriptions/${recurlySubscriptionId}/postpone`,
|
||||
qs: { bulk: true, next_bill_date: nextBillingDate },
|
||||
method: 'PUT',
|
||||
}),
|
||||
{
|
||||
operation: 'postpone',
|
||||
recurlySubscriptionId,
|
||||
}
|
||||
)
|
||||
} catch (err) {
|
||||
throw new ReportError(
|
||||
'rolled-back-recurly-restore-failed',
|
||||
@@ -337,9 +393,18 @@ async function performRollback(
|
||||
|
||||
// Step 4: Restore migration metadata to Stripe
|
||||
try {
|
||||
await stripeClient.updateSubscriptionMetadata(stripeSubscriptionId, {
|
||||
recurly_to_stripe_migration_status: 'in_progress',
|
||||
})
|
||||
await rateLimiters.requestWithRetries(
|
||||
stripeClient.serviceName,
|
||||
() =>
|
||||
stripeClient.updateSubscriptionMetadata(stripeSubscriptionId, {
|
||||
recurly_to_stripe_migration_status: 'in_progress',
|
||||
}),
|
||||
{
|
||||
operation: 'updateSubscriptionMetadata',
|
||||
stripeSubscriptionId,
|
||||
region: stripeClient.serviceName,
|
||||
}
|
||||
)
|
||||
} catch (err) {
|
||||
throw new ReportError(
|
||||
'rolled-back-metadata-restore-failed',
|
||||
@@ -381,12 +446,26 @@ async function performRollback(
|
||||
|
||||
function parseArgs() {
|
||||
const args = minimist(process.argv.slice(2), {
|
||||
string: ['output'],
|
||||
number: ['throttle'],
|
||||
string: [
|
||||
'output',
|
||||
'concurrency',
|
||||
'recurly-rate-limit',
|
||||
'recurly-api-retries',
|
||||
'recurly-retry-delay-ms',
|
||||
'stripe-rate-limit',
|
||||
'stripe-api-retries',
|
||||
'stripe-retry-delay-ms',
|
||||
],
|
||||
boolean: ['commit', 'help'],
|
||||
default: {
|
||||
commit: false,
|
||||
throttle: DEFAULT_THROTTLE,
|
||||
concurrency: 10,
|
||||
'recurly-rate-limit': DEFAULT_RECURLY_RATE_LIMIT,
|
||||
'recurly-api-retries': DEFAULT_RECURLY_API_RETRIES,
|
||||
'recurly-retry-delay-ms': DEFAULT_RECURLY_RETRY_DELAY_MS,
|
||||
'stripe-rate-limit': DEFAULT_STRIPE_RATE_LIMIT,
|
||||
'stripe-api-retries': DEFAULT_STRIPE_API_RETRIES,
|
||||
'stripe-retry-delay-ms': DEFAULT_STRIPE_RETRY_DELAY_MS,
|
||||
},
|
||||
})
|
||||
|
||||
@@ -399,7 +478,13 @@ function parseArgs() {
|
||||
const paramsSchema = z.object({
|
||||
output: z.string().optional(),
|
||||
commit: z.boolean(),
|
||||
throttle: z.number().int().positive(),
|
||||
concurrency: z.number().int().positive(),
|
||||
recurlyRateLimit: z.number().positive(),
|
||||
recurlyApiRetries: z.number().int().nonnegative(),
|
||||
recurlyRetryDelayMs: z.number().int().nonnegative(),
|
||||
stripeRateLimit: z.number().positive(),
|
||||
stripeApiRetries: z.number().int().nonnegative(),
|
||||
stripeRetryDelayMs: z.number().int().nonnegative(),
|
||||
inputFile: z.string().optional(),
|
||||
})
|
||||
|
||||
@@ -407,7 +492,13 @@ function parseArgs() {
|
||||
return paramsSchema.parse({
|
||||
output: args.output,
|
||||
commit: args.commit,
|
||||
throttle: args.throttle,
|
||||
concurrency: Number(args.concurrency),
|
||||
recurlyRateLimit: Number(args['recurly-rate-limit']),
|
||||
recurlyApiRetries: Number(args['recurly-api-retries']),
|
||||
recurlyRetryDelayMs: Number(args['recurly-retry-delay-ms']),
|
||||
stripeRateLimit: Number(args['stripe-rate-limit']),
|
||||
stripeApiRetries: Number(args['stripe-api-retries']),
|
||||
stripeRetryDelayMs: Number(args['stripe-retry-delay-ms']),
|
||||
inputFile,
|
||||
})
|
||||
} catch (err) {
|
||||
|
||||
Reference in New Issue
Block a user