Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 66 additions & 32 deletions apps/provider-workflows/src/activities/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ export type ProcessSupplierParams = {
height: number;
}

export type UpsertSupplierStatusResult = {
state: KeyState;
remediationReasons: RemediationHistoryEntryReason[];
prev?: {
state?: KeyState;
remediationReasons?: RemediationHistoryEntryReason[];
};
supplier?: Supplier;
}

export type RemediateSupplierParams = ProcessSupplierParams & {
reasons: RemediationHistoryEntryReason[];
}
Expand Down Expand Up @@ -69,7 +79,7 @@ export const providerActivities = (dal: DAL, pocketRpcClient: PocketBlockchain)
* Upserts the supplier status into the database.
* @param params ProcessSupplierParams
*/
async upsertSupplierStatus(params: ProcessSupplierParams): Promise<boolean> {
async upsertSupplierStatus(params: ProcessSupplierParams): Promise<UpsertSupplierStatusResult> {
log.info('upsertSupplierStatus: Querying for keys, settings, balance and supplier', {params})
const [key, settings, balance, supplier]: [KeyWithGroup, ApplicationSettings, number, Supplier] = await Promise.all([
dal.keys.loadKey(params.address),
Expand All @@ -82,6 +92,9 @@ export const providerActivities = (dal: DAL, pocketRpcClient: PocketBlockchain)
throw new ApplicationFailure('key not found', 'not_found', true)
}

const prevState = key.state
const prevRemediationReasons = (key.remediationHistory ?? []).map((rh) => rh.reason)

const loggerContext = {
key: key.address,
}
Expand Down Expand Up @@ -279,7 +292,33 @@ export const providerActivities = (dal: DAL, pocketRpcClient: PocketBlockchain)
log.debug('Updating supplier', { params, update })
await dal.keys.updateKey(params.address, update, params.height)
log.info('Update Supplier done!', {params})
return true

const currentState = update.state ?? prevState
const currentRemediationReasons = (update.remediationHistory ?? key.remediationHistory ?? []).map((rh) => rh.reason)

const result: UpsertSupplierStatusResult = {
state: currentState,
remediationReasons: currentRemediationReasons,
}

const prevDiff: UpsertSupplierStatusResult['prev'] = {}
if (prevState !== currentState) {
prevDiff.state = prevState
}

const addedReasons = currentRemediationReasons.filter((r) => !prevRemediationReasons.includes(r))
const removedReasons = prevRemediationReasons.filter((r) => !currentRemediationReasons.includes(r))
if (addedReasons.length || removedReasons.length) {
prevDiff.remediationReasons = prevRemediationReasons
}

if (Object.keys(prevDiff).length) {
result.prev = prevDiff
}

result.supplier = supplier

return result
},

/**
Expand All @@ -302,26 +341,17 @@ export const providerActivities = (dal: DAL, pocketRpcClient: PocketBlockchain)

if (!key) {
log.warn('remediateSupplier: Key not found', {params})
return {
success: false,
message: 'Key not found'
}
throw ApplicationFailure.nonRetryable('Key not found', 'not_found')
}

if (!key.addressGroup) {
log.warn('remediateSupplier: Address Group not found', {params})
return {
success: false,
message: 'Key address group not found'
}
throw ApplicationFailure.nonRetryable('Key address group not found', 'not_found')
}

if (!supplier) {
log.warn('remediateSupplier: Supplier not found', {params})
return {
success: false,
message: 'Supplier not found'
}
throw ApplicationFailure.retryable('Supplier not found', 'not_found')
}

if (key.remediationHistory?.length === 0) {
Expand Down Expand Up @@ -406,11 +436,10 @@ export const providerActivities = (dal: DAL, pocketRpcClient: PocketBlockchain)
params,
error: e,
})
return {
success: false,
message: 'Failed while updating the supplier status.',
keyUpdate: update,
}
throw ApplicationFailure.retryable(
`Failed while updating the supplier status for ${params.address}.`,
'db_update_failed',
)
}

return { success: true, message: 'Supplier already configured; remediation cleared.' }
Expand Down Expand Up @@ -446,10 +475,10 @@ export const providerActivities = (dal: DAL, pocketRpcClient: PocketBlockchain)
supportedServicesCount: supportedServices.length,
addressGroup: key.addressGroup?.id ?? 'unknown',
})
return {
success: false,
message: 'Refusing to stake with empty services config.',
}
throw ApplicationFailure.nonRetryable(
`Refusing to stake with empty services config for ${params.address}.`,
'empty_services',
)
}

log.debug('remediateSupplier: Executing stake transaction', {
Expand Down Expand Up @@ -581,20 +610,25 @@ export const providerActivities = (dal: DAL, pocketRpcClient: PocketBlockchain)
log.debug('remediateSupplier: Update Supplier done!', { params })
} catch (e) {
log.warn('remediateSupplier: Update Supplier failed!', { params, error: e })
return {
success: false,
message: 'Failed while updating the supplier status.',
keyUpdate: update,
stakeTxResult: txResult,
}
throw ApplicationFailure.retryable(
`Failed while updating the supplier status for ${params.address}. Key state: ${update.state}. Stake tx success: ${txResult.success}, message: ${txResult.message}`,
'db_update_failed',
)
}

if (!txResult.success) {
log.warn('remediateSupplier: Stake transaction failed', { params, txResult })
throw ApplicationFailure.nonRetryable(
`Remediation transaction failed for ${params.address}. Key state: ${update.state}. Tx code: ${txResult.code}, message: ${txResult.message}`,
'stake_tx_failed',
)
}

log.info('remediateSupplier: Execution finished', { params })

return {
success: txResult.success,
message: txResult.success ? 'Remediation completed successfully.' : 'Remediation transaction failed.',
stakeTxResult: txResult,
success: true,
message: 'Remediation completed successfully.',
}
}
})
Loading