diff --git a/services/web/scripts/recurly/cleanup-recurly-subscriptions-post-migration.mjs b/services/web/scripts/recurly/cleanup-recurly-subscriptions-post-migration.mjs index 64280d4bc2..ccc4885c60 100755 --- a/services/web/scripts/recurly/cleanup-recurly-subscriptions-post-migration.mjs +++ b/services/web/scripts/recurly/cleanup-recurly-subscriptions-post-migration.mjs @@ -15,10 +15,13 @@ * node scripts/recurly/cleanup-recurly-subscriptions-post-migration.mjs [OPTS] [INPUT-FILE] * * Options: - * --output PATH Output file path (default: /tmp/cancel_output_.csv) - * --commit Apply changes (without this, runs in dry-run mode) - * --throttle DURATION Minimum time between requests in ms (default: 100) - * --help Show help message + * --output PATH Output file path (default: /tmp/cancel_output_.csv) + * --commit Apply changes (without this, runs in dry-run mode) + * --concurrency N Number of customers to process concurrently (default: 10) + * --recurly-rate-limit N Requests per second for Recurly (default: 10) + * --recurly-api-retries N Number of retries on Recurly 429s (default: 5) + * --recurly-retry-delay-ms N Delay between Recurly retries in ms (default: 1000) + * --help Show help message * * CSV Input Format: * recurly_account_code,previous_recurly_subscription_id @@ -32,25 +35,37 @@ import fs from 'node:fs' import path from 'node:path' -import { setTimeout } from 'node:timers/promises' import * as csv from 'csv' import minimist from 'minimist' +import PQueue from 'p-queue' import RecurlyClient from '../../app/src/Features/Subscription/RecurlyClient.mjs' import { z } from '../../app/src/infrastructure/Validation.mjs' import { scriptRunner } from '../lib/ScriptRunner.mjs' import { Subscription } from '../../app/src/models/Subscription.mjs' import { ReportError } from '../stripe/helpers.mjs' +import { + createRateLimitedApiWrappers, + DEFAULT_RECURLY_RATE_LIMIT, + DEFAULT_RECURLY_API_RETRIES, + DEFAULT_RECURLY_RETRY_DELAY_MS, +} from '../stripe/RateLimiter.mjs' -const DEFAULT_THROTTLE = 100 +const DEFAULT_CONCURRENCY = 10 + +// rate limiters - initialized in main() +let rateLimiters function usage() { console.error(`Usage: node scripts/recurly/cleanup-recurly-subscriptions-post-migration.mjs [OPTS] [INPUT-FILE] Options: - --output PATH Output file path (default: /tmp/terminate_output_.csv) - --commit Apply changes (without this, runs in dry-run mode) - --throttle DURATION Minimum time between requests in ms (default: ${DEFAULT_THROTTLE}) - --help Show this help message + --output PATH Output file path (default: /tmp/terminate_output_.csv) + --commit Apply changes (without this, runs in dry-run mode) + --concurrency N Number of customers to process concurrently (default: ${DEFAULT_CONCURRENCY}) + --recurly-rate-limit N Requests per second for Recurly (default: ${DEFAULT_RECURLY_RATE_LIMIT}) + --recurly-api-retries N Number of retries on Recurly 429s (default: ${DEFAULT_RECURLY_API_RETRIES}) + --recurly-retry-delay-ms N Delay between Recurly retries in ms (default: ${DEFAULT_RECURLY_RETRY_DELAY_MS}) + --help Show this help message `) } @@ -59,9 +74,17 @@ async function main(trackProgress) { const timestamp = new Date().toISOString().replace(/[:.]/g, '-') const outputFile = opts.output ?? `/tmp/terminate_output_${timestamp}.csv` + // initialize rate limiters + rateLimiters = createRateLimitedApiWrappers({ + recurlyRateLimit: opts.recurlyRateLimit, + recurlyApiRetries: opts.recurlyApiRetries, + recurlyRetryDelayMs: opts.recurlyRetryDelayMs, + }) + await trackProgress('Starting Recurly subscription termination') await trackProgress(`Run mode: ${opts.commit ? 'COMMIT' : 'DRY RUN'}`) - await trackProgress(`Throttle: ${opts.throttle}ms between requests`) + await trackProgress(`Rate limit: Recurly ${opts.recurlyRateLimit}/s`) + await trackProgress(`Concurrency: ${opts.concurrency}`) const inputStream = opts.inputFile ? fs.createReadStream(opts.inputFile) @@ -75,58 +98,64 @@ async function main(trackProgress) { let successCount = 0 let errorCount = 0 - let lastLoopTimestamp = 0 - for await (const input of csvReader) { - const timeSinceLastLoop = Date.now() - lastLoopTimestamp - if (timeSinceLastLoop < opts.throttle) { - await setTimeout(opts.throttle - timeSinceLastLoop) - } - lastLoopTimestamp = Date.now() + const queue = new PQueue({ concurrency: opts.concurrency }) + const maxQueueSize = opts.concurrency - processedCount++ + try { + for await (const input of csvReader) { + if (queue.size >= maxQueueSize) { + await queue.onSizeLessThan(maxQueueSize) + } - try { - const result = await processTermination(input, opts.commit) + queue.add(async () => { + processedCount++ - csvWriter.write({ - recurly_account_code: input.recurly_account_code, - status: result.status, - note: result.note, - previous_recurly_subscription_id: - input.previous_recurly_subscription_id, + try { + const result = await processTermination(input, opts.commit) + + csvWriter.write({ + recurly_account_code: input.recurly_account_code, + status: result.status, + note: result.note, + previous_recurly_subscription_id: + input.previous_recurly_subscription_id, + }) + + if (result.status === 'terminated' || result.status === 'validated') { + successCount++ + } else { + errorCount++ + } + + if (processedCount % 25 === 0) { + await trackProgress( + `Progress: ${processedCount} processed, ${successCount} successful, ${errorCount} errors` + ) + } + } catch (err) { + errorCount++ + if (err instanceof ReportError) { + csvWriter.write({ + recurly_account_code: input.recurly_account_code, + previous_recurly_subscription_id: + input.previous_recurly_subscription_id, + status: err.status, + note: err.message, + }) + } else { + csvWriter.write({ + recurly_account_code: input.recurly_account_code, + previous_recurly_subscription_id: + input.previous_recurly_subscription_id, + status: 'error', + note: err.message, + }) + } + } }) - - if (result.status === 'terminated' || result.status === 'validated') { - successCount++ - } else { - errorCount++ - } - - if (processedCount % 25 === 0) { - await trackProgress( - `Progress: ${processedCount} processed, ${successCount} successful, ${errorCount} errors` - ) - } - } catch (err) { - errorCount++ - if (err instanceof ReportError) { - csvWriter.write({ - recurly_account_code: input.recurly_account_code, - previous_recurly_subscription_id: - input.previous_recurly_subscription_id, - status: err.status, - note: err.message, - }) - } else { - csvWriter.write({ - recurly_account_code: input.recurly_account_code, - previous_recurly_subscription_id: - input.previous_recurly_subscription_id, - status: 'error', - note: err.message, - }) - } } + } finally { + await queue.onIdle() } await trackProgress(`✅ Total processed: ${processedCount}`) @@ -199,8 +228,11 @@ async function processTermination(input, commit) { let recurlySubscription let isInExpectedEndState = true try { - recurlySubscription = - await RecurlyClient.promises.getSubscription(subscriptionUuid) + recurlySubscription = await rateLimiters.requestWithRetries( + 'recurly', + () => RecurlyClient.promises.getSubscription(subscriptionUuid), + { operation: 'getSubscription', subscriptionUuid } + ) } catch (err) { isInExpectedEndState = false } @@ -228,7 +260,12 @@ async function processTermination(input, commit) { // 4. If commit mode, terminate the subscription if (commit) { try { - await RecurlyClient.promises.terminateSubscriptionByUuid(subscriptionUuid) + await rateLimiters.requestWithRetries( + 'recurly', + () => + RecurlyClient.promises.terminateSubscriptionByUuid(subscriptionUuid), + { operation: 'terminateSubscriptionByUuid', subscriptionUuid } + ) return { status: isInExpectedEndState ? 'terminated' @@ -255,10 +292,21 @@ async function processTermination(input, commit) { function parseArgs() { const args = minimist(process.argv.slice(2), { - string: ['output'], - number: ['throttle'], + string: [ + 'output', + 'concurrency', + 'recurly-rate-limit', + 'recurly-api-retries', + 'recurly-retry-delay-ms', + ], boolean: ['commit', 'help'], - default: { commit: false, throttle: DEFAULT_THROTTLE }, + default: { + commit: false, + concurrency: DEFAULT_CONCURRENCY, + 'recurly-rate-limit': DEFAULT_RECURLY_RATE_LIMIT, + 'recurly-api-retries': DEFAULT_RECURLY_API_RETRIES, + 'recurly-retry-delay-ms': DEFAULT_RECURLY_RETRY_DELAY_MS, + }, }) if (args.help) { @@ -270,7 +318,10 @@ function parseArgs() { const paramsSchema = z.object({ output: z.string().optional(), commit: z.boolean(), - throttle: z.number().int().positive(), + concurrency: z.number().int().positive(), + recurlyRateLimit: z.number().positive(), + recurlyApiRetries: z.number().int().nonnegative(), + recurlyRetryDelayMs: z.number().int().nonnegative(), inputFile: z.string().optional(), }) @@ -278,7 +329,10 @@ function parseArgs() { return paramsSchema.parse({ output: args.output, commit: args.commit, - throttle: args.throttle, + concurrency: Number(args.concurrency), + recurlyRateLimit: Number(args['recurly-rate-limit']), + recurlyApiRetries: Number(args['recurly-api-retries']), + recurlyRetryDelayMs: Number(args['recurly-retry-delay-ms']), inputFile, }) } catch (err) { diff --git a/services/web/scripts/stripe/bulk-cancel-subscriptions.mjs b/services/web/scripts/stripe/bulk-cancel-subscriptions.mjs index b25fd424f8..99052d9dfa 100755 --- a/services/web/scripts/stripe/bulk-cancel-subscriptions.mjs +++ b/services/web/scripts/stripe/bulk-cancel-subscriptions.mjs @@ -10,11 +10,14 @@ * node scripts/stripe/bulk-cancel-subscriptions.mjs [OPTS] [INPUT-FILE] * * Options: - * --output PATH Output file path (default: /tmp/bulk_cancel_output_.csv) - * Use '-' to write to stdout - * --commit Apply changes (without this flag, runs in dry-run mode) - * --throttle DURATION Minimum time (in ms) between subscriptions processed (default: 100) - * --help Show a help message + * --output PATH Output file path (default: /tmp/bulk_cancel_output_.csv) + * Use '-' to write to stdout + * --commit Apply changes (without this flag, runs in dry-run mode) + * --concurrency N Number of customers to process concurrently (default: 10) + * --stripe-rate-limit N Requests per second for Stripe (default: 50) + * --stripe-api-retries N Number of retries on Stripe 429s (default: 5) + * --stripe-retry-delay-ms N Delay between Stripe retries in ms (default: 1000) + * --help Show a help message * * CSV Input Format: * The CSV must have the following columns: @@ -32,24 +35,37 @@ import fs from 'node:fs' import path from 'node:path' -import { setTimeout } from 'node:timers/promises' import * as csv from 'csv' import minimist from 'minimist' +import PQueue from 'p-queue' +import { z } from '../../app/src/infrastructure/Validation.mjs' import { scriptRunner } from '../lib/ScriptRunner.mjs' import { getRegionClient } from '../../modules/subscriptions/app/src/StripeClient.mjs' import { ReportError } from './helpers.mjs' +import { + createRateLimitedApiWrappers, + DEFAULT_STRIPE_RATE_LIMIT, + DEFAULT_STRIPE_API_RETRIES, + DEFAULT_STRIPE_RETRY_DELAY_MS, +} from './RateLimiter.mjs' -const DEFAULT_THROTTLE = 40 +const DEFAULT_CONCURRENCY = 10 + +// rate limiters - initialized in main() +let rateLimiters function usage() { console.error(`Usage: node scripts/stripe/bulk-cancel-subscriptions.mjs [OPTS] [INPUT-FILE] Options: - --output PATH Output file path (default: /tmp/bulk_cancel_output_.csv) - Use '-' to write to stdout - --commit Apply changes (without this, runs in dry-run mode) - --throttle DURATION Minimum time between requests in ms (default: ${DEFAULT_THROTTLE}) - --help Show this help message + --output PATH Output file path (default: /tmp/bulk_cancel_output_.csv) + Use '-' to write to stdout + --commit Apply changes (without this, runs in dry-run mode) + --concurrency N Number of customers to process concurrently (default: ${DEFAULT_CONCURRENCY}) + --stripe-rate-limit N Requests per second for Stripe (default: ${DEFAULT_STRIPE_RATE_LIMIT}) + --stripe-api-retries N Number of retries on Stripe 429s (default: ${DEFAULT_STRIPE_API_RETRIES}) + --stripe-retry-delay-ms N Delay between Stripe retries in ms (default: ${DEFAULT_STRIPE_RETRY_DELAY_MS}) + --help Show this help message `) } @@ -58,9 +74,17 @@ async function main(trackProgress) { const timestamp = new Date().toISOString().replace(/[:.]/g, '-') const outputFile = opts.output ?? `/tmp/bulk_cancel_output_${timestamp}.csv` + // initialize rate limiters + rateLimiters = createRateLimitedApiWrappers({ + stripeRateLimit: opts.stripeRateLimit, + stripeApiRetries: opts.stripeApiRetries, + stripeRetryDelayMs: opts.stripeRetryDelayMs, + }) + await trackProgress('Starting bulk subscription cancellation for Stripe') await trackProgress(`Run mode: ${opts.commit ? 'COMMIT' : 'DRY RUN'}`) - await trackProgress(`Throttle: ${opts.throttle}ms between requests`) + await trackProgress(`Rate limit: Stripe ${opts.stripeRateLimit}/s`) + await trackProgress(`Concurrency: ${opts.concurrency}`) const inputStream = opts.inputFile ? fs.createReadStream(opts.inputFile) @@ -74,62 +98,69 @@ async function main(trackProgress) { let successCount = 0 let errorCount = 0 - let lastLoopTimestamp = 0 - for await (const input of csvReader) { - const timeSinceLastLoop = Date.now() - lastLoopTimestamp - if (timeSinceLastLoop < opts.throttle) { - await setTimeout(opts.throttle - timeSinceLastLoop) - } - lastLoopTimestamp = Date.now() + const queue = new PQueue({ concurrency: opts.concurrency }) + const maxQueueSize = opts.concurrency - processedCount++ + try { + for await (const input of csvReader) { + if (queue.size >= maxQueueSize) { + await queue.onSizeLessThan(maxQueueSize) + } - try { - const result = await processCancellation(input, opts.commit) + queue.add(async () => { + processedCount++ - csvWriter.write({ - stripe_customer_id: input.stripe_customer_id, - target_stripe_account: input.target_stripe_account, - subscription_id: result.subscriptionId || '', - status: result.status, - note: - result.note || (opts.commit ? '' : 'dry run - no changes applied'), + try { + const result = await processCancellation(input, opts.commit) + + csvWriter.write({ + stripe_customer_id: input.stripe_customer_id, + target_stripe_account: input.target_stripe_account, + subscription_id: result.subscriptionId || '', + status: result.status, + note: + result.note || + (opts.commit ? '' : 'dry run - no changes applied'), + }) + + if (result.status === 'cancelled' || result.status === 'validated') { + successCount++ + } else { + errorCount++ + } + + if (processedCount % 10 === 0) { + await trackProgress( + `Processed ${processedCount} customers (${successCount} ${opts.commit ? 'cancelled' : 'validated'}, ${errorCount} errors)` + ) + } + } catch (err) { + errorCount++ + if (err instanceof ReportError) { + csvWriter.write({ + stripe_customer_id: input.stripe_customer_id, + target_stripe_account: input.target_stripe_account, + subscription_id: '', + status: err.status, + note: err.message, + }) + } else { + csvWriter.write({ + stripe_customer_id: input.stripe_customer_id, + target_stripe_account: input.target_stripe_account, + subscription_id: '', + status: 'error', + note: err.message, + }) + await trackProgress( + `Error processing ${input.stripe_customer_id}: ${err.message}` + ) + } + } }) - - if (result.status === 'cancelled' || result.status === 'validated') { - successCount++ - } else { - errorCount++ - } - - if (processedCount % 10 === 0) { - await trackProgress( - `Processed ${processedCount} customers (${successCount} ${opts.commit ? 'cancelled' : 'validated'}, ${errorCount} errors)` - ) - } - } catch (err) { - errorCount++ - if (err instanceof ReportError) { - csvWriter.write({ - stripe_customer_id: input.stripe_customer_id, - target_stripe_account: input.target_stripe_account, - subscription_id: '', - status: err.status, - note: err.message, - }) - } else { - csvWriter.write({ - stripe_customer_id: input.stripe_customer_id, - target_stripe_account: input.target_stripe_account, - subscription_id: '', - status: 'error', - note: err.message, - }) - await trackProgress( - `Error processing ${input.stripe_customer_id}: ${err.message}` - ) - } } + } finally { + await queue.onIdle() } await trackProgress(`✅ Total processed: ${processedCount}`) @@ -147,10 +178,20 @@ async function main(trackProgress) { function parseArgs() { const args = minimist(process.argv.slice(2), { - string: ['output', 'throttle'], + string: [ + 'output', + 'concurrency', + 'stripe-rate-limit', + 'stripe-api-retries', + 'stripe-retry-delay-ms', + ], boolean: ['commit', 'help'], default: { - throttle: DEFAULT_THROTTLE.toString(), + commit: false, + concurrency: DEFAULT_CONCURRENCY, + 'stripe-rate-limit': DEFAULT_STRIPE_RATE_LIMIT, + 'stripe-api-retries': DEFAULT_STRIPE_API_RETRIES, + 'stripe-retry-delay-ms': DEFAULT_STRIPE_RETRY_DELAY_MS, }, unknown: arg => { if (arg.startsWith('-')) { @@ -167,19 +208,32 @@ function parseArgs() { process.exit(0) } - const throttle = parseInt(args.throttle, 10) - if (isNaN(throttle) || throttle < 0) { - console.error('Error: --throttle must be a non-negative integer') + const inputFile = args._[0] + const paramsSchema = z.object({ + output: z.string().optional(), + commit: z.boolean(), + concurrency: z.number().int().positive(), + stripeRateLimit: z.number().positive(), + stripeApiRetries: z.number().int().nonnegative(), + stripeRetryDelayMs: z.number().int().nonnegative(), + inputFile: z.string().optional(), + }) + + try { + return paramsSchema.parse({ + output: args.output, + commit: args.commit, + concurrency: Number(args.concurrency), + stripeRateLimit: Number(args['stripe-rate-limit']), + stripeApiRetries: Number(args['stripe-api-retries']), + stripeRetryDelayMs: Number(args['stripe-retry-delay-ms']), + inputFile, + }) + } catch (err) { + console.error('Invalid arguments:', err.message) usage() process.exit(1) } - - return { - output: args.output, - commit: args.commit, - throttle, - inputFile: args._[0], - } } function getCsvReader(inputStream) { @@ -244,7 +298,15 @@ async function processCancellation(input, commit) { // fetch customer with subscriptions let customer try { - customer = await stripeClient.getCustomerById(customerId, ['subscriptions']) + customer = await rateLimiters.requestWithRetries( + stripeClient.serviceName, + () => stripeClient.getCustomerById(customerId, ['subscriptions']), + { + operation: 'getCustomerById', + customerId, + region: stripeClient.serviceName, + } + ) } catch (err) { throw new ReportError( 'customer-not-found', @@ -279,7 +341,15 @@ async function processCancellation(input, commit) { // cancel the subscription immediately try { - await stripeClient.terminateSubscription(migrationSubscription.id) + await rateLimiters.requestWithRetries( + stripeClient.serviceName, + () => stripeClient.terminateSubscription(migrationSubscription.id), + { + operation: 'terminateSubscription', + subscriptionId: migrationSubscription.id, + region: stripeClient.serviceName, + } + ) return { status: 'cancelled',