Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,17 @@ DELEGATOR_JSON_FILE=k8s/tools/governance/delegators.json
# Inline JSON or file path for provider governance (consumed by middleman app)
PROVIDER_JSON=
PROVIDER_JSON_FILE=k8s/tools/governance/providers.json

# --- Schedule interval overrides (optional) ---
# Override the default Temporal scheduled workflow intervals.
# If unset, bootstrap uses hardcoded defaults.

# Middleman workflows
# SCHEDULE_PROVIDER_STATUS_INTERVAL=1m
# SCHEDULE_EXECUTE_PENDING_TX_INTERVAL=10s
# SCHEDULE_SUPPLIER_STATUS_INTERVAL=2m
# SCHEDULE_IMPORT_SUPPLIER_RECOVERY_INTERVAL=1m

# Provider workflows
# SCHEDULE_SUPPLIER_STATUS_INTERVAL=2m
# SCHEDULE_SUPPLIER_REMEDIATION_INTERVAL=10s
96 changes: 76 additions & 20 deletions apps/middleman-workflows/src/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,41 @@ enum ScheduledWorkflowType {

const ScheduledWorkflowConfig: Record<
ScheduledWorkflowType,
{ interval: string }
{ interval: string; args: any[]; envVar: string }
> = {
[ScheduledWorkflowType.ProviderStatus]: { interval: "1m" },
[ScheduledWorkflowType.ExecutePendingTransaction]: { interval: "10s" },
[ScheduledWorkflowType.SupplierStatus]: { interval: '2m' },
[ScheduledWorkflowType.ImportSupplierRecovery]: { interval: '1m' },
[ScheduledWorkflowType.ProviderStatus]: {
interval: "1m",
args: [],
envVar: 'SCHEDULE_PROVIDER_STATUS_INTERVAL',
},
[ScheduledWorkflowType.ExecutePendingTransaction]: {
interval: "10s",
args: [],
envVar: 'SCHEDULE_EXECUTE_PENDING_TX_INTERVAL',
},
[ScheduledWorkflowType.SupplierStatus]: {
interval: '2m',
args: [],
envVar: 'SCHEDULE_SUPPLIER_STATUS_INTERVAL',
},
[ScheduledWorkflowType.ImportSupplierRecovery]: {
interval: '1m',
args: [],
envVar: 'SCHEDULE_IMPORT_SUPPLIER_RECOVERY_INTERVAL',
},
};

function parseDurationToMs(duration: string): number {
const match = duration.match(/^(\d+)(ms|s|m|h|d)$/)
if (!match) throw new Error(`Invalid duration: ${duration}`)
const value = match[1]!
const unit = match[2]!
const multipliers: Record<string, number> = {
ms: 1, s: 1000, m: 60000, h: 3600000, d: 86400000
}
return parseInt(value) * multipliers[unit]!
}

async function bootstrapNamespace(client: Client, config: TemporalConfig, logger: Logger) {
const workflowService = client.workflowService;
const { namespace, workflowExecutionRetentionPeriod } = config;
Expand Down Expand Up @@ -58,36 +85,65 @@ async function bootstrapNamespace(client: Client, config: TemporalConfig, logger
}

async function bootstrapScheduledWorkflows(client: Client, config: TemporalConfig, logger: Logger) {
for (const type of Object.values(ScheduledWorkflowType)) {
const workflowType = type;
const { interval } = ScheduledWorkflowConfig[workflowType];
for (const workflowType of Object.values(ScheduledWorkflowType)) {
const wfConfig = ScheduledWorkflowConfig[workflowType];
const interval = process.env[wfConfig.envVar] || wfConfig.interval;
const scheduleId = `${workflowType}-scheduled`;

const handle = client.schedule.getHandle(scheduleId);

try {
await client.connection.workflowService.describeSchedule({
namespace: config.namespace,
scheduleId: `${workflowType}-scheduled`,
});
logger.info({ workflowType }, `Scheduled workflow already exists. Skipping registration...`)
const desc = await handle.describe();

const currentArgs = (desc as any).action.args || [];
const currentIntervalMs = desc.spec.intervals?.[0]?.every;
const desiredIntervalMs = parseDurationToMs(interval);

const argsChanged = JSON.stringify(currentArgs) !== JSON.stringify(wfConfig.args);
const intervalChanged = currentIntervalMs !== desiredIntervalMs;

if (argsChanged || intervalChanged) {
logger.warn(
{ workflowType, argsChanged, intervalChanged, currentIntervalMs, desiredIntervalMs },
'Scheduled workflow config changed. Updating...'
);
await handle.update((prev) => ({
...prev,
action: {
...prev.action,
args: wfConfig.args,
},
spec: {
intervals: [{ every: interval as Duration }],
},
}));
logger.info({ workflowType }, 'Scheduled workflow updated successfully');
} else {
logger.info({ workflowType }, 'Scheduled workflow up to date. Skipping...');
}
} catch (error: unknown) {
// Schedule doesn't exist, create it
try {
logger.warn({ workflowType }, `Scheduled workflow does not exist. Registering...`)
logger.warn({ workflowType }, 'Scheduled workflow does not exist. Registering...');
await client.schedule.create({
action: {
type: "startWorkflow",
workflowType,
taskQueue: config.taskQueue!,
args: wfConfig.args,
},
scheduleId: `${workflowType}-scheduled`,
scheduleId,
spec: {
intervals: [{ every: interval as Duration }],
},
});
} catch (error: any) {
if (error?.code === 6 || error?.message?.match(/already exists/i)) {
logger.info({ workflowType }, 'Scheduled workflow already exists. Skipping registration...')
logger.info({ workflowType, interval }, 'Scheduled workflow created successfully');
} catch (createError: any) {
if (createError?.code === 6 || createError?.message?.match(/already exists/i)) {
logger.info({ workflowType }, 'Scheduled workflow already exists. Skipping registration...');
} else {
logger.error({ error, workflowType }, 'Error scheduling scheduled workflow')
throw error;
logger.error({ error: createError, workflowType }, 'Error scheduling scheduled workflow');
throw createError;
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions apps/middleman-workflows/src/lib/dal/applicationSettings.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,12 @@ export default class ApplicationSettings {
async getFirst() {
return this.dbClient.db.query.applicationSettingsTable.findFirst();
}

async isBootstrapped(): Promise<boolean> {
const settings = await this.dbClient.db
.select({ isBootstrapped: schema.applicationSettingsTable.isBootstrapped })
.from(schema.applicationSettingsTable)
.then(r => (r.length ? r[0] : null))
return settings?.isBootstrapped ?? false
}
}
21 changes: 21 additions & 0 deletions apps/middleman-workflows/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,26 @@ import { ProviderService } from '@/lib/provider'

const logger = getLogger()

const BOOTSTRAP_POLL_INTERVAL = parseInt(process.env.BOOTSTRAP_POLL_INTERVAL_MS || '5000')

async function waitForAppBootstrap(dal: DAL, logger: Logger) {
logger.info('Waiting for application to be bootstrapped...')

while (true) {
try {
const bootstrapped = await dal.appSettings.isBootstrapped()
if (bootstrapped) {
logger.info('Application is bootstrapped. Proceeding with worker setup.')
return
}
logger.warn('Application is not yet bootstrapped. Retrying...')
} catch (error) {
logger.warn({ error }, 'Failed to check bootstrap status. Retrying...')
}
await new Promise((resolve) => setTimeout(resolve, BOOTSTRAP_POLL_INTERVAL))
}
}

export const registerGracefulShutdown = (
disconnect: () => Promise<void>,
logger: Logger,
Expand Down Expand Up @@ -80,6 +100,7 @@ export async function setupTemporalWorker() {
shutdownGraceTime,
})

await waitForAppBootstrap(dal, logger)
await bootstrap(logger)

registerGracefulShutdown(disconnect, logger, shutdownGraceTime)
Expand Down
46 changes: 31 additions & 15 deletions apps/middleman/src/app/admin/setup/blockchainFrom.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ const FormComponent: React.FC<FormProps> = ({ defaultValues, goNext }) => {
clearTimeout(debounceTimerRef.current);
}

form.clearErrors('rpcUrl');
form.clearErrors('indexerApiUrl');
form.setValue('updatedAtHeight', null);

debounceTimerRef.current = setTimeout(() => {
if (rpcUrl && rpcUrl.trim() !== '') {
retrieveBlockchainParams();
Expand Down Expand Up @@ -200,12 +204,16 @@ const FormComponent: React.FC<FormProps> = ({ defaultValues, goNext }) => {
control={form.control}
render={({ field }) => (
<FormItem>
<FormLabel>Shannon API Url</FormLabel>
<FormLabel>Shannon API URL</FormLabel>
<FormControl>
<Input {...field} disabled={isLoadingBlockchainParams} />
<Input
{...field}
placeholder="https://your-shannon-rpc.example.com"
/>
</FormControl>
<FormDescription>
The RPC will determine the chainID and minimum stake. The chainID can not be changed later.
A public RPC or gateway URL for the Pocket Network. This is used to auto-detect
the network and minimum stake. The network (chain ID) cannot be changed after setup.
</FormDescription>
<FormMessage />
</FormItem>
Expand Down Expand Up @@ -244,18 +252,26 @@ const FormComponent: React.FC<FormProps> = ({ defaultValues, goNext }) => {
<FormField
name="indexerApiUrl"
control={form.control}
render={({ field }) => (
<FormItem>
<FormLabel>Indexer API Url</FormLabel>
<FormControl>
<Input {...field} disabled={!chainId || !rpcUrl || isLoadingBlockchainParams} />
</FormControl>
<FormDescription>
This URL will be used to retrieve rewards calculations
</FormDescription>
<FormMessage />
</FormItem>
)}
render={({ field }) => {
const isDisabled = !chainId || !rpcUrl;
return (
<FormItem>
<FormLabel>Indexer API URL</FormLabel>
<FormControl>
<Input
{...field}
disabled={isDisabled}
placeholder="https://api.poktscan.com"
/>
</FormControl>
<FormDescription>
The GraphQL API URL of the POKTscan indexer for your network. Used to retrieve rewards calculations.
Must match the same network as your RPC.
</FormDescription>
<FormMessage />
</FormItem>
);
}}
/>
</div>
</form>
Expand Down
92 changes: 55 additions & 37 deletions apps/middleman/src/app/admin/setup/providersForm.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import {
FormControl,
FormField,
FormItem,
FormLabel,
FormMessage,
} from "@igniter/ui/components/form";
import { Checkbox } from "@igniter/ui/components/checkbox";
Expand Down Expand Up @@ -85,55 +84,74 @@ const ProvidersForm: React.FC<ProvidersFormProps> = ({
setIsLoading(false);
}
})}
className="grid gap-4"
className="flex flex-col gap-4"
>
<FormField
control={form.control}
name="providers"
render={() => (
<FormItem>
{providers.map((item) => (
<FormField
key={item.identity}
control={form.control}
name="providers"
render={({ field }) => {
return (
<FormItem
<div className="rounded-md border">
<table className="w-full">
<thead>
<tr className="border-b text-left text-sm text-muted-foreground">
<th className="p-3 w-10"></th>
<th className="p-3">Name</th>
<th className="p-3">Identity</th>
<th className="p-3">URL</th>
</tr>
</thead>
<tbody>
{providers.map((item) => (
<FormField
key={item.identity}
className="flex flex-row items-start space-x-3 space-y-0"
>
<FormControl>
<Checkbox
checked={field.value?.includes(item.identity)}
onCheckedChange={(checked) => {
return checked
? field.onChange([
...field.value,
item.identity,
])
: field.onChange(
field.value?.filter(
(value) => value !== item.identity
)
);
control={form.control}
name="providers"
render={({ field }) => (
<tr
key={item.identity}
className="border-b last:border-0 hover:bg-muted/50 cursor-pointer"
onClick={() => {
const checked = field.value?.includes(item.identity);
if (checked) {
field.onChange(field.value?.filter((v) => v !== item.identity));
} else {
field.onChange([...field.value, item.identity]);
}
}}
/>
</FormControl>
<FormLabel className="text-sm font-normal">
{item.name}
</FormLabel>
</FormItem>
);
}}
/>
))}
>
<td className="p-3">
<FormControl>
<Checkbox
checked={field.value?.includes(item.identity)}
onCheckedChange={(checked) => {
return checked
? field.onChange([...field.value, item.identity])
: field.onChange(field.value?.filter((v) => v !== item.identity));
}}
/>
</FormControl>
</td>
<td className="p-3 font-medium">{item.name}</td>
<td className="p-3 font-mono text-sm text-muted-foreground">
{item.identity.slice(0, 10)}...{item.identity.slice(-6)}
</td>
<td className="p-3 text-sm text-muted-foreground truncate max-w-[250px]">
{item.url}
</td>
</tr>
)}
/>
))}
</tbody>
</table>
</div>
<FormMessage />
</FormItem>
)}
/>
<div className="flex justify-end gap-4">
<Button onClick={goBack} disabled={isLoading}>
<Button variant="outline" onClick={goBack} disabled={isLoading}>
Back
</Button>
<Button type="submit" disabled={isLoading}>
Expand Down
1 change: 1 addition & 0 deletions apps/middleman/src/app/admin/setup/settingsForm.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ const FormComponent: React.FC<FormProps> = ({ defaultValues, goNext, goBack }) =

<div className="flex justify-end gap-4">
<Button
variant="outline"
onClick={goBack}
>
Back
Expand Down
Loading
Loading