diff --git a/services/web/scripts/recurly/migrate_recurly_customers_to_stripe.mjs b/services/web/scripts/recurly/migrate_recurly_customers_to_stripe.mjs index c1ce2d7ff0..988391d583 100644 --- a/services/web/scripts/recurly/migrate_recurly_customers_to_stripe.mjs +++ b/services/web/scripts/recurly/migrate_recurly_customers_to_stripe.mjs @@ -91,7 +91,6 @@ import minimist from 'minimist' import PQueue from 'p-queue' import fs from 'node:fs' import * as csv from 'csv' -import { setTimeout } from 'node:timers/promises' import { scriptRunner } from '../lib/ScriptRunner.mjs' import { @@ -101,6 +100,15 @@ import { coalesceOrThrowVATNumber, getTaxIdType, } from '../helpers/migrate_recurly_customers_to_stripe.helpers.mjs' +import { + createRateLimitedApiWrappers, + DEFAULT_RECURLY_RATE_LIMIT, + DEFAULT_STRIPE_RATE_LIMIT, + DEFAULT_RECURLY_API_RETRIES, + DEFAULT_RECURLY_RETRY_DELAY_MS, + DEFAULT_STRIPE_API_RETRIES, + DEFAULT_STRIPE_RETRY_DELAY_MS, +} from '../stripe/RateLimiter.mjs' // ============================================================================= // STRIPE CLIENT SETUP @@ -148,12 +156,16 @@ function getRegionClient(region) { ) } - stripeClients[regionLower] = new Stripe(secretKey, { + const client = new Stripe(secretKey, { httpClient: Stripe.createFetchHttpClient(), telemetry: false, }) - return stripeClients[regionLower] + // Add serviceName for rate limiter identification (stripe-us or stripe-uk) + client.serviceName = `stripe-${regionLower}` + + stripeClients[regionLower] = client + return client } // ============================================================================= @@ -527,259 +539,8 @@ function createJsonArrayWriter(jsonPath) { // RATE LIMITING // ============================================================================= -/** - * Rate limiter using sliding window algorithm. - * - * Rate limits (conservative targets, leaving headroom): - * - Recurly: 2000 requests per 5 minutes → target 1500/5min = 300/min = 5/sec - * https://support.recurly.com/hc/en-us/articles/360034160731-What-Are-Recurly-s-API-Rate-Limits - * - Stripe: 100 requests per second → target 50/sec (plenty of headroom) - * https://docs.stripe.com/rate-limits - * - * Recurly is the bottleneck. With 2 Recurly calls per customer (getAccount, getBillingInfo), - * we can process ~2.5 customers/second = ~150 customers/minute = ~9000 customers/hour. - * For 150K customers, expect ~17 hours at full throughput. - */ - -class RateLimiter { - /** - * @param {string} name - Name for logging - * @param {number} maxRequests - Maximum requests allowed in the window - * @param {number} windowMs - Window size in milliseconds - */ - constructor(name, maxRequests, windowMs) { - this.name = name - this.maxRequests = maxRequests - this.windowMs = windowMs - this.requests = [] // timestamps of recent requests - this.totalRequests = 0 - this._pending = Promise.resolve() - } - - /** - * Wait if necessary to stay within rate limits, then record the request. - */ - async throttle() { - this._pending = this._pending - .catch(error => { - // this should never happen since setTimeout or logDebug are very unlikely to ever fail - // but if it does, we log it and continue without blocking the queue (fail-open) - logWarn(`Rate limiter chain error for ${this.name}`, { - error: error?.message || String(error), - }) - }) - .then(async () => { - while (true) { - const now = Date.now() - - // Remove requests outside the window - const windowStart = now - this.windowMs - this.requests = this.requests.filter(ts => ts > windowStart) - - // If at limit, wait until the oldest request exits the window - if (this.requests.length >= this.maxRequests) { - const oldestRequest = this.requests[0] - const waitTime = oldestRequest - windowStart + 1 - if (waitTime > 0) { - logDebug( - `Rate limit throttle for ${this.name}`, - { - waitMs: waitTime, - currentRequests: this.requests.length, - maxRequests: this.maxRequests, - }, - { verboseOnly: true } - ) - await setTimeout(waitTime) - continue - } - } - - // Record this request - this.requests.push(Date.now()) - this.totalRequests++ - break - } - }) - - return this._pending - } - - /** - * Get current rate (requests per second over the last window) - */ - getCurrentRate() { - const now = Date.now() - const windowStart = now - this.windowMs - const recentRequests = this.requests.filter(ts => ts > windowStart).length - return (recentRequests / this.windowMs) * 1000 // requests per second - } - - getStats() { - return { - name: this.name, - totalRequests: this.totalRequests, - currentWindowRequests: this.requests.length, - maxRequests: this.maxRequests, - currentRate: this.getCurrentRate().toFixed(2) + '/sec', - } - } -} - -const DEFAULT_RECURLY_RATE_LIMIT = 10 // requests per second -const DEFAULT_STRIPE_RATE_LIMIT = 50 // requests per second -const DEFAULT_RECURLY_API_RETRIES = 5 -const DEFAULT_RECURLY_RETRY_DELAY_MS = 1000 -const DEFAULT_STRIPE_API_RETRIES = 5 -const DEFAULT_STRIPE_RETRY_DELAY_MS = 1000 -const RATE_LIMIT_WINDOW_MS = 1000 - -let recurlyRateLimiter -let recurlyApiRetries = DEFAULT_RECURLY_API_RETRIES -let recurlyRetryDelayMs = DEFAULT_RECURLY_RETRY_DELAY_MS -let stripeRateLimitPerSecond = DEFAULT_STRIPE_RATE_LIMIT -let stripeApiRetries = DEFAULT_STRIPE_API_RETRIES -let stripeRetryDelayMs = DEFAULT_STRIPE_RETRY_DELAY_MS -const stripeRateLimiters = new Map() - -function getStripeRateLimiter(region) { - const key = String(region || 'unknown').toLowerCase() - if (stripeRateLimiters.has(key)) return stripeRateLimiters.get(key) - - const limiter = new RateLimiter( - `Stripe-${key}`, - stripeRateLimitPerSecond, - RATE_LIMIT_WINDOW_MS - ) - stripeRateLimiters.set(key, limiter) - return limiter -} - -/** - * Throttle before making a Recurly API call - */ -async function throttleRecurly() { - await recurlyRateLimiter.throttle() -} - -async function recurlyRequestWithRetries(operation, { context } = {}) { - let attempt = 0 - while (true) { - try { - return await operation() - } catch (error) { - const statusCode = - error?.statusCode ?? error?.status ?? error?.raw?.statusCode - if (statusCode === 429) { - logWarn('Recurly rate limited', { - rowNumber: context?.rowNumber, - recurlyAccountCode: context?.recurlyAccountCode, - attempt: attempt + 1, - maxRetries: recurlyApiRetries, - }) - - if (attempt < recurlyApiRetries) { - attempt++ - await setTimeout(recurlyRetryDelayMs) - continue - } - } - throw error - } - } -} - -async function recurlyCall(operation, context) { - return recurlyRequestWithRetries( - async () => { - await throttleRecurly() - return operation() - }, - { context } - ) -} - -/** - * Throttle before making a Stripe API call - */ -async function throttleStripe(region) { - await getStripeRateLimiter(region).throttle() -} - -/** - * Get rate limiter statistics for logging - */ -function getRateLimiterStats() { - const stripeLimiters = [...stripeRateLimiters.values()] - const stripeTotalRequests = stripeLimiters.reduce( - (sum, limiter) => sum + limiter.totalRequests, - 0 - ) - const stripeCurrentRate = stripeLimiters.reduce( - (sum, limiter) => sum + limiter.getCurrentRate(), - 0 - ) - - return { - recurly: recurlyRateLimiter.getStats(), - stripe: { - totalRequests: stripeTotalRequests, - currentRate: stripeCurrentRate.toFixed(2) + '/sec', - }, - stripeByRegion: stripeLimiters.map(limiter => limiter.getStats()), - } -} - -function getStripeRateLimitReason(error) { - const headers = - error?.headers || error?.raw?.headers || error?.response?.headers || {} - return ( - headers['stripe-rate-limit-reason'] || - headers['Stripe-Rate-Limited-Reason'] || - headers['stripe-rate-limited-reason'] || - null - ) -} - -async function stripeRequestWithRetries(operation, { context } = {}) { - let attempt = 0 - while (true) { - try { - return await operation() - } catch (error) { - const statusCode = error?.statusCode ?? error?.raw?.statusCode - if (statusCode === 429) { - const reason = getStripeRateLimitReason(error) - logWarn('Stripe rate limited', { - rowNumber: context?.rowNumber, - stripeCustomerId: context?.stripeCustomerId, - stripeAccount: context?.stripeAccount, - reason, - stripeApi: context?.stripeApi, - attempt: attempt + 1, - maxRetries: stripeApiRetries, - }) - - if (attempt < stripeApiRetries) { - attempt++ - await setTimeout(stripeRetryDelayMs) - continue - } - } - throw error - } - } -} - -async function stripeCall(region, operation, context) { - return stripeRequestWithRetries( - async () => { - await throttleStripe(region) - return operation() - }, - { context } - ) -} +// rate limiters - initialized in main() +let rateLimiters // ============================================================================= // DATA TRANSFORMATION @@ -792,14 +553,16 @@ async function stripeCall(region, operation, context) { * @returns {Promise<{account: object, billingInfo: object|null}>} */ async function fetchRecurlyData(accountCode, context) { - const account = await recurlyCall( + const account = await rateLimiters.requestWithRetries( + 'recurly', () => recurlyClient.getAccount(`code-${accountCode}`), context ) let billingInfo = null try { - billingInfo = await recurlyCall( + billingInfo = await rateLimiters.requestWithRetries( + 'recurly', () => recurlyClient.getBillingInfo(`code-${accountCode}`), context ) @@ -826,12 +589,10 @@ async function fetchRecurlyData(accountCode, context) { async function fetchTargetStripeCustomer( stripeClient, stripeCustomerId, - region, context ) { - // TODO: consider getting the region from stripeClient.serviceName - const customer = await stripeCall( - region, + const customer = await rateLimiters.requestWithRetries( + stripeClient.serviceName, () => stripeClient.customers.retrieve(stripeCustomerId), { ...context, stripeApi: 'customers.retrieve' } ) @@ -853,11 +614,10 @@ async function fetchTargetStripeCustomer( async function fetchTargetStripeCustomerPaymentMethods( stripeClient, stripeCustomerId, - region, context ) { - const paymentMethods = await stripeCall( - region, + const paymentMethods = await rateLimiters.requestWithRetries( + stripeClient.serviceName, () => stripeClient.customers.listPaymentMethods(stripeCustomerId), { ...context, stripeApi: 'customers.listPaymentMethods' } ) @@ -874,8 +634,7 @@ async function replaceCustomerTaxIds( stripeClient, stripeCustomerId, { taxIdType, vatNumber }, - context, - region + context ) { // Stripe customers can have multiple tax IDs. For this migration, we want a single // authoritative tax ID derived from Recurly, so we remove any existing ones first. @@ -883,8 +642,8 @@ async function replaceCustomerTaxIds( let startingAfter while (true) { - const page = await stripeCall( - region, + const page = await rateLimiters.requestWithRetries( + stripeClient.serviceName, () => stripeClient.customers.listTaxIds(stripeCustomerId, { limit: 100, @@ -910,16 +669,16 @@ async function replaceCustomerTaxIds( ) for (const taxId of existingTaxIds) { - await stripeCall( - region, + await rateLimiters.requestWithRetries( + stripeClient.serviceName, () => stripeClient.customers.deleteTaxId(stripeCustomerId, taxId.id), { ...context, stripeApi: 'customers.deleteTaxId' } ) } } - return await stripeCall( - region, + return await rateLimiters.requestWithRetries( + stripeClient.serviceName, () => stripeClient.customers.createTaxId(stripeCustomerId, { type: taxIdType, @@ -1151,7 +910,6 @@ async function processCustomer( const existingCustomer = await fetchTargetStripeCustomer( stripeClient, stripeCustomerId, - region, stripeContext ) @@ -1236,8 +994,7 @@ async function processCustomer( stripeClient, stripeCustomerId, { taxIdType, vatNumber }, - context, - region + context ) logDebug( 'Successfully created tax ID', @@ -1426,8 +1183,8 @@ async function processCustomer( }, { verboseOnly: true } ) - await stripeCall( - region, + await rateLimiters.requestWithRetries( + stripeClient.serviceName, () => stripeClient.customers.update(stripeCustomerId, customerParams), { ...stripeContext, stripeApi: 'customers.update' } ) @@ -1694,6 +1451,7 @@ async function main(trackProgress) { let recurlyRateLimit let recurlyApiRetriesValue let recurlyRetryDelayMsValue + let stripeRateLimitPerSecond let stripeApiRetriesValue let stripeRetryDelayMsValue let limit @@ -1733,16 +1491,17 @@ async function main(trackProgress) { process.exit(1) } - recurlyRateLimiter = new RateLimiter( - 'Recurly', + // initialize rate limiters + rateLimiters = createRateLimitedApiWrappers({ recurlyRateLimit, - RATE_LIMIT_WINDOW_MS - ) - recurlyApiRetries = recurlyApiRetriesValue - recurlyRetryDelayMs = recurlyRetryDelayMsValue - stripeApiRetries = stripeApiRetriesValue - stripeRetryDelayMs = stripeRetryDelayMsValue - stripeRateLimiters.clear() + recurlyApiRetries: recurlyApiRetriesValue, + recurlyRetryDelayMs: recurlyRetryDelayMsValue, + stripeRateLimit: stripeRateLimitPerSecond, + stripeApiRetries: stripeApiRetriesValue, + stripeRetryDelayMs: stripeRetryDelayMsValue, + logDebug, + logWarn, + }) // Set DEBUG_MODE only from CLI arg (--verbose/-v) DEBUG_MODE = !!verbose @@ -1768,11 +1527,11 @@ async function main(trackProgress) { stripeExistingFieldsJsonPath, concurrency, recurlyRateLimit, - recurlyApiRetries, - recurlyRetryDelayMs, + recurlyApiRetries: recurlyApiRetriesValue, + recurlyRetryDelayMs: recurlyRetryDelayMsValue, stripeRateLimit: stripeRateLimitPerSecond, - stripeApiRetries, - stripeRetryDelayMs, + stripeApiRetries: stripeApiRetriesValue, + stripeRetryDelayMs: stripeRetryDelayMsValue, forceInvalidTax, ...(limit != null ? { limit } : {}), }) @@ -1981,7 +1740,7 @@ async function main(trackProgress) { // Progress update every 1000 customers (or 100 in debug mode) const progressInterval = DEBUG_MODE ? 100 : 1000 if (processedThisRun % progressInterval === 0) { - const rateLimiterStats = getRateLimiterStats() + const rateLimiterStats = rateLimiters.getRateLimiterStats() const progress = { rowNumber: lastCompletedRowNumber, processedThisRun, @@ -2039,7 +1798,7 @@ async function main(trackProgress) { const totalSuccessful = commit ? previouslyProcessed.size + updatedCount : previouslyProcessed.size - const finalRateLimiterStats = getRateLimiterStats() + const finalRateLimiterStats = rateLimiters.getRateLimiterStats() await trackProgress('=== FINAL SUMMARY ===') await trackProgress(`Start time: ${startTime.toISOString()}`) @@ -2054,11 +1813,13 @@ async function main(trackProgress) { await trackProgress(` - limit: ${limit != null ? limit : 'none'}`) await trackProgress(` - concurrency: ${concurrency}`) await trackProgress(` - recurly-rate-limit: ${recurlyRateLimit}`) - await trackProgress(` - recurly-api-retries: ${recurlyApiRetries}`) - await trackProgress(` - recurly-retry-delay-ms: ${recurlyRetryDelayMs}`) + await trackProgress(` - recurly-api-retries: ${recurlyApiRetriesValue}`) + await trackProgress( + ` - recurly-retry-delay-ms: ${recurlyRetryDelayMsValue}` + ) await trackProgress(` - stripe-rate-limit: ${stripeRateLimitPerSecond}`) - await trackProgress(` - stripe-api-retries: ${stripeApiRetries}`) - await trackProgress(` - stripe-retry-delay-ms: ${stripeRetryDelayMs}`) + await trackProgress(` - stripe-api-retries: ${stripeApiRetriesValue}`) + await trackProgress(` - stripe-retry-delay-ms: ${stripeRetryDelayMsValue}`) await trackProgress(` - force-invalid-tax: ${forceInvalidTax}`) await trackProgress(`Input file total rows: ${totalInInput}`) await trackProgress( diff --git a/services/web/scripts/stripe/RateLimiter.mjs b/services/web/scripts/stripe/RateLimiter.mjs new file mode 100644 index 0000000000..12a232ceeb --- /dev/null +++ b/services/web/scripts/stripe/RateLimiter.mjs @@ -0,0 +1,310 @@ +/* eslint-disable @overleaf/require-script-runner */ +// This file contains helper functions used by other scripts. +// The scripts that import these helpers should use Script Runner. + +import { setTimeout } from 'node:timers/promises' + +export const DEFAULT_RECURLY_RATE_LIMIT = 10 +export const DEFAULT_STRIPE_RATE_LIMIT = 50 +export const DEFAULT_RECURLY_API_RETRIES = 5 +export const DEFAULT_RECURLY_RETRY_DELAY_MS = 1000 +export const DEFAULT_STRIPE_API_RETRIES = 5 +export const DEFAULT_STRIPE_RETRY_DELAY_MS = 1000 + +/** + * Rate limiter using sliding window algorithm. + * + * Rate limits (conservative targets, leaving headroom): + * - Recurly: 2000 requests per 5 minutes → target 1500/5min = 300/min = 5/sec + * https://support.recurly.com/hc/en-us/articles/360034160731-What-Are-Recurly-s-API-Rate-Limits + * - Stripe: 100 requests per second → target 50/sec (plenty of headroom) + * https://docs.stripe.com/rate-limits + * + * Recurly is the bottleneck. With 2 Recurly calls per customer (getAccount, getBillingInfo), + * we can process ~2.5 customers/second = ~150 customers/minute = ~9000 customers/hour. + * For 150K customers, expect ~17 hours at full throughput. + */ + +class RateLimiter { + /** + * @param {string} name - Name for logging + * @param {number} maxRequests - Maximum requests allowed in the window + * @param {number} windowMs - Window size in milliseconds + * @param {Function} logDebug - Optional debug logging function + * @param {Function} logWarn - Optional warning logging function + */ + constructor( + name, + maxRequests, + windowMs, + logDebug = () => null, + logWarn = () => null + ) { + this.name = name + this.maxRequests = maxRequests + this.windowMs = windowMs + this.requests = [] // timestamps of recent requests + this.totalRequests = 0 + this._pending = Promise.resolve() + this.logDebug = logDebug + this.logWarn = logWarn + } + + /** + * Wait if necessary to stay within rate limits, then record the request. + */ + async throttle() { + this._pending = this._pending + .catch(error => { + // this should never happen since setTimeout or logDebug are very unlikely to ever fail + // but if it does, we log it and continue without blocking the queue (fail-open) + this.logWarn(`Rate limiter chain error for ${this.name}`, { + error: error?.message || String(error), + }) + }) + .then(async () => { + while (true) { + const now = Date.now() + + // Remove requests outside the window + const windowStart = now - this.windowMs + this.requests = this.requests.filter(ts => ts > windowStart) + + // If at limit, wait until the oldest request exits the window + if (this.requests.length >= this.maxRequests) { + const oldestRequest = this.requests[0] + const waitTime = oldestRequest - windowStart + 1 + if (waitTime > 0) { + this.logDebug( + `Rate limit throttle for ${this.name}`, + { + waitMs: waitTime, + currentRequests: this.requests.length, + maxRequests: this.maxRequests, + }, + { verboseOnly: true } + ) + await setTimeout(waitTime) + continue + } + } + + // Record this request + this.requests.push(Date.now()) + this.totalRequests++ + break + } + }) + + return this._pending + } + + /** + * Get current rate (requests per second over the last window) + */ + getCurrentRate() { + const now = Date.now() + const windowStart = now - this.windowMs + const recentRequests = this.requests.filter(ts => ts > windowStart).length + return (recentRequests / this.windowMs) * 1000 // requests per second + } + + getStats() { + return { + name: this.name, + totalRequests: this.totalRequests, + currentWindowRequests: this.requests.length, + maxRequests: this.maxRequests, + currentRate: this.getCurrentRate().toFixed(2) + '/sec', + } + } +} + +/** + * Helper to extract Stripe rate limit reason from error headers + */ +function getStripeRateLimitReason(error) { + const headers = + error?.headers || error?.raw?.headers || error?.response?.headers || {} + return ( + headers['stripe-rate-limit-reason'] || + headers['Stripe-Rate-Limited-Reason'] || + headers['stripe-rate-limited-reason'] || + null + ) +} + +/** + * Create rate-limited API wrapper with unified service routing. + * + * @param {object} config - Configuration options + * @param {number} config.recurlyRateLimit - Requests per second for Recurly (default: 10) + * @param {number} config.recurlyApiRetries - Number of retries on Recurly 429s (default: 5) + * @param {number} config.recurlyRetryDelayMs - Delay between Recurly retries in ms (default: 1000) + * @param {number} config.stripeRateLimit - Requests per second for Stripe (default: 50) + * @param {number} config.stripeApiRetries - Number of retries on Stripe 429s (default: 5) + * @param {number} config.stripeRetryDelayMs - Delay between Stripe retries in ms (default: 1000) + * @param {Function} config.logDebug - Optional debug logging function + * @param {Function} config.logWarn - Optional warning logging function + * + * @returns {object} Object with unified call function and stats getter + * @returns {Function} returns.call - Unified wrapper for API calls (service, operation, context) + * @returns {Function} returns.getRateLimiterStats - Get current rate limiter statistics + */ +export function createRateLimitedApiWrappers(config = {}) { + const { + recurlyRateLimit = 10, + recurlyApiRetries = 5, + recurlyRetryDelayMs = 1000, + stripeRateLimit = 50, + stripeApiRetries = 5, + stripeRetryDelayMs = 1000, + logDebug = () => null, + logWarn = () => null, + } = config + + const RATE_LIMIT_WINDOW_MS = 1000 + + // Service configuration registry + const serviceConfigs = { + recurly: { + rateLimit: recurlyRateLimit, + apiRetries: recurlyApiRetries, + retryDelayMs: recurlyRetryDelayMs, + isStripe: false, + }, + stripe: { + rateLimit: stripeRateLimit, + apiRetries: stripeApiRetries, + retryDelayMs: stripeRetryDelayMs, + isStripe: true, + }, + } + + // Rate limiter instances per service + const rateLimiters = new Map() + + function getRateLimiter(service) { + const key = String(service || 'unknown').toLowerCase() + if (rateLimiters.has(key)) { + return rateLimiters.get(key) + } + + // Determine service config + let serviceConfig + if (key === 'recurly') { + serviceConfig = serviceConfigs.recurly + } else if (key.startsWith('stripe')) { + serviceConfig = serviceConfigs.stripe + } else { + throw new Error(`Unknown service: ${service}`) + } + + const limiter = new RateLimiter( + key, + serviceConfig.rateLimit, + RATE_LIMIT_WINDOW_MS, + logDebug, + logWarn + ) + rateLimiters.set(key, limiter) + return limiter + } + + function getServiceConfig(service) { + const key = String(service || 'unknown').toLowerCase() + if (key === 'recurly') { + return serviceConfigs.recurly + } else if (key.startsWith('stripe')) { + return serviceConfigs.stripe + } else { + throw new Error(`Unknown service: ${service}`) + } + } + + async function requestWithRetries(service, operation, { context } = {}) { + const serviceConfig = getServiceConfig(service) + const rateLimiter = getRateLimiter(service) + let attempt = 0 + + while (true) { + try { + await rateLimiter.throttle() + return await operation() + } catch (error) { + const statusCode = + error?.statusCode ?? error?.status ?? error?.raw?.statusCode + if (statusCode === 429) { + attempt++ + if (attempt > serviceConfig.apiRetries) { + logWarn( + `${service} rate limit exceeded after ${attempt - 1} retries`, + { + ...context, + service, + attempt, + ...(serviceConfig.isStripe + ? { rateLimitReason: getStripeRateLimitReason(error) } + : {}), + } + ) + throw error + } + logDebug(`${service} rate limited, retrying`, { + ...context, + service, + attempt, + retryDelayMs: serviceConfig.retryDelayMs, + ...(serviceConfig.isStripe + ? { rateLimitReason: getStripeRateLimitReason(error) } + : {}), + }) + await setTimeout(serviceConfig.retryDelayMs) + continue + } + throw error + } + } + } + + /** + * Get rate limiter statistics for logging + */ + function getRateLimiterStats() { + const allLimiters = [...rateLimiters.values()] + + // Separate Recurly and Stripe limiters + const recurlyLimiters = allLimiters.filter( + limiter => limiter.name === 'recurly' + ) + const stripeLimiters = allLimiters.filter(limiter => + limiter.name.startsWith('stripe') + ) + + const stripeTotalRequests = stripeLimiters.reduce( + (sum, limiter) => sum + limiter.totalRequests, + 0 + ) + const stripeCurrentRate = stripeLimiters.reduce( + (sum, limiter) => sum + limiter.getCurrentRate(), + 0 + ) + + return { + recurly: + recurlyLimiters.length > 0 + ? recurlyLimiters[0].getStats() + : { totalRequests: 0, currentRate: '0.00/sec' }, + stripe: { + totalRequests: stripeTotalRequests, + currentRate: stripeCurrentRate.toFixed(2) + '/sec', + }, + stripeByRegion: stripeLimiters.map(limiter => limiter.getStats()), + } + } + + return { + requestWithRetries, + getRateLimiterStats, + } +} diff --git a/services/web/scripts/stripe/finalize-stripe-subscription-migration.mjs b/services/web/scripts/stripe/finalize-stripe-subscription-migration.mjs index 980c7e257b..2560b2b245 100755 --- a/services/web/scripts/stripe/finalize-stripe-subscription-migration.mjs +++ b/services/web/scripts/stripe/finalize-stripe-subscription-migration.mjs @@ -14,10 +14,16 @@ * node scripts/stripe/finalize-stripe-subscription-migration.mjs [OPTS] [INPUT-FILE] * * Options: - * --output PATH Output file path (default: /tmp/migrate_output_.csv) - * --commit Apply changes (without this, runs in dry-run mode) - * --throttle DURATION Minimum time between requests in ms (default: 40) - * --help Show help message + * --output PATH Output file path (default: /tmp/migrate_output_.csv) + * --commit Apply changes (without this, runs in dry-run mode) + * --concurrency, -c Number of customers to process concurrently (default: 10) + * --recurly-rate-limit N Requests per second for Recurly (default: 10) + * --recurly-api-retries N Number of retries on Recurly 429s (default: 5) + * --recurly-retry-delay-ms N Delay between Recurly retries in ms (default: 1000) + * --stripe-rate-limit N Requests per second for Stripe (default: 50) + * --stripe-api-retries N Number of retries on Stripe 429s (default: 5) + * --stripe-retry-delay-ms N Delay between Stripe retries in ms (default: 1000) + * --help Show help message * * CSV Input Format: * recurly_account_code,target_stripe_account,stripe_customer_id @@ -31,9 +37,9 @@ import fs from 'node:fs' import path from 'node:path' -import { setTimeout } from 'node:timers/promises' import * as csv from 'csv' import minimist from 'minimist' +import PQueue from 'p-queue' import { z } from '../../app/src/infrastructure/Validation.mjs' import { scriptRunner } from '../lib/ScriptRunner.mjs' import { @@ -49,19 +55,35 @@ import UserAnalyticsIdCache from '../../app/src/Features/Analytics/UserAnalytics import CustomerIoHandler from '../../modules/customer-io/app/src/CustomerIoHandler.mjs' import { ReportError } from './helpers.mjs' import isEqual from 'lodash/isEqual.js' - -const DEFAULT_THROTTLE = 40 +import { + createRateLimitedApiWrappers, + DEFAULT_RECURLY_RATE_LIMIT, + DEFAULT_STRIPE_RATE_LIMIT, + DEFAULT_RECURLY_API_RETRIES, + DEFAULT_RECURLY_RETRY_DELAY_MS, + DEFAULT_STRIPE_API_RETRIES, + DEFAULT_STRIPE_RETRY_DELAY_MS, +} from './RateLimiter.mjs' const preloadedProductMetadata = new Map() +// rate limiters - initialized in main() +let rateLimiters + function usage() { console.error(`Usage: node scripts/stripe/finalize-stripe-subscription-migration.mjs [OPTS] [INPUT-FILE] Options: - --output PATH Output file path (default: /tmp/migrate_output_.csv) - --commit Apply changes (without this, runs in dry-run mode) - --throttle DURATION Minimum time between requests in ms (default: ${DEFAULT_THROTTLE}) - --help Show this help message + --output PATH Output file path (default: /tmp/migrate_output_.csv) + --commit Apply changes (without this, runs in dry-run mode) + --concurrency N Number of customers to process concurrently (default: 10) + --recurly-rate-limit N Requests per second for Recurly (default: ${DEFAULT_RECURLY_RATE_LIMIT}) + --recurly-api-retries N Number of retries on Recurly 429s (default: ${DEFAULT_RECURLY_API_RETRIES}) + --recurly-retry-delay-ms N Delay between Recurly retries in ms (default: ${DEFAULT_RECURLY_RETRY_DELAY_MS}) + --stripe-rate-limit N Requests per second for Stripe (default: ${DEFAULT_STRIPE_RATE_LIMIT}) + --stripe-api-retries N Number of retries on Stripe 429s (default: ${DEFAULT_STRIPE_API_RETRIES}) + --stripe-retry-delay-ms N Delay between Stripe retries in ms (default: ${DEFAULT_STRIPE_RETRY_DELAY_MS}) + --help Show this help message `) } @@ -70,9 +92,22 @@ async function main(trackProgress) { const timestamp = new Date().toISOString().replace(/[:.]/g, '-') const outputFile = opts.output ?? `/tmp/migrate_output_${timestamp}.csv` + // initialize rate limiters + rateLimiters = createRateLimitedApiWrappers({ + recurlyRateLimit: opts.recurlyRateLimit, + recurlyApiRetries: opts.recurlyApiRetries, + recurlyRetryDelayMs: opts.recurlyRetryDelayMs, + stripeRateLimit: opts.stripeRateLimit, + stripeApiRetries: opts.stripeApiRetries, + stripeRetryDelayMs: opts.stripeRetryDelayMs, + }) + await trackProgress('Starting Recurly to Stripe migration cutover') await trackProgress(`Run mode: ${opts.commit ? 'COMMIT' : 'DRY RUN'}`) - await trackProgress(`Throttle: ${opts.throttle}ms between requests`) + await trackProgress( + `Rate limits: Recurly ${opts.recurlyRateLimit}/s, Stripe ${opts.stripeRateLimit}/s` + ) + await trackProgress(`Concurrency: ${opts.concurrency}`) const inputStream = opts.inputFile ? fs.createReadStream(opts.inputFile) @@ -91,74 +126,82 @@ async function main(trackProgress) { let successCount = 0 let errorCount = 0 - let lastLoopTimestamp = 0 - for await (const input of csvReader) { - const timeSinceLastLoop = Date.now() - lastLoopTimestamp - if (timeSinceLastLoop < opts.throttle) { - await setTimeout(opts.throttle - timeSinceLastLoop) - } - lastLoopTimestamp = Date.now() + const queue = new PQueue({ concurrency: opts.concurrency }) + const maxQueueSize = opts.concurrency - processedCount++ + try { + for await (const input of csvReader) { + // throttle input if queue is full + if (queue.size >= maxQueueSize) { + await queue.onSizeLessThan(maxQueueSize) + } - try { - const result = await processMigration(input, opts.commit) + queue.add(async () => { + processedCount++ - csvWriter.write({ - recurly_account_code: input.recurly_account_code, - target_stripe_account: input.target_stripe_account, - stripe_customer_id: input.stripe_customer_id, - previous_recurly_status: result.previousRecurlyStatus || '', - previous_recurly_subscription_id: - result.previousRecurlySubscriptionId || '', - email: result.email || '', - analyticsId: result.analyticsId || '', - status: result.status, - note: result.note, + try { + const result = await processMigration(input, opts.commit) + + csvWriter.write({ + recurly_account_code: input.recurly_account_code, + target_stripe_account: input.target_stripe_account, + stripe_customer_id: input.stripe_customer_id, + previous_recurly_status: result.previousRecurlyStatus || '', + previous_recurly_subscription_id: + result.previousRecurlySubscriptionId || '', + email: result.email || '', + analyticsId: result.analyticsId || '', + status: result.status, + note: result.note, + }) + + if ( + result.status.startsWith('migrated') || + result.status === 'validated' + ) { + successCount++ + } else { + errorCount++ + } + + if (processedCount % 25 === 0) { + await trackProgress( + `Progress: ${processedCount} processed, ${successCount} successful, ${errorCount} errors` + ) + } + } catch (err) { + errorCount++ + if (err instanceof ReportError) { + csvWriter.write({ + recurly_account_code: input.recurly_account_code, + target_stripe_account: input.target_stripe_account, + stripe_customer_id: input.stripe_customer_id, + previous_recurly_status: '', + previous_recurly_subscription_id: '', + email: '', + analyticsId: '', + status: err.status, + note: err.message, + }) + } else { + csvWriter.write({ + recurly_account_code: input.recurly_account_code, + target_stripe_account: input.target_stripe_account, + stripe_customer_id: input.stripe_customer_id, + previous_recurly_status: '', + previous_recurly_subscription_id: '', + email: '', + analyticsId: '', + status: 'error', + note: err.message, + }) + } + } }) - - if ( - result.status.startsWith('migrated') || - result.status === 'validated' - ) { - successCount++ - } else { - errorCount++ - } - - if (processedCount % 25 === 0) { - await trackProgress( - `Progress: ${processedCount} processed, ${successCount} successful, ${errorCount} errors` - ) - } - } catch (err) { - errorCount++ - if (err instanceof ReportError) { - csvWriter.write({ - recurly_account_code: input.recurly_account_code, - target_stripe_account: input.target_stripe_account, - stripe_customer_id: input.stripe_customer_id, - previous_recurly_status: '', - previous_recurly_subscription_id: '', - email: '', - analyticsId: '', - status: err.status, - note: err.message, - }) - } else { - csvWriter.write({ - recurly_account_code: input.recurly_account_code, - target_stripe_account: input.target_stripe_account, - stripe_customer_id: input.stripe_customer_id, - previous_recurly_status: '', - previous_recurly_subscription_id: '', - email: '', - analyticsId: '', - status: 'error', - note: err.message, - }) - } } + } finally { + // wait for all queued tasks to complete + await queue.onIdle() } await trackProgress(`✅ Total processed: ${processedCount}`) @@ -213,10 +256,15 @@ async function preloadProductMetadata(region) { if (preloadedProductMetadata.has(region)) return const stripeClient = getRegionClient(region) - const products = await stripeClient.stripe.products.list({ - active: true, - limit: 100, - }) + const products = await rateLimiters.requestWithRetries( + stripeClient.serviceName, + () => + stripeClient.stripe.products.list({ + active: true, + limit: 100, + }), + { operation: 'products.list', region: stripeClient.serviceName } + ) const results = new Map() for (const product of products.data) { @@ -264,9 +312,15 @@ async function processMigration(input, commit) { let stripeCustomer let stripeSubscription try { - stripeCustomer = await stripeClient.getCustomerById(stripeCustomerId, [ - 'subscriptions', - ]) + stripeCustomer = await rateLimiters.requestWithRetries( + stripeClient.serviceName, + () => stripeClient.getCustomerById(stripeCustomerId, ['subscriptions']), + { + operation: 'getCustomerById', + stripeCustomerId, + region: stripeClient.serviceName, + } + ) if ( !stripeCustomer.subscriptions || stripeCustomer.subscriptions.data.length === 0 @@ -297,9 +351,17 @@ async function processMigration(input, commit) { // 5. Fetch Recurly subscription let recurlySubscription try { - recurlySubscription = await RecurlyWrapper.promises.getSubscription( - previousRecurlySubscriptionId, - {} + recurlySubscription = await rateLimiters.requestWithRetries( + 'recurly', + () => + RecurlyWrapper.promises.getSubscription( + previousRecurlySubscriptionId, + {} + ), + { + operation: 'getSubscription', + recurlySubscriptionId: previousRecurlySubscriptionId, + } ) } catch (err) { throw new ReportError( @@ -493,11 +555,19 @@ async function performCutover( postponedDate.setFullYear(currentBillingDate.getFullYear() + 10) try { - await RecurlyWrapper.promises.apiRequest({ - url: `subscriptions/${recurlySubscription.uuid}/postpone`, - qs: { bulk: true, next_bill_date: postponedDate }, - method: 'PUT', - }) + await rateLimiters.requestWithRetries( + 'recurly', + () => + RecurlyWrapper.promises.apiRequest({ + url: `subscriptions/${recurlySubscription.uuid}/postpone`, + qs: { bulk: true, next_bill_date: postponedDate }, + method: 'PUT', + }), + { + operation: 'postpone', + recurlySubscriptionId: recurlySubscription.uuid, + } + ) } catch (err) { throw new ReportError( 'migrated-recurly-postpone-failed', @@ -508,9 +578,18 @@ async function performCutover( // Step 4: Remove migration metadata from Stripe try { - await stripeClient.updateSubscriptionMetadata(stripeSubscription.id, { - recurly_to_stripe_migration_status: '', - }) + await rateLimiters.requestWithRetries( + stripeClient.serviceName, + () => + stripeClient.updateSubscriptionMetadata(stripeSubscription.id, { + recurly_to_stripe_migration_status: '', + }), + { + operation: 'updateSubscriptionMetadata', + stripeSubscriptionId: stripeSubscription.id, + region: stripeClient.serviceName, + } + ) } catch (err) { throw new ReportError( 'migrated-metadata-removal-failed', @@ -534,26 +613,7 @@ async function performCutover( ) } - // Step 6. Remap customer metadata (if needed) in Stripe - if ( - stripeCustomer.metadata != null && - stripeCustomer.metadata.recurlyAccountCode != null && - stripeCustomer.metadata.userId == null - ) { - try { - await stripeClient.updateCustomerMetadata(stripeCustomer.id, { - recurlyAccountCode: '', - userId: adminUserId, - }) - } catch (err) { - throw new ReportError( - 'migrated-customer-metadata-removal-failed', - `Successfully migrated to Stripe and registered analytics mapping but failed to remove customer metadata: ${err.message}` - ) - } - } - - // Step 7. Send data to customer.io + // Step 6. Send data to customer.io if (analyticsId) { try { const migrationDate = new Date().toISOString().slice(0, 10) @@ -580,10 +640,27 @@ async function performCutover( function parseArgs() { const args = minimist(process.argv.slice(2), { - string: ['output'], - number: ['throttle'], + string: [ + 'output', + 'concurrency', + 'recurly-rate-limit', + 'recurly-api-retries', + 'recurly-retry-delay-ms', + 'stripe-rate-limit', + 'stripe-api-retries', + 'stripe-retry-delay-ms', + ], boolean: ['commit', 'help'], - default: { commit: false, throttle: DEFAULT_THROTTLE }, + default: { + commit: false, + concurrency: 10, + 'recurly-rate-limit': DEFAULT_RECURLY_RATE_LIMIT, + 'recurly-api-retries': DEFAULT_RECURLY_API_RETRIES, + 'recurly-retry-delay-ms': DEFAULT_RECURLY_RETRY_DELAY_MS, + 'stripe-rate-limit': DEFAULT_STRIPE_RATE_LIMIT, + 'stripe-api-retries': DEFAULT_STRIPE_API_RETRIES, + 'stripe-retry-delay-ms': DEFAULT_STRIPE_RETRY_DELAY_MS, + }, }) if (args.help) { @@ -595,7 +672,13 @@ function parseArgs() { const paramsSchema = z.object({ output: z.string().optional(), commit: z.boolean(), - throttle: z.number().int().positive(), + concurrency: z.number().int().positive(), + recurlyRateLimit: z.number().positive(), + recurlyApiRetries: z.number().int().nonnegative(), + recurlyRetryDelayMs: z.number().int().nonnegative(), + stripeRateLimit: z.number().positive(), + stripeApiRetries: z.number().int().nonnegative(), + stripeRetryDelayMs: z.number().int().nonnegative(), inputFile: z.string().optional(), }) @@ -603,7 +686,13 @@ function parseArgs() { return paramsSchema.parse({ output: args.output, commit: args.commit, - throttle: args.throttle, + concurrency: Number(args.concurrency), + recurlyRateLimit: Number(args['recurly-rate-limit']), + recurlyApiRetries: Number(args['recurly-api-retries']), + recurlyRetryDelayMs: Number(args['recurly-retry-delay-ms']), + stripeRateLimit: Number(args['stripe-rate-limit']), + stripeApiRetries: Number(args['stripe-api-retries']), + stripeRetryDelayMs: Number(args['stripe-retry-delay-ms']), inputFile, }) } catch (err) { diff --git a/services/web/scripts/stripe/rollback-finalized-stripe-migration.mjs b/services/web/scripts/stripe/rollback-finalized-stripe-migration.mjs index 4c390141b7..b14d6db562 100755 --- a/services/web/scripts/stripe/rollback-finalized-stripe-migration.mjs +++ b/services/web/scripts/stripe/rollback-finalized-stripe-migration.mjs @@ -29,9 +29,9 @@ import fs from 'node:fs' import path from 'node:path' -import { setTimeout } from 'node:timers/promises' import * as csv from 'csv' import minimist from 'minimist' +import PQueue from 'p-queue' import { z } from '../../app/src/infrastructure/Validation.mjs' import { scriptRunner } from '../lib/ScriptRunner.mjs' import { getRegionClient } from '../../modules/subscriptions/app/src/StripeClient.mjs' @@ -42,17 +42,33 @@ import UserAnalyticsIdCache from '../../app/src/Features/Analytics/UserAnalytics import CustomerIoHandler from '../../modules/customer-io/app/src/CustomerIoHandler.mjs' import { ReportError } from './helpers.mjs' import AccountMappingHelper from '../../app/src/Features/Analytics/AccountMappingHelper.mjs' +import { + createRateLimitedApiWrappers, + DEFAULT_RECURLY_RATE_LIMIT, + DEFAULT_STRIPE_RATE_LIMIT, + DEFAULT_RECURLY_API_RETRIES, + DEFAULT_RECURLY_RETRY_DELAY_MS, + DEFAULT_STRIPE_API_RETRIES, + DEFAULT_STRIPE_RETRY_DELAY_MS, +} from './RateLimiter.mjs' -const DEFAULT_THROTTLE = 40 +// rate limiters - initialized in main() +let rateLimiters function usage() { console.error(`Usage: node scripts/stripe/rollback-finalized-stripe-migration.mjs [OPTS] [INPUT-FILE] Options: - --output PATH Output file path (default: /tmp/rollback_output_.csv) - --commit Apply changes (without this, runs in dry-run mode) - --throttle DURATION Minimum time between requests in ms (default: ${DEFAULT_THROTTLE}) - --help Show this help message + --output PATH Output file path (default: /tmp/rollback_output_.csv) + --commit Apply changes (without this, runs in dry-run mode) + --concurrency N Number of rollbacks to process concurrently (default: 10) + --recurly-rate-limit N Requests per second for Recurly (default: ${DEFAULT_RECURLY_RATE_LIMIT}) + --recurly-api-retries N Number of retries on Recurly 429s (default: ${DEFAULT_RECURLY_API_RETRIES}) + --recurly-retry-delay-ms N Delay between Recurly retries in ms (default: ${DEFAULT_RECURLY_RETRY_DELAY_MS}) + --stripe-rate-limit N Requests per second for Stripe (default: ${DEFAULT_STRIPE_RATE_LIMIT}) + --stripe-api-retries N Number of retries on Stripe 429s (default: ${DEFAULT_STRIPE_API_RETRIES}) + --stripe-retry-delay-ms N Delay between Stripe retries in ms (default: ${DEFAULT_STRIPE_RETRY_DELAY_MS}) + --help Show this help message Note: This script does NOT cancel Stripe subscriptions. Use scripts/stripe/bulk-cancel-subscriptions.mjs separately. `) @@ -63,12 +79,25 @@ async function main(trackProgress) { const timestamp = new Date().toISOString().replace(/[:.]/g, '-') const outputFile = opts.output ?? `/tmp/rollback_output_${timestamp}.csv` + // initialize rate limiters + rateLimiters = createRateLimitedApiWrappers({ + recurlyRateLimit: opts.recurlyRateLimit, + recurlyApiRetries: opts.recurlyApiRetries, + recurlyRetryDelayMs: opts.recurlyRetryDelayMs, + stripeRateLimit: opts.stripeRateLimit, + stripeApiRetries: opts.stripeApiRetries, + stripeRetryDelayMs: opts.stripeRetryDelayMs, + }) + await trackProgress('Starting Stripe to Recurly rollback') await trackProgress(`Run mode: ${opts.commit ? 'COMMIT' : 'DRY RUN'}`) await trackProgress( 'Note: Stripe subscriptions are NOT cancelled by this script' ) - await trackProgress(`Throttle: ${opts.throttle}ms between requests`) + await trackProgress( + `Rate limits: Recurly ${opts.recurlyRateLimit}/s, Stripe ${opts.stripeRateLimit}/s` + ) + await trackProgress(`Concurrency: ${opts.concurrency}`) const inputStream = opts.inputFile ? fs.createReadStream(opts.inputFile) @@ -82,62 +111,70 @@ async function main(trackProgress) { let successCount = 0 let errorCount = 0 - let lastLoopTimestamp = 0 - for await (const input of csvReader) { - const timeSinceLastLoop = Date.now() - lastLoopTimestamp - if (timeSinceLastLoop < opts.throttle) { - await setTimeout(opts.throttle - timeSinceLastLoop) - } - lastLoopTimestamp = Date.now() + const queue = new PQueue({ concurrency: opts.concurrency }) + const maxQueueSize = opts.concurrency - processedCount++ + try { + for await (const input of csvReader) { + // throttle input if queue is full + if (queue.size >= maxQueueSize) { + await queue.onSizeLessThan(maxQueueSize) + } - try { - const result = await processRollback(input, opts.commit) + queue.add(async () => { + processedCount++ - csvWriter.write({ - recurly_account_code: input.recurly_account_code, - target_stripe_account: input.target_stripe_account, - stripe_customer_id: input.stripe_customer_id, - status: result.status, - note: result.note, + try { + const result = await processRollback(input, opts.commit) + + csvWriter.write({ + recurly_account_code: input.recurly_account_code, + target_stripe_account: input.target_stripe_account, + stripe_customer_id: input.stripe_customer_id, + status: result.status, + note: result.note, + }) + + if ( + result.status === 'rolled-back' || + result.status === 'validated' || + result.status === 'already-recurly' + ) { + successCount++ + } else { + errorCount++ + } + + if (processedCount % 25 === 0) { + await trackProgress( + `Progress: ${processedCount} processed, ${successCount} successful, ${errorCount} errors` + ) + } + } catch (err) { + errorCount++ + if (err instanceof ReportError) { + csvWriter.write({ + recurly_account_code: input.recurly_account_code, + target_stripe_account: input.target_stripe_account, + stripe_customer_id: input.stripe_customer_id, + status: err.status, + note: err.message, + }) + } else { + csvWriter.write({ + recurly_account_code: input.recurly_account_code, + target_stripe_account: input.target_stripe_account, + stripe_customer_id: input.stripe_customer_id, + status: 'error', + note: err.message, + }) + } + } }) - - if ( - result.status === 'rolled-back' || - result.status === 'validated' || - result.status === 'already-recurly' - ) { - successCount++ - } else { - errorCount++ - } - - if (processedCount % 25 === 0) { - await trackProgress( - `Progress: ${processedCount} processed, ${successCount} successful, ${errorCount} errors` - ) - } - } catch (err) { - errorCount++ - if (err instanceof ReportError) { - csvWriter.write({ - recurly_account_code: input.recurly_account_code, - target_stripe_account: input.target_stripe_account, - stripe_customer_id: input.stripe_customer_id, - status: err.status, - note: err.message, - }) - } else { - csvWriter.write({ - recurly_account_code: input.recurly_account_code, - target_stripe_account: input.target_stripe_account, - stripe_customer_id: input.stripe_customer_id, - status: 'error', - note: err.message, - }) - } } + } finally { + // wait for all queued tasks to complete + await queue.onIdle() } await trackProgress(`✅ Total processed: ${processedCount}`) @@ -229,8 +266,15 @@ async function processRollback(input, commit) { // 4. Find Recurly subscription ID from Stripe metadata let recurlySubscriptionId try { - const stripeSubData = - await stripeClient.stripe.subscriptions.retrieve(stripeSubscriptionId) + const stripeSubData = await rateLimiters.requestWithRetries( + stripeClient.serviceName, + () => stripeClient.stripe.subscriptions.retrieve(stripeSubscriptionId), + { + operation: 'subscriptions.retrieve', + stripeSubscriptionId, + region: stripeClient.serviceName, + } + ) recurlySubscriptionId = stripeSubData.metadata?.recurly_subscription_id if (!recurlySubscriptionId) { throw new ReportError( @@ -249,9 +293,13 @@ async function processRollback(input, commit) { // 5. Fetch Recurly subscription to get original billing date let recurlySubscription try { - recurlySubscription = await RecurlyWrapper.promises.getSubscription( - recurlySubscriptionId, - {} + recurlySubscription = await rateLimiters.requestWithRetries( + 'recurly', + () => RecurlyWrapper.promises.getSubscription(recurlySubscriptionId, {}), + { + operation: 'getSubscription', + recurlySubscriptionId, + } ) } catch (err) { throw new ReportError( @@ -316,11 +364,19 @@ async function performRollback( if (targetBillingDateIsInFuture) { try { - await RecurlyWrapper.promises.apiRequest({ - url: `subscriptions/${recurlySubscriptionId}/postpone`, - qs: { bulk: true, next_bill_date: nextBillingDate }, - method: 'PUT', - }) + await rateLimiters.requestWithRetries( + 'recurly', + () => + RecurlyWrapper.promises.apiRequest({ + url: `subscriptions/${recurlySubscriptionId}/postpone`, + qs: { bulk: true, next_bill_date: nextBillingDate }, + method: 'PUT', + }), + { + operation: 'postpone', + recurlySubscriptionId, + } + ) } catch (err) { throw new ReportError( 'rolled-back-recurly-restore-failed', @@ -337,9 +393,18 @@ async function performRollback( // Step 4: Restore migration metadata to Stripe try { - await stripeClient.updateSubscriptionMetadata(stripeSubscriptionId, { - recurly_to_stripe_migration_status: 'in_progress', - }) + await rateLimiters.requestWithRetries( + stripeClient.serviceName, + () => + stripeClient.updateSubscriptionMetadata(stripeSubscriptionId, { + recurly_to_stripe_migration_status: 'in_progress', + }), + { + operation: 'updateSubscriptionMetadata', + stripeSubscriptionId, + region: stripeClient.serviceName, + } + ) } catch (err) { throw new ReportError( 'rolled-back-metadata-restore-failed', @@ -381,12 +446,26 @@ async function performRollback( function parseArgs() { const args = minimist(process.argv.slice(2), { - string: ['output'], - number: ['throttle'], + string: [ + 'output', + 'concurrency', + 'recurly-rate-limit', + 'recurly-api-retries', + 'recurly-retry-delay-ms', + 'stripe-rate-limit', + 'stripe-api-retries', + 'stripe-retry-delay-ms', + ], boolean: ['commit', 'help'], default: { commit: false, - throttle: DEFAULT_THROTTLE, + concurrency: 10, + 'recurly-rate-limit': DEFAULT_RECURLY_RATE_LIMIT, + 'recurly-api-retries': DEFAULT_RECURLY_API_RETRIES, + 'recurly-retry-delay-ms': DEFAULT_RECURLY_RETRY_DELAY_MS, + 'stripe-rate-limit': DEFAULT_STRIPE_RATE_LIMIT, + 'stripe-api-retries': DEFAULT_STRIPE_API_RETRIES, + 'stripe-retry-delay-ms': DEFAULT_STRIPE_RETRY_DELAY_MS, }, }) @@ -399,7 +478,13 @@ function parseArgs() { const paramsSchema = z.object({ output: z.string().optional(), commit: z.boolean(), - throttle: z.number().int().positive(), + concurrency: z.number().int().positive(), + recurlyRateLimit: z.number().positive(), + recurlyApiRetries: z.number().int().nonnegative(), + recurlyRetryDelayMs: z.number().int().nonnegative(), + stripeRateLimit: z.number().positive(), + stripeApiRetries: z.number().int().nonnegative(), + stripeRetryDelayMs: z.number().int().nonnegative(), inputFile: z.string().optional(), }) @@ -407,7 +492,13 @@ function parseArgs() { return paramsSchema.parse({ output: args.output, commit: args.commit, - throttle: args.throttle, + concurrency: Number(args.concurrency), + recurlyRateLimit: Number(args['recurly-rate-limit']), + recurlyApiRetries: Number(args['recurly-api-retries']), + recurlyRetryDelayMs: Number(args['recurly-retry-delay-ms']), + stripeRateLimit: Number(args['stripe-rate-limit']), + stripeApiRetries: Number(args['stripe-api-retries']), + stripeRetryDelayMs: Number(args['stripe-retry-delay-ms']), inputFile, }) } catch (err) {