-
Notifications
You must be signed in to change notification settings - Fork 0
feat: watermark state-tracking with deterministic prioritized task queuing #166
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
DerekRoberts
wants to merge
8
commits into
main
Choose a base branch
from
feat/watermark-pattern
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 1 commit
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
86924fc
feat: implement watermark pattern for gapless synchronization (#155)
DerekRoberts 8807357
fix: address PR feedback; remove unused core import and align waterma…
DerekRoberts be5bae5
feat: implement priority-aware TaskQueue with budget-aware throttling
DerekRoberts 57ca527
fix: resolve circular dependency and RatePriority undefined crash
DerekRoberts 9e091dc
fix: resolve remaining RatePriority undefined crash in resolveProject…
DerekRoberts 4424bda
feat: improve sync window visibility with annotations and job summary
DerekRoberts f98bcb5
feat: convert sync-engine to first-class node20 action for cache support
DerekRoberts 2ad1630
chore: force include production bundle and update gitignore
DerekRoberts File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,114 @@ | ||
| import * as cache from '@actions/cache'; | ||
| import * as core from '@actions/core'; | ||
DerekRoberts marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| import fs from 'node:fs/promises'; | ||
| import path from 'node:path'; | ||
| import os from 'node:os'; | ||
| import { log } from './log.js'; | ||
|
|
||
| const CACHE_KEY_PREFIX = 'project-sync-watermark-'; | ||
| const WATERMARK_FILE = 'last_sync_timestamp.txt'; | ||
|
|
||
| /** | ||
| * Get the last successful sync timestamp from GitHub Actions cache. | ||
| * Uses the project ID to scope the cache. | ||
| * | ||
| * @param {string} projectId - The GitHub Project V2 ID | ||
| * @returns {Promise<string|null>} ISO timestamp or null if not found | ||
| */ | ||
| export async function getWatermark(projectId, overrides = {}) { | ||
| const { | ||
| cache: cacheModule = cache, | ||
| fs: fsModule = fs, | ||
| log: logger = log | ||
| } = overrides; | ||
|
|
||
| if (!projectId) { | ||
| logger.warning('No project ID provided for watermark retrieval'); | ||
| return null; | ||
| } | ||
|
|
||
| // Use a sanitised project ID for the cache key | ||
| const safeProjectId = projectId.replace(/[^a-zA-Z0-9]/g, '_'); | ||
| const primaryKey = `${CACHE_KEY_PREFIX}${safeProjectId}-`; // We use restore keys to find latest | ||
|
|
||
| const tmpDir = await fsModule.mkdtemp(path.join(os.tmpdir(), 'watermark-')); | ||
| const filePath = path.join(tmpDir, WATERMARK_FILE); | ||
|
|
||
| try { | ||
| logger.info(`Attempting to restore watermark for project ${projectId}...`); | ||
| // restoreCache will find the most recent cache entry starting with this prefix | ||
| const restoredKey = await cacheModule.restoreCache([tmpDir], `${primaryKey}${Date.now()}`, [primaryKey]); | ||
|
|
||
| if (restoredKey) { | ||
| const content = await fsModule.readFile(filePath, 'utf8'); | ||
| const timestamp = content.trim(); | ||
| logger.info(`Restored watermark: ${timestamp} (from key: ${restoredKey})`); | ||
|
|
||
| // Basic validation: ensure it's a valid date | ||
| if (!isNaN(Date.parse(timestamp))) { | ||
| return timestamp; | ||
| } | ||
| logger.warning(`Restored watermark "${timestamp}" is not a valid ISO date`); | ||
| } else { | ||
| logger.info('No existing watermark found in cache'); | ||
| } | ||
| } catch (error) { | ||
| logger.error(`Failed to restore watermark from cache: ${error.message}`); | ||
| if (process.env.DEBUG) logger.debug(error.stack); | ||
| } finally { | ||
| try { | ||
| await fsModule.rm(tmpDir, { recursive: true, force: true }); | ||
| } catch (e) { | ||
| // Ignore cleanup errors | ||
| } | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| /** | ||
| * Save the current sync timestamp to GitHub Actions cache. | ||
| * | ||
| * @param {string} projectId - The GitHub Project V2 ID | ||
| * @param {string} timestamp - ISO timestamp to save | ||
| * @returns {Promise<void>} | ||
| */ | ||
| export async function saveWatermark(projectId, timestamp, overrides = {}) { | ||
| const { | ||
| cache: cacheModule = cache, | ||
| fs: fsModule = fs, | ||
| log: logger = log | ||
| } = overrides; | ||
|
|
||
| if (!projectId || !timestamp) { | ||
| logger.warning('Project ID and timestamp are required to save watermark'); | ||
| return; | ||
| } | ||
|
|
||
| const safeProjectId = projectId.replace(/[^a-zA-Z0-9]/g, '_'); | ||
| const cacheKey = `${CACHE_KEY_PREFIX}${safeProjectId}-${Date.now()}`; | ||
|
|
||
| const tmpDir = await fsModule.mkdtemp(path.join(os.tmpdir(), 'watermark-')); | ||
| const filePath = path.join(tmpDir, WATERMARK_FILE); | ||
|
|
||
| try { | ||
| await fsModule.writeFile(filePath, timestamp); | ||
| logger.info(`Saving watermark ${timestamp} to cache with key ${cacheKey}...`); | ||
|
|
||
| // We don't want to fail the whole run if cache saving fails | ||
| await cacheModule.saveCache([tmpDir], cacheKey); | ||
| logger.info('Watermark saved successfully'); | ||
| } catch (error) { | ||
| // Special handling for "Cache already exists" which shouldn't happen with our timestamping but just in case | ||
| if (error.name === 'ValidationError' && error.message.includes('already exists')) { | ||
| logger.info('Watermark cache already exists for this key, skipping.'); | ||
| } else { | ||
| logger.error(`Failed to save watermark to cache: ${error.message}`); | ||
| } | ||
| } finally { | ||
| try { | ||
| await fsModule.rm(tmpDir, { recursive: true, force: true }); | ||
| } catch (e) { | ||
| // Ignore cleanup errors | ||
| } | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| import { test } from 'node:test'; | ||
| import assert from 'node:assert/strict'; | ||
| import { getRecentItems } from '../../src/github/api.js'; | ||
|
|
||
| test('getRecentItems - Gapless Sync Search Query Construction', async (t) => { | ||
| const calls = []; | ||
| const overrides = { | ||
| shouldProceedFn: async () => ({ proceed: true }), | ||
| withBackoffFn: async operation => operation(), | ||
| graphqlClient: async (_query, variables) => { | ||
| calls.push(variables.searchQuery); | ||
| return { search: { nodes: [] } }; | ||
| } | ||
| }; | ||
|
|
||
| await t.test('uses absolute since parameter when provided (Watermark Pattern)', async () => { | ||
| calls.length = 0; | ||
| const since = '2025-06-12T10:00:00Z'; | ||
| await getRecentItems('org', ['repo1'], 'user', undefined, { overrides, since }); | ||
|
|
||
| assert.ok(calls.length > 0, 'Search calls should be made'); | ||
| for (const query of calls) { | ||
| assert.ok(query.includes(`updated:>=${since}`), `Query should include updated:>=${since}`); | ||
| } | ||
| }); | ||
|
|
||
| await t.test('falls back to 24h window if no since provided', async () => { | ||
| calls.length = 0; | ||
| const now = Date.now(); | ||
| await getRecentItems('org', ['repo1'], 'user', 24, { overrides }); | ||
|
|
||
| assert.ok(calls.length > 0, 'Search calls should be made'); | ||
| for (const query of calls) { | ||
| assert.ok(query.includes('updated:>='), `Query should include updated condition: ${query}`); | ||
| // Find timestamp in string and verify it's close to 24 hours ago | ||
| const timestampMatch = query.match(/updated:>=([^ ]+)/); | ||
| if (timestampMatch) { | ||
| const tDate = new Date(timestampMatch[1]); | ||
| const diff = now - tDate.getTime(); | ||
| // Close enough to 24 hours (86.4M ms) | ||
| assert.ok(Math.abs(diff - 24 * 60 * 60 * 1000) < 60000, `Expected ~24h diff, got ${diff}`); | ||
| } | ||
| } | ||
| }); | ||
|
|
||
| await t.test('is prioritized appropriately if since is present', async () => { | ||
| const watermarkSince = '2025-01-01T00:00:00Z'; | ||
| // Repos and org-level searches should all use the watermark | ||
| await getRecentItems('bcgov', ['some-repo'], 'derek', 1, { overrides, since: watermarkSince }); | ||
|
|
||
| const repoCall = calls.find(c => c.includes('repo:bcgov/some-repo')); | ||
| const authorCall = calls.find(c => c.includes('author:derek')); | ||
|
|
||
| assert.ok(repoCall.includes(`updated:>=${watermarkSince}`), 'Repo search should use watermark'); | ||
| assert.ok(authorCall.includes(`updated:>=${watermarkSince}`), 'Author search should use watermark'); | ||
| }); | ||
| }); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,79 @@ | ||
| import { test } from 'node:test'; | ||
| import assert from 'node:assert/strict'; | ||
| import { getWatermark, saveWatermark } from '../../src/utils/watermark.js'; | ||
|
|
||
| test('Watermark Utility - GHA Cache Integration', async (t) => { | ||
| let mockCacheData = {}; | ||
|
|
||
| const mockCache = { | ||
| restoreCache: async (paths, primaryKey, restoreKeys) => { | ||
| const prefix = restoreKeys[0]; | ||
| if (mockCacheData[prefix]) { | ||
| return `${prefix}restored-from-mock`; | ||
| } | ||
| return null; | ||
| }, | ||
| saveCache: async (paths, key) => { | ||
| // Extract prefix before the last dash | ||
| const prefix = key.substring(0, key.lastIndexOf('-') + 1); | ||
| mockCacheData[prefix] = 'mock-saved-data'; | ||
| return 42; | ||
| } | ||
| }; | ||
|
|
||
| const mockFs = { | ||
| mkdtemp: async (prefix) => `/tmp/mock-dir-${prefix.replace(/[^a-zA-Z0-9]/g, '_')}`, | ||
| writeFile: async () => {}, | ||
| readFile: async () => '2024-01-01T00:00:00.000Z', | ||
| rm: async () => {} | ||
| }; | ||
|
|
||
| const mockLog = { | ||
| info: () => {}, | ||
| warning: () => {}, | ||
| error: () => {}, | ||
| debug: () => {} | ||
| }; | ||
|
|
||
| await t.test('should return null when no watermark exists for project', async () => { | ||
| mockCacheData = {}; | ||
| const watermark = await getWatermark('my-project', { cache: mockCache, fs: mockFs, log: mockLog }); | ||
| assert.strictEqual(watermark, null); | ||
| }); | ||
|
|
||
| await t.test('should save and retrieve watermark via cache prefix', async () => { | ||
| mockCacheData = {}; | ||
| const timestamp = '2024-01-01T12:00:00.000Z'; | ||
| await saveWatermark('my-project', timestamp, { cache: mockCache, fs: mockFs, log: mockLog }); | ||
|
|
||
| // The utility should find the saved watermark even though the key has a timestamp | ||
| const watermark = await getWatermark('my-project', { cache: mockCache, fs: mockFs, log: mockLog }); | ||
| assert.strictEqual(watermark, '2024-01-01T00:00:00.000Z'); | ||
| }); | ||
|
|
||
| await t.test('should handle project ID sanitization correctly', async () => { | ||
| mockCacheData = {}; | ||
| const timestamp = '2024-01-02T12:00:00.000Z'; | ||
| // Test with characters that must be sanitized for cache keys | ||
| const specialProjectId = 'org/projects:123'; | ||
| await saveWatermark(specialProjectId, timestamp, { cache: mockCache, fs: mockFs, log: mockLog }); | ||
|
|
||
| const watermark = await getWatermark(specialProjectId, { cache: mockCache, fs: mockFs, log: mockLog }); | ||
| assert.strictEqual(watermark, '2024-01-01T00:00:00.000Z'); | ||
|
|
||
| // Verify prefix was sanitized (org_projects_123) | ||
| const prefixes = Object.keys(mockCacheData); | ||
| assert.ok(prefixes[0].includes('org_projects_123'), `Prefix ${prefixes[0]} should be sanitized`); | ||
| }); | ||
|
|
||
| await t.test('should return null if restored timestamp is invalid', async () => { | ||
| const corruptedFs = { | ||
| ...mockFs, | ||
| readFile: async () => 'not-a-valid-timestamp' | ||
| }; | ||
| mockCacheData['project-sync-watermark-bad_data-'] = 'saved'; | ||
|
|
||
| const watermark = await getWatermark('bad-data', { cache: mockCache, fs: corruptedFs, log: mockLog }); | ||
| assert.strictEqual(watermark, null); | ||
| }); | ||
| }); |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.