Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions config.universaltest-ecr.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"BROKER_CLIENT_CONFIGURATION": {
"common": {
"default": {
"BROKER_SERVER_URL": "https://broker.dev.snyk.io",
"BROKER_HA_MODE_ENABLED": "false",
"BROKER_DISPATCHER_BASE_URL": "https://api.dev.snyk.io"
},
"oauth": {
"clientId": "${CLIENT_ID}",
"clientSecret": "${CLIENT_SECRET}"
}
}
},
"CONNECTIONS": {
"ecr1": {
"type": "ecr",
"identifier": "${BROKER_TOKEN_1}",
"CR_AGENT_URL": "http://localhost:17500",
"BROKER_CLIENT_URL": "http://localhost:7341"
},
"ecr2": {
"type": "ecr",
"identifier": "${BROKER_TOKEN_2}",
"CR_AGENT_URL": "http://localhost:17500",
"BROKER_CLIENT_URL": "http://localhost:7341"
}
}
}

5 changes: 4 additions & 1 deletion lib/broker-workload/clientLocalRequests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,12 @@ export class BrokerClientRequestWorkload extends Workload<WorkloadType.localClie
.status(401)
.send({ message: 'blocked', reason, url: this.req.url });
} else {
// Extract connection identifier from the selected websocket connection
const connectionIdentifier =
this.res.locals.websocket?.identifier || null;
hybridClientRequestHandler.makeRequest(
getInterpolatedRequest(
null,
connectionIdentifier,
matchedFilterRule,
this.req,
logContext,
Expand Down
8 changes: 8 additions & 0 deletions lib/broker-workload/websocketRequests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ export class BrokerWorkload extends Workload<WorkloadType.remoteServer> {
const websocketResponseHandler = websocketHandler;
if (this.options.config.universalBrokerEnabled) {
payload.connectionIdentifier = this.connectionIdentifier;
// note: We duplicate payload.connectionIdentifier in headers because the middleware
// (websocketConnectionSelectorMiddleware) runs on express requests (req, res) and
// doesn't have access to the payload object. The payload only exists in the
// BrokerWorkload context, but the middleware needs the identifier to route CR
// requests to the correct websocket connection. headers are the standard
// way to pass data from a workload to the Express middleware layer.
payload.headers['snyk-broker-connection-identifier'] =
this.connectionIdentifier;
}
const correlationHeaders = getCorrelationDataFromHeaders(payload.headers);
const contextId = payload.headers['x-snyk-broker-context-id'] as
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { log as logger } from '../../../logs/logger';
import { getConfig } from '../../common/config/config';
import { NextFunction, Request, Response } from 'express';
import { WebSocketConnection } from '../types/client';
import { WebSocketConnection, Role } from '../types/client';
import { isWebsocketConnOpen } from '../utils/socketHelpers';
import { maskToken } from '../../common/utils/token';

export const websocketConnectionSelectorMiddleware = (
req: Request,
Expand Down Expand Up @@ -74,17 +75,106 @@ export const websocketConnectionSelectorMiddleware = (
req.path.startsWith('/v1/') ||
req.path.startsWith('/v2/')
) {
const craCompatibleAvailableTypes = getCraCompatibleTypes(config);
if (craCompatibleAvailableTypes.length > 0) {
inboundRequestType = craCompatibleAvailableTypes[0];
const connectionIdentifier = req.headers[
'snyk-broker-connection-identifier'
] as string;

const craCompatibleTypeNames = config.CRA_COMPATIBLE_TYPES as string[];
const craCompatibleConnections = websocketConnections.filter((conn) =>
craCompatibleTypeNames.includes(conn.supportedIntegrationType),
);

// take into account unique connection identifiers
// each connection has primary and secondary roles so we need to count unique identifiers
const uniqueCraCompatibleIdentifiers = new Set(
craCompatibleConnections.map((conn) => conn.identifier),
);

let selectedWebsocketConnection: WebSocketConnection | undefined;

if (connectionIdentifier) {
// Identifier-based routing (for requests from server over websocket)
selectedWebsocketConnection = websocketConnections.find(
(conn) => conn.identifier === connectionIdentifier,
);

if (!selectedWebsocketConnection) {
logger.error(
{
connectionIdentifier: maskToken(connectionIdentifier),
url: req.path,
},
'no websocket connection found for container registry request identifier',
);
res.status(404).send('connection not found for identifier');
return;
}

if (
!craCompatibleTypeNames.includes(
selectedWebsocketConnection.supportedIntegrationType,
)
) {
logger.error(
{
connectionIdentifier: maskToken(connectionIdentifier),
type: selectedWebsocketConnection.supportedIntegrationType,
url: req.path,
},
'connection found but type is not CRA-compatible',
);
res
.status(505)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is this supposed to be a 505? Maybe a 400 would be better?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this is an odd return value, but the reason it's here is because that was the previous behavior.

505 is kind of related to this (version not supported), but I agree that a 4xx is probably the better return code. If we're okay changing the status codes that we're returning here then I can change it, but I'm not sure if that falls under the purview of this change.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sensei @pavel-snyk what do you think?

.send(
'Connection type not compatible with container registry requests.',
);
return;
}
} else {
res
.status(505)
.send(
'Current Broker Client configuration does not support this flow. Missing container registry agent compatible connection.',
// Fallback to type-based routing when header is missing
// This supports backward compatibility for requests from behind the client
// Only works when there's exactly one unique CRA-compatible connection identifier
if (uniqueCraCompatibleIdentifiers.size === 0) {
logger.error(
{ url: req.path },
'No CRA-compatible connections available for container registry request',
);
return;
res.status(404).send('no CRA-compatible connection available');
return;
}

if (uniqueCraCompatibleIdentifiers.size === 1) {
// Single unique connection identifier: use primary connection (backwards compatibility)
const connectionId = Array.from(uniqueCraCompatibleIdentifiers)[0];
selectedWebsocketConnection =
craCompatibleConnections.find(
(conn) =>
conn.identifier === connectionId && conn.role === Role.primary,
) ||
craCompatibleConnections.find(
(conn) => conn.identifier === connectionId,
);
} else {
// Multiple unique connection identifiers but no identifier header: cannot route
logger.error(
{
url: req.path,
availableConnections: uniqueCraCompatibleIdentifiers.size,
},
'Container registry request missing connection identifier header and multiple CRA-compatible connections exist',
);
res
.status(500)
.send(
'missing connection identifier (multiple CRA-compatible connections)',
);
return;
}
}

res.locals.websocket = selectedWebsocketConnection;
next();
return;
} else {
logger.error(
{ url: req.path },
Expand Down
117 changes: 117 additions & 0 deletions test/functional/multiple-container-registries-same-type.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
const PORT = 9001;
import path from 'path';
import { axiosClient } from '../setup/axios-client';
import {
BrokerClient,
closeBrokerClient,
waitForBrokerServerConnections,
} from '../setup/broker-client';
import {
BrokerServer,
closeBrokerServer,
createBrokerServer,
waitForUniversalBrokerClientsConnection,
} from '../setup/broker-server';
import { TestWebServer, createTestWebServer } from '../setup/test-web-server';
import { DEFAULT_TEST_WEB_SERVER_PORT } from '../setup/constants';
import { createUniversalBrokerClient } from '../setup/broker-universal-client';

const fixtures = path.resolve(__dirname, '..', 'fixtures');
const serverAccept = path.join(fixtures, 'server', 'filters-cra.json');

/**
* Integration test for multiple container registries of the same type.
*
* Note: This test verifies that multiple ECR registries can be configured
* and that identifier-based routing works. The identifier header is set by
* the BrokerWorkload when it receives requests from the server over websocket,
* so this test verifies the end-to-end flow where:
* 1. Server sends request over websocket to client
* 2. BrokerWorkload adds identifier header
* 3. Middleware routes based on identifier
* 4. Request is forwarded to correct downstream registry
*/
describe('Multiple container registries of the same type - identifier-based routing', () => {
let tws: TestWebServer;
let bs: BrokerServer;
let bc: BrokerClient;
process.env.API_BASE_URL = `http://localhost:${DEFAULT_TEST_WEB_SERVER_PORT}`;

beforeAll(async () => {
tws = await createTestWebServer();

bs = await createBrokerServer({ port: PORT, filters: serverAccept });

process.env.SKIP_REMOTE_CONFIG = 'true';
process.env.SNYK_BROKER_SERVER_UNIVERSAL_CONFIG_ENABLED = 'true';
process.env.UNIVERSAL_BROKER_ENABLED = 'true';
process.env.SERVICE_ENV = 'universaltest-ecr';
process.env.CLIENT_ID = 'clientid';
process.env.CLIENT_SECRET = 'clientsecret';
// Create two ECR registries with different tokens
process.env.BROKER_TOKEN_1 = 'ecr-registry-1-token';
process.env.BROKER_TOKEN_2 = 'ecr-registry-2-token';
process.env.SNYK_BROKER_CLIENT_CONFIGURATION__common__default__BROKER_SERVER_URL = `http://localhost:${bs.port}`;

bc = await createUniversalBrokerClient();
await waitForUniversalBrokerClientsConnection(bs, 2);
});

afterAll(async () => {
await tws.server.close();
if (bc) {
await closeBrokerClient(bc);
}
await closeBrokerServer(bs);
delete process.env.BROKER_SERVER_URL;
delete process.env.BROKER_TOKEN_1;
delete process.env.BROKER_TOKEN_2;
delete process.env.CLIENT_ID;
delete process.env.CLIENT_SECRET;
});

it('should have two ECR connections established', async () => {
const serverMetadata = await waitForBrokerServerConnections(bc);
expect(serverMetadata.length).toBeGreaterThanOrEqual(2);
expect(serverMetadata.map((x) => x.brokertoken)).toEqual(
expect.arrayContaining(['ecr-registry-1-token', 'ecr-registry-2-token']),
);
});

it('should successfully broker container registry requests with identifier-based routing', async () => {
const serverMetadata = await waitForBrokerServerConnections(bc);
expect(serverMetadata.length).toBeGreaterThanOrEqual(2);

// Verify both connections are established
const ecr1Metadata = serverMetadata.find(
(x) => x.brokertoken === 'ecr-registry-1-token',
);
const ecr2Metadata = serverMetadata.find(
(x) => x.brokertoken === 'ecr-registry-2-token',
);

expect(ecr1Metadata).toBeDefined();
expect(ecr2Metadata).toBeDefined();
expect(ecr1Metadata!.identifier).toBeDefined();
expect(ecr2Metadata!.identifier).toBeDefined();
// Identifiers should be different
expect(ecr1Metadata!.identifier).not.toEqual(ecr2Metadata!.identifier);

// When server sends requests over websocket, BrokerWorkload will add
// the identifier header, and the middleware will route correctly.
// This test verifies identifier-based routing by simulating a server request
// with the connection identifier header.
const ecr1Identifier = ecr1Metadata!.identifier;
const response = await axiosClient.post(
`http://localhost:${bc.port}/api/v2/import/done`,
{ some: { example: 'json' } },
{
headers: {
'snyk-broker-connection-identifier': ecr1Identifier,
},
},
);

expect(response.status).toEqual(200);
});
});
3 changes: 3 additions & 0 deletions test/setup/broker-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ interface ConnectionDetails {
brokertoken: string;
capabilities: Array<string>;
index: number;
identifier: string;
}

export const waitForBrokerServerConnections = async (
Expand All @@ -125,6 +126,7 @@ export const waitForBrokerServerConnections = async (
index: index,
capabilities: x.capabilities,
brokertoken: x.identifier,
identifier: x.identifier,
};
},
);
Expand All @@ -137,6 +139,7 @@ export const waitForBrokerServerConnections = async (
index: index,
capabilities: x.capabilities,
brokertoken: x.identifier,
identifier: x.identifier,
};
});
remainingConnectionsToWaitFor = capabilities
Expand Down
Loading