Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .npmrc
Original file line number Diff line number Diff line change
@@ -1,2 +0,0 @@
@taskforcesh:registry=https://npm.taskforce.sh/
//npm.taskforce.sh/:_authToken=${NPM_TASKFORCESH_TOKEN}
42 changes: 1 addition & 41 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions packages/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"test:unit": "vitest --config ./vitest.config.ts",
"test:integration": "vitest --config ./vitest.config.integration.ts",
"lint": "eslint . --ignore-path ../../.eslintignore && tsc --noEmit",
"db": "DOTENV_CONFIG_PATH=.env knex",
"db": "knex",
"readme:db:migration:create": "HOW TO USE: npm run -w backend db:migration:create your_migration_name",
"db:migration:create": "npm run db -- migrate:make",
"db:list": "npm run db -- migrate:list",
Expand Down Expand Up @@ -45,7 +45,7 @@
"@opengovsg/sgid-client": "2.3.0",
"@opentelemetry/sdk-node": "0.207.0",
"@plumber/types": "file:../types",
"@taskforcesh/bullmq-pro": "7.8.1",
"bullmq": "^5.0.0",
"ai": "5.0.81",
"ajv-formats": "2.1.1",
"async-mutex": "0.4.0",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { GroupStatus } from '@taskforcesh/bullmq-pro'

import { GroupStatus } from '@/lib/bullmq-pro-compat'
import { actionQueuesByName } from '@/queues/action'

import type { AdminQueryResolvers } from '../../__generated__/types.generated'
Expand Down
10 changes: 5 additions & 5 deletions packages/backend/src/helpers/__tests__/actions.test.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import type { IActionJobData, IApp, IJSONObject } from '@plumber/types'

import {
type JobPro,
UnrecoverableError,
WorkerPro,
} from '@taskforcesh/bullmq-pro'
import type { AxiosError } from 'axios'
import { type Span } from 'dd-trace'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'

import HttpError from '@/errors/http'
import RetriableError from '@/errors/retriable-error'
import StepError from '@/errors/step'
import {
type JobPro,
UnrecoverableError,
WorkerPro,
} from '@/lib/bullmq-pro-compat'
import Step from '@/models/step'

import { doesActionProcessFiles, handleFailedStepAndThrow } from '../actions'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import type { IActionJobData } from '@plumber/types'

import { type JobPro, WorkerPro } from '@taskforcesh/bullmq-pro'
import { UnrecoverableError } from '@taskforcesh/bullmq-pro'
import { type Span } from 'dd-trace'
import get from 'lodash.get'

import HttpError from '@/errors/http'
import RetriableError from '@/errors/retriable-error'
import StepError from '@/errors/step'
import {
type JobPro,
UnrecoverableError,
WorkerPro,
} from '@/lib/bullmq-pro-compat'
import ExecutionStep from '@/models/execution-step'

import { MAXIMUM_JOB_ATTEMPTS } from '../default-job-configuration'
Expand Down
3 changes: 1 addition & 2 deletions packages/backend/src/helpers/backoff.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { type WorkerProOptions } from '@taskforcesh/bullmq-pro'

import RetriableError, { DEFAULT_DELAY_MS } from '@/errors/retriable-error'
import { type WorkerProOptions } from '@/lib/bullmq-pro-compat'

import logger from './logger'

Expand Down
3 changes: 1 addition & 2 deletions packages/backend/src/helpers/default-job-configuration.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { type JobsProOptions } from '@taskforcesh/bullmq-pro'

import appConfig from '@/config/app'
import { type JobsProOptions } from '@/lib/bullmq-pro-compat'

export const REMOVE_AFTER_30_DAYS = {
age: 30 * 24 * 3600,
Expand Down
10 changes: 9 additions & 1 deletion packages/backend/src/helpers/launch-darkly.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import { memoize } from 'lodash'

import appConfig from '@/config/app'

const client = init(appConfig.launchDarklySdkKey)
const isConfigured =
appConfig.launchDarklySdkKey && appConfig.launchDarklySdkKey !== '...'

// No top level awaits means that the 1st call _might_ need to wait a bit longer
// for init... :(
const getClient = memoize(async () => {
const client = init(appConfig.launchDarklySdkKey)
await client.waitForInitialization()
return client
})
Expand All @@ -19,6 +21,9 @@ export async function getLdFlagValue<T extends IJSONValue>(
userEmail: string | null,
fallbackValue: T,
): Promise<T> {
if (!isConfigured) {
return fallbackValue
}
const client = await getClient()
// LD's API returns us `any`, but their docs state it's limited to any JSON
// value. So we do a yucky cast.
Expand All @@ -37,6 +42,9 @@ export async function getLdFlagValue<T extends IJSONValue>(
export async function getAllLdFlags(
userEmail: string | null,
): Promise<Record<string, any>> {
if (!isConfigured) {
return {}
}
const client = await getClient()

const allFlags = await client.allFlagsState({
Expand Down
45 changes: 30 additions & 15 deletions packages/backend/src/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,35 @@ import { NodeSDK } from '@opentelemetry/sdk-node'

import appConfig from '@/config/app'

const langfuseSpanProcessor = new LangfuseSpanProcessor({
publicKey: appConfig.pair.rome.publicKey,
secretKey: appConfig.pair.rome.secretKey,
baseUrl: appConfig.pair.rome.baseUrl,
additionalHeaders: {
'CF-Access-Client-Id': appConfig.pair.rome.cloudflare.zeroTrustClientKey,
'CF-Access-Client-Secret':
appConfig.pair.rome.cloudflare.zeroTrustSecretKey,
},
environment: appConfig.appEnv,
})
const { publicKey, secretKey, baseUrl } = appConfig.pair.rome
const { zeroTrustClientKey, zeroTrustSecretKey } =
appConfig.pair.rome.cloudflare

const sdk = new NodeSDK({
spanProcessors: [langfuseSpanProcessor],
})
const isConfigured =
publicKey &&
secretKey &&
baseUrl &&
zeroTrustClientKey &&
zeroTrustSecretKey &&
![publicKey, secretKey, baseUrl, zeroTrustClientKey, zeroTrustSecretKey].some(
(v) => v === '...',
)

sdk.start()
if (isConfigured) {
const langfuseSpanProcessor = new LangfuseSpanProcessor({
publicKey,
secretKey,
baseUrl,
additionalHeaders: {
'CF-Access-Client-Id': zeroTrustClientKey,
'CF-Access-Client-Secret': zeroTrustSecretKey,
},
environment: appConfig.appEnv,
})

const sdk = new NodeSDK({
spanProcessors: [langfuseSpanProcessor],
})

sdk.start()
}
151 changes: 151 additions & 0 deletions packages/backend/src/lib/bullmq-pro-compat.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/**
* BullMQ Pro compatibility shim for local development.
*
* This file re-exports the free `bullmq` package using the same class/type
* names as `@taskforcesh/bullmq-pro`, so that the rest of the backend can
* run without a paid BullMQ Pro licence.
*
* Pro-only features (job groups, per-group rate limiting, group status queries)
* are stubbed out as no-ops. This means:
* - Group concurrency / rate-limit config is silently ignored.
* - `rateLimitGroup()` is a no-op (group-delay RetriableErrors fall through to
* the normal step-retry path instead).
* - `getGroupsByStatus()` always returns an empty array.
*
* This is intentional: local development does not need production-level
* throughput shaping, so losing these features is an acceptable trade-off.
*/

import {
Job,
type JobsOptions,
Queue,
type QueueOptions,
UnrecoverableError,
Worker,
type WorkerOptions,
} from 'bullmq'

// ---------------------------------------------------------------------------
// Pro-specific type extensions
// ---------------------------------------------------------------------------

export interface GroupJobConfig {
id: string
limit?: { max: number; duration: number }
}

export interface GroupWorkerConfig {
concurrency?: number
limit?: { max: number; duration: number }
}

/** Drop-in for `JobsProOptions` — adds the optional `group` field. */
export type JobsProOptions = JobsOptions & {
group?: GroupJobConfig
}

/** Drop-in for `QueueProOptions`. */
export type QueueProOptions = QueueOptions

/** Drop-in for `WorkerProOptions` — adds the optional `group` field. */
export type WorkerProOptions = WorkerOptions & {
group?: GroupWorkerConfig
}

/** Drop-in for `JobPro` — adds Pro-specific fields as optional. */
export type JobPro<T = any, R = any, N extends string = string> = Job<
T,
R,
N
> & {
/** BullMQ Pro tracks attempts differently; undefined in the free version. */
attemptsStarted?: number
opts: Job<T, R, N>['opts'] & {
group?: GroupJobConfig
}
}

// ---------------------------------------------------------------------------
// GroupStatus enum stub
// ---------------------------------------------------------------------------

export enum GroupStatus {
Waiting = 'waiting',
Paused = 'paused',
Running = 'running',
}

// ---------------------------------------------------------------------------
// QueuePro — Queue with stub Pro group methods
// ---------------------------------------------------------------------------

export class QueuePro<
DataType = any,
ResultType = any,
NameType extends string = string,
> extends Queue<DataType, ResultType, NameType> {
/**
* Pro-only: returns groups filtered by status.
* Stub always returns an empty array in local dev.
*/
async getGroupsByStatus(
_status: GroupStatus,
): Promise<Array<{ id: string }>> {
return []
}

/** Pro-only: pauses a job group. No-op in local dev. */
async pauseGroup(_groupId: string): Promise<boolean> {
return true
}

/** Pro-only: resumes a paused job group. No-op in local dev. */
async resumeGroup(_groupId: string): Promise<boolean> {
return true
}

/** Pro-only: sets per-group concurrency. No-op in local dev. */
async setGroupConcurrency(
_groupId: string,
_concurrency: number,
): Promise<void> {
// no-op
}

/** Pro-only: removes per-group concurrency limit. No-op in local dev. */
async deleteGroupConcurrency(_groupId: string): Promise<void> {
// no-op
}

/** Pro-only: returns current group concurrency. Returns undefined in local dev. */
async getGroupConcurrency(_groupId: string): Promise<number | undefined> {
return undefined
}
}

// ---------------------------------------------------------------------------
// WorkerPro — Worker with stub Pro group methods
// ---------------------------------------------------------------------------

export class WorkerPro<
DataType = any,
ResultType = any,
NameType extends string = string,
> extends Worker<DataType, ResultType, NameType> {
/**
* Pro-only: rate-limits a specific job group.
* Stub is a no-op in local dev — the job will be retried via the normal
* step-retry path instead of a group delay.
*/
rateLimitGroup(_job: Job<DataType>, _delayInMs: number): void {
// no-op
}
}

// ---------------------------------------------------------------------------
// Re-exports
// ---------------------------------------------------------------------------

export { UnrecoverableError }
export type { Job as JobProClass }
Loading