From 60bb53bbfa2cd0ecfc96ffbee267a8f4823adb8e Mon Sep 17 00:00:00 2001 From: Simon Gardner Date: Mon, 26 Jan 2026 13:52:52 +0000 Subject: [PATCH] if stripe fields present, warn and log both records. GitOrigin-RevId: 4ae0a6859b01f25bce8391a35b6789ea73ae344c --- .../migrate_recurly_customers_to_stripe.mjs | 477 +++++++++++------- 1 file changed, 295 insertions(+), 182 deletions(-) diff --git a/services/web/scripts/recurly/migrate_recurly_customers_to_stripe.mjs b/services/web/scripts/recurly/migrate_recurly_customers_to_stripe.mjs index c0e5fb6dac..a4a5160188 100644 --- a/services/web/scripts/recurly/migrate_recurly_customers_to_stripe.mjs +++ b/services/web/scripts/recurly/migrate_recurly_customers_to_stripe.mjs @@ -38,6 +38,10 @@ * _stripe.json (dry-run only): Stripe customer update params * Format: Array of { recurly_account_code, target_stripe_account, stripe_customer_id, updateParams } * + * _stripe_existing_fields.json: Stripe customers that already had name/address/business_name set + * (written in both dry-run and commit modes) + * Format: Array of { recurly_account_code, stripe_account, stripe_customer_id, recurly: {...}, stripe: {...} } + * * Resume behavior: * - Records in the success file are SKIPPED (already done) * - Records in the errors file are RE-PROCESSED (will be retried) @@ -407,6 +411,40 @@ function getStripeJsonPath(successPath) { return successPath.replace(/\.csv$/, '_stripe.json') } +/** + * Get the stripe_existing_fields.json file path from the success file path. + */ +function getStripeExistingFieldsJsonPath(successPath) { + return successPath.replace(/\.csv$/, '_stripe_existing_fields.json') +} + +/** + * Stream a JSON array to disk without holding it all in memory. + */ +function createJsonArrayWriter(jsonPath) { + const stream = fs.createWriteStream(jsonPath, { flags: 'w' }) + stream.write('[\n') + let wroteAny = false + + function write(value) { + const serialized = JSON.stringify(value, null, 2) + if (wroteAny) stream.write(',\n') + stream.write(serialized) + wroteAny = true + } + + async function close() { + stream.write('\n]\n') + stream.end() + await new Promise((resolve, reject) => { + stream.on('finish', resolve) + stream.on('error', reject) + }) + } + + return { write, close } +} + // ============================================================================= // RATE LIMITING // ============================================================================= @@ -684,7 +722,12 @@ function extractCompanyName(account, billingInfo) { * @param {boolean} commit - Whether to actually update the customer * @returns {Promise} - Result row for output CSV */ -async function processCustomer(row, rowNumber, commit) { +async function processCustomer( + row, + rowNumber, + commit, + { writeStripeExistingFields } = {} +) { const { recurly_account_code: recurlyAccountCode, target_stripe_account: targetStripeAccount, @@ -951,6 +994,43 @@ async function processCustomer(row, rowNumber, commit) { : {}), } + // If Stripe already has any of the migrated fields set, warn and capture both + // the desired (Recurly-derived) and existing (Stripe) values in a JSON file. + const presentFields = [] + if (existingCustomer?.name) presentFields.push('name') + if (existingCustomer?.business_name) presentFields.push('business_name') + if ( + existingCustomer?.address && + Object.values(existingCustomer.address).some(v => v) + ) { + presentFields.push('address') + } + + if (presentFields.length > 0) { + logWarn('Stripe customer already has name/address/business_name set', { + ...context, + fields: presentFields, + }) + + if (writeStripeExistingFields) { + writeStripeExistingFields({ + recurly_account_code: recurlyAccountCode, + stripe_account: targetStripeAccount, + stripe_customer_id: stripeCustomerId, + recurly: { + name: customerParams.name, + address: customerParams.address, + business_name: customerParams.business_name, + }, + stripe: { + name: existingCustomer.name, + address: existingCustomer.address, + business_name: existingCustomer.business_name, + }, + }) + } + } + logDebug( 'Transformed customer params', { @@ -1076,6 +1156,13 @@ function usage() { ' STRIPE JSON (_stripe.json): Dry-run only - customer params that would be used for update' ) console.error('') + console.error( + ' STRIPE EXISTING FIELDS (_stripe_existing_fields.json): Customers where Stripe already had name/address/business_name set' + ) + console.error( + ' Written in both dry-run and commit modes (for auditing before overwriting fields)' + ) + console.error('') console.error('Resume behavior:') console.error(' - Records in SUCCESS file are SKIPPED (already done)') console.error( @@ -1120,6 +1207,8 @@ async function main(trackProgress) { const errorsOutputPath = getErrorsPath(successOutputPath) const skippedOutputPath = getSkippedPath(successOutputPath) const stripeJsonPath = getStripeJsonPath(successOutputPath) + const stripeExistingFieldsJsonPath = + getStripeExistingFieldsJsonPath(successOutputPath) const mode = commit ? 'COMMIT MODE' : 'DRY RUN MODE' logDebug(`Starting migration in ${mode}`, { @@ -1128,6 +1217,7 @@ async function main(trackProgress) { errorsOutputPath, skippedOutputPath, ...(commit ? {} : { stripeJsonPath }), + stripeExistingFieldsJsonPath, }) await trackProgress(`Starting migration in ${mode}`) @@ -1183,219 +1273,242 @@ async function main(trackProgress) { // For dry-run mode, collect Stripe customer params to write to JSON const stripeCustomerParams = [] - // Statistics - let totalInInput = 0 - let processedThisRun = 0 - let skippedPreviouslyProcessed = 0 - let updatedCount = 0 - let skippedNoStripeIdCount = 0 - let errorCount = 0 - let dryRunCount = 0 + // Records where Stripe already had name/address/business_name set + const stripeExistingFieldsWriter = createJsonArrayWriter( + stripeExistingFieldsJsonPath + ) - // Track errors for final summary (just the account codes, not full results - memory efficient) - const errorAccountCodes = [] + try { + // Statistics + let totalInInput = 0 + let processedThisRun = 0 + let skippedPreviouslyProcessed = 0 + let updatedCount = 0 + let skippedNoStripeIdCount = 0 + let errorCount = 0 + let dryRunCount = 0 - logDebug('Beginning to process input file', { inputPath }) + // Track errors for final summary (just the account codes, not full results - memory efficient) + const errorAccountCodes = [] - // Process input CSV - true streaming (no collecting results in memory) - const inputStream = fs.createReadStream(inputPath) - const parser = csv.parse({ columns: true, trim: true }) + logDebug('Beginning to process input file', { inputPath }) - inputStream.pipe(parser) + // Process input CSV - true streaming (no collecting results in memory) + const inputStream = fs.createReadStream(inputPath) + const parser = csv.parse({ columns: true, trim: true }) - let rowNumber = 0 - for await (const row of parser) { - rowNumber++ - totalInInput++ + inputStream.pipe(parser) - const accountCode = row.recurly_account_code + let rowNumber = 0 + for await (const row of parser) { + rowNumber++ + totalInInput++ - // Check if already successfully processed in a previous run - if (previouslyProcessed.has(accountCode)) { - skippedPreviouslyProcessed++ - logDebug( - 'Skipping previously successful record', - { - rowNumber, - accountCode, - }, - { verboseOnly: true } - ) - continue - } + const accountCode = row.recurly_account_code - // Process this customer - const result = await processCustomer(row, rowNumber, commit) + // Check if already successfully processed in a previous run + if (previouslyProcessed.has(accountCode)) { + skippedPreviouslyProcessed++ + logDebug( + 'Skipping previously successful record', + { + rowNumber, + accountCode, + }, + { verboseOnly: true } + ) + continue + } - processedThisRun++ + // Process this customer + const result = await processCustomer(row, rowNumber, commit, { + writeStripeExistingFields: stripeExistingFieldsWriter.write, + }) - // Write to appropriate output file based on outcome - if (result.outcome === 'error') { - writeError(result) - errorCount++ - errorAccountCodes.push(accountCode) - } else if (result.outcome === 'skipped_no_stripe_id') { - writeSkipped(result) - skippedNoStripeIdCount++ - } else { - writeSuccess(result) - // Update statistics and collect dry-run data - if (result.outcome === 'updated') { - updatedCount++ - } else if (result.outcome === 'dry_run') { - dryRunCount++ - // Collect customer params for stripe.json output - if (result.customerParams) { - stripeCustomerParams.push({ - recurly_account_code: result.recurly_account_code, - target_stripe_account: result.target_stripe_account, - customerParams: result.customerParams, - }) + processedThisRun++ + + // Write to appropriate output file based on outcome + if (result.outcome === 'error') { + writeError(result) + errorCount++ + errorAccountCodes.push(accountCode) + } else if (result.outcome === 'skipped_no_stripe_id') { + writeSkipped(result) + skippedNoStripeIdCount++ + } else { + writeSuccess(result) + // Update statistics and collect dry-run data + if (result.outcome === 'updated') { + updatedCount++ + } else if (result.outcome === 'dry_run') { + dryRunCount++ + // Collect customer params for stripe.json output + if (result.customerParams) { + stripeCustomerParams.push({ + recurly_account_code: result.recurly_account_code, + target_stripe_account: result.target_stripe_account, + customerParams: result.customerParams, + }) + } } } - } - // Progress update every 1000 customers (or 100 in debug mode) - const progressInterval = DEBUG_MODE ? 100 : 1000 - if (processedThisRun % progressInterval === 0) { - const rateLimiterStats = getRateLimiterStats() - const progress = { - rowNumber, - processedThisRun, - updated: updatedCount, - dryRun: dryRunCount, - skippedNoStripeId: skippedNoStripeIdCount, - errors: errorCount, - skippedPrevious: skippedPreviouslyProcessed, - recurlyRate: rateLimiterStats.recurly.currentRate, - stripeRate: rateLimiterStats.stripe.currentRate, + // Progress update every 1000 customers (or 100 in debug mode) + const progressInterval = DEBUG_MODE ? 100 : 1000 + if (processedThisRun % progressInterval === 0) { + const rateLimiterStats = getRateLimiterStats() + const progress = { + rowNumber, + processedThisRun, + updated: updatedCount, + dryRun: dryRunCount, + skippedNoStripeId: skippedNoStripeIdCount, + errors: errorCount, + skippedPrevious: skippedPreviouslyProcessed, + recurlyRate: rateLimiterStats.recurly.currentRate, + stripeRate: rateLimiterStats.stripe.currentRate, + } + logDebug('Progress update', progress) + await trackProgress( + `Progress: row ${rowNumber}, ${processedThisRun} processed this run, ${errorCount} errors` + ) } - logDebug('Progress update', progress) - await trackProgress( - `Progress: row ${rowNumber}, ${processedThisRun} processed this run, ${errorCount} errors` + } + + // Write stripe.json file in dry-run mode + if (!commit && stripeCustomerParams.length > 0) { + await fs.promises.writeFile( + stripeJsonPath, + JSON.stringify(stripeCustomerParams, null, 2) + ) + logDebug( + `Wrote ${stripeCustomerParams.length} customer params to ${stripeJsonPath}` ) } - } - // Close output streams - await closeOutputs() + // Final summary + const totalSuccessful = commit + ? previouslyProcessed.size + updatedCount + : previouslyProcessed.size + const finalRateLimiterStats = getRateLimiterStats() - // Write stripe.json file in dry-run mode - if (!commit && stripeCustomerParams.length > 0) { - await fs.promises.writeFile( - stripeJsonPath, - JSON.stringify(stripeCustomerParams, null, 2) - ) + logDebug('=== FINAL SUMMARY ===') + logDebug(`Input file total rows: ${totalInInput}`) + logDebug(`Previously successful (skipped): ${skippedPreviouslyProcessed}`) + logDebug(`Processed this run: ${processedThisRun}`) logDebug( - `Wrote ${stripeCustomerParams.length} customer params to ${stripeJsonPath}` + ` - ${commit ? 'Updated' : 'Would update'}: ${commit ? updatedCount : dryRunCount}` + ) + logDebug(` - Skipped (no stripe_customer_id): ${skippedNoStripeIdCount}`) + logDebug(` - Errors: ${errorCount}`) + if (commit) { + logDebug(`Total in success file: ${totalSuccessful}`) + } + logDebug(`Total in skipped file: ${skippedNoStripeIdCount}`) + logDebug(`Total in errors file: ${errorCount}`) + logDebug( + `API calls - Recurly: ${finalRateLimiterStats.recurly.totalRequests}, Stripe: ${finalRateLimiterStats.stripe.totalRequests}` ) - } - // Final summary - const totalSuccessful = commit - ? previouslyProcessed.size + updatedCount - : previouslyProcessed.size - const finalRateLimiterStats = getRateLimiterStats() - - logDebug('=== FINAL SUMMARY ===') - logDebug(`Input file total rows: ${totalInInput}`) - logDebug(`Previously successful (skipped): ${skippedPreviouslyProcessed}`) - logDebug(`Processed this run: ${processedThisRun}`) - logDebug( - ` - ${commit ? 'Updated' : 'Would update'}: ${commit ? updatedCount : dryRunCount}` - ) - logDebug(` - Skipped (no stripe_customer_id): ${skippedNoStripeIdCount}`) - logDebug(` - Errors: ${errorCount}`) - if (commit) { - logDebug(`Total in success file: ${totalSuccessful}`) - } - logDebug(`Total in skipped file: ${skippedNoStripeIdCount}`) - logDebug(`Total in errors file: ${errorCount}`) - logDebug( - `API calls - Recurly: ${finalRateLimiterStats.recurly.totalRequests}, Stripe: ${finalRateLimiterStats.stripe.totalRequests}` - ) - - await trackProgress('=== FINAL SUMMARY ===') - await trackProgress(`Input file total rows: ${totalInInput}`) - await trackProgress( - `Previously successful (skipped): ${skippedPreviouslyProcessed}` - ) - await trackProgress(`Processed this run: ${processedThisRun}`) - await trackProgress( - ` - ${commit ? 'Updated' : 'Would update'}: ${commit ? updatedCount : dryRunCount}` - ) - await trackProgress( - ` - Skipped (no stripe_customer_id): ${skippedNoStripeIdCount}` - ) - await trackProgress(` - Errors: ${errorCount}`) - await trackProgress('') - if (commit) { + await trackProgress('=== FINAL SUMMARY ===') + await trackProgress(`Input file total rows: ${totalInInput}`) await trackProgress( - `Success file: ${successOutputPath} (${totalSuccessful} records)` + `Previously successful (skipped): ${skippedPreviouslyProcessed}` ) - } else { + await trackProgress(`Processed this run: ${processedThisRun}`) await trackProgress( - `Success file: ${successOutputPath} (not modified in dry-run mode)` + ` - ${commit ? 'Updated' : 'Would update'}: ${commit ? updatedCount : dryRunCount}` ) - } - await trackProgress( - `Skipped file: ${skippedOutputPath} (${skippedNoStripeIdCount} records)` - ) - await trackProgress( - `Errors file: ${errorsOutputPath} (${errorCount} records)` - ) - - if (!commit && dryRunCount > 0) { + await trackProgress( + ` - Skipped (no stripe_customer_id): ${skippedNoStripeIdCount}` + ) + await trackProgress(` - Errors: ${errorCount}`) await trackProgress('') - await trackProgress( - `Stripe params file: ${stripeJsonPath} (${stripeCustomerParams.length} records)` - ) - await trackProgress( - 'To actually update customers, run the script with --commit flag' - ) - - logDebug('Dry-run params file written', { - stripeJsonPath, - records: stripeCustomerParams.length, - }) - } - - // Log error account codes for easy reference - if (errorCount > 0) { - logWarn(`${errorCount} records failed and are in the errors file.`) - logWarn('Failed account codes:', { - first20: errorAccountCodes.slice(0, 20), - totalErrors: errorAccountCodes.length, - }) - await trackProgress('') - await trackProgress( - `${errorCount} records failed. Re-run the script to retry them.` - ) - await trackProgress( - `Failed accounts (first 20): ${errorAccountCodes.slice(0, 20).join(', ')}` - ) - } - - // Success/warning based on errors - if (errorCount === 0) { - logDebug('Migration completed successfully', { mode }) - await trackProgress(`Migration completed successfully in ${mode}`) - - // If no errors and errors file exists but is empty (just header), note that - if (fs.existsSync(errorsOutputPath)) { + if (commit) { await trackProgress( - `Errors file is empty (header only) - all records processed successfully!` + `Success file: ${successOutputPath} (${totalSuccessful} records)` + ) + } else { + await trackProgress( + `Success file: ${successOutputPath} (not modified in dry-run mode)` ) } - } else { - logWarn('Migration completed with errors', { mode, errorCount }) await trackProgress( - `Migration completed with ${errorCount} errors in ${mode}` + `Skipped file: ${skippedOutputPath} (${skippedNoStripeIdCount} records)` + ) + await trackProgress( + `Errors file: ${errorsOutputPath} (${errorCount} records)` ) - } - // Return exit code based on whether there were errors - return errorCount === 0 ? 0 : 1 + if (!commit && dryRunCount > 0) { + await trackProgress('') + await trackProgress( + `Stripe params file: ${stripeJsonPath} (${stripeCustomerParams.length} records)` + ) + await trackProgress( + 'To actually update customers, run the script with --commit flag' + ) + + logDebug('Dry-run params file written', { + stripeJsonPath, + records: stripeCustomerParams.length, + }) + } + + await trackProgress( + `Stripe existing fields file: ${stripeExistingFieldsJsonPath}` + ) + + // Log error account codes for easy reference + if (errorCount > 0) { + logWarn(`${errorCount} records failed and are in the errors file.`) + logWarn('Failed account codes:', { + first20: errorAccountCodes.slice(0, 20), + totalErrors: errorAccountCodes.length, + }) + await trackProgress('') + await trackProgress( + `${errorCount} records failed. Re-run the script to retry them.` + ) + await trackProgress( + `Failed accounts (first 20): ${errorAccountCodes.slice(0, 20).join(', ')}` + ) + } + + // Success/warning based on errors + if (errorCount === 0) { + logDebug('Migration completed successfully', { mode }) + await trackProgress(`Migration completed successfully in ${mode}`) + + // If no errors and errors file exists but is empty (just header), note that + if (fs.existsSync(errorsOutputPath)) { + await trackProgress( + `Errors file is empty (header only) - all records processed successfully!` + ) + } + } else { + logWarn('Migration completed with errors', { mode, errorCount }) + await trackProgress( + `Migration completed with ${errorCount} errors in ${mode}` + ) + } + + // Return exit code based on whether there were errors + return errorCount === 0 ? 0 : 1 + } finally { + const results = await Promise.allSettled([ + closeOutputs(), + stripeExistingFieldsWriter.close(), + ]) + + for (const result of results) { + if (result.status === 'rejected') { + logWarn('Failed to close output stream', { + error: result.reason?.message || String(result.reason), + }) + } + } + } } // Execute the script using the runner