Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ akash query wasm contract-state smart $CONTRACT_ADDRESS '{"get_config":{}}'
| `WALLET_SECRET` | Yes | - | Either `privateKey:<private key in hex format>` or `mnemonic:<12/24 words>` |
| `HERMES_ENDPOINT` | No | `https://hermes.pyth.network` | Pyth Hermes API |
| `PRICE_DEVIATION_TOLERANCE` | No | 0 | absolute or percentage value for price deviations which should be ignored (e.g., `100` or `10%`) |
| `PRICE_FETCHING_METHOD` | No | polling | `polling` or `sse` |
| `UPDATE_INTERVAL_MS` | No | `5000` | Update interval (default 5 sec) |
| `GAS_PRICE` | No | `0.025uakt` | Gas price |
| `DENOM` | No | `uakt` | Token denomination |
Expand Down
7 changes: 7 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"@opentelemetry/resources": "^2.5.0",
"@opentelemetry/sdk-node": "^0.211.0",
"commander": "^12.0.0",
"fetch-event-stream": "^0.1.6",
"zod": "^4.3.6"
},
"devDependencies": {
Expand Down
11 changes: 10 additions & 1 deletion src/cli-commands/command-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ import { z } from "zod";
import { HermesClient, type HermesConfig } from "../hermes-client.ts";
import { validateContractAddress, validateWalletSecret } from "../validation.ts";
import type { PriceProducerFactoryOptions } from "../types.ts";
import { pollPriceStream } from "../price-stream/polling-price-stream.ts";
import { pollPriceStream } from "../price-stream/polling-price-stream/polling-price-stream.ts";
import { priceSSEStream } from "../price-stream/price-sse-stream/price-sse-stream.ts";

export interface CommandConfig extends HermesConfig {
createHermesClient: (config: HermesConfig) => Promise<HermesClient>;
Expand Down Expand Up @@ -41,6 +42,7 @@ const configSchema = z.object({
});
}
}).optional(),
PRICE_FETCHING_METHOD: z.enum(["polling", "sse"]).default("polling"),
UPDATE_INTERVAL_MS: z.coerce.number().int().nonnegative().default(5 * 1000), // Default to 5 seconds
HEALTHCHECK_PORT: z.coerce.number().int().min(1).max(65535).default(3000),
GAS_PRICE: z.string().regex(/^(\d+)(\.\d+)?uakt$/, { message: 'GAS_PRICE must be a valid number with unit (e.g., "0.025uakt")' }).default("0.025uakt"),
Expand Down Expand Up @@ -71,6 +73,13 @@ export function parseConfig(config: Record<string, string | undefined>): ParseCo
priceDeviationTolerance: result.data.PRICE_DEVIATION_TOLERANCE,
smartContractConfigCacheTTLMs: result.data.SMART_CONTRACT_CONFIG_CACHE_TTL_MS,
priceProducerFactory(options: PriceProducerFactoryOptions) {
if (result.data.PRICE_FETCHING_METHOD === "sse") {
return priceSSEStream({
...options,
unsafeAllowInsecureEndpoints,
baseUrl: result.data.HERMES_ENDPOINT,
});
}
return pollPriceStream({
...options,
unsafeAllowInsecureEndpoints,
Expand Down
40 changes: 40 additions & 0 deletions src/hermes-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ describe("SEC-08: Sensitive data in config exposure", () => {
contractAddress: "akash1qypqxpq9qcrsszg2pvxq6rs0zqg3yyc5lzv7xu",
priceFeedId: "test-feed-id",
address: expect.stringMatching(/^akash1[0-9a-z]{38}$/),
lastPriceUpdateReceivedAt: undefined,
});
expect(JSON.stringify(status)).not.toContain("abandon");
});
Expand Down Expand Up @@ -786,6 +787,45 @@ describe(HermesClient.name, () => {
}
});

it("sets lastPriceUpdateReceivedAt in ISO-8601 format after receiving a price update", async () => {
const priceUpdate = buildPriceFeed("123.45", -8, 1234567890);
const factory = blockingFactory(priceUpdate);
const { client, stargateClient } = setup({ priceProducerFactory: factory });
const ac = new AbortController();

stargateClient.queryContractSmart
.mockResolvedValueOnce({ price_feed_id: "test-feed-id", update_fee: "1", wormhole_contract: "akash1wormhole", admin: "akash1admin", default_denom: "uakt", default_base_denom: "akt", data_sources: [] })
.mockResolvedValueOnce({ price: "12345", conf: "10", expo: -8, publish_time: 1234567880 });
stargateClient.execute.mockResolvedValueOnce({
transactionHash: "TX_TS",
gasUsed: 500000n,
gasWanted: 600000n,
height: 100,
events: [],
logs: [],
});

const startPromise = client.start({ signal: ac.signal });
await vi.waitFor(async () => {
const status = await client.getStatus();
expect(status.lastPriceUpdateReceivedAt).toBeDefined();
});

const status = await client.getStatus();
expect(status.lastPriceUpdateReceivedAt).toMatch(/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}Z$/);

ac.abort();
await startPromise;
});

it("lastPriceUpdateReceivedAt is undefined before any price update is received", async () => {
const { client } = setup();
await client.initialize();

const status = await client.getStatus();
expect(status.lastPriceUpdateReceivedAt).toBeUndefined();
});

it("processes latest price update from stream when updates arrive faster than consumption", async () => {
const priceUpdate1 = buildPriceFeed("10000", -2, 2000);
const priceUpdate2 = buildPriceFeed("10100", -2, 3000);
Expand Down
15 changes: 15 additions & 0 deletions src/hermes-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ export class HermesClient {
#senderAddress?: string;
readonly #config: Required<Omit<HermesConfig, "fetch" | "logger" | "connectWithSigner">>;
#isRunning = false;
#lastPriceUpdateReceivedAt?: string;
#logger: Exclude<HermesConfig["logger"], undefined>;
#connectWithSigner: typeof SigningCosmWasmClient.connectWithSigner;
#smartContractConfig: {
Expand Down Expand Up @@ -528,6 +529,18 @@ export class HermesClient {
const consumePrices = async () => {
for await (const priceUpdate of priceStream) {
priceUpdates.set(priceUpdate);

const price = priceUpdate.priceData.price;
this.#logger?.log(
`Received price from Hermes: ${price.price} (expo: ${price.expo})`,
);
this.#logger?.log(
` Confidence: ${price.conf}, Publish time: ${price.publish_time}`,
);
this.#logger?.log(
` VAA size: ${priceUpdate.vaa.length} bytes (base64)`,
Comment thread
stalniy marked this conversation as resolved.
);
this.#lastPriceUpdateReceivedAt = new Date().toISOString();
}
controller.abort();
};
Expand Down Expand Up @@ -558,6 +571,7 @@ export class HermesClient {
address?: string;
priceFeedId?: string;
contractAddress: string;
lastPriceUpdateReceivedAt?: string;
}> {
// SEC-08: Only return non-sensitive operational status fields.
// Never include mnemonic, gasPrice, rpcEndpoint, or full config.
Expand All @@ -568,6 +582,7 @@ export class HermesClient {
address: this.#senderAddress,
priceFeedId: smartContractConfig.price_feed_id,
contractAddress: this.#config.contractAddress,
lastPriceUpdateReceivedAt: this.#lastPriceUpdateReceivedAt,
};
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { describe, expect, it, vi } from "vitest";
import type { HermesResponse } from "../types.ts";
import type { HermesResponse } from "../../types.ts";
import { pollPriceStream, type PollPriceStreamOptions } from "./polling-price-stream.ts";

describe("pollPriceStream", () => {
Expand Down Expand Up @@ -106,22 +106,6 @@ describe("pollPriceStream", () => {
expect(result.value).toEqual({ priceData: goodData.parsed[0], vaa: goodData.binary.data[0] });
});

it("logs price details on successful fetch", async () => {
const logger = { log: vi.fn(), error: vi.fn(), warn: vi.fn() };
const data = createHermesResponse();
const options = createOptions({
fetch: vi.fn().mockResolvedValueOnce(mockFetchResponse(data)),
logger,
});

const gen = pollPriceStream(options);
await gen.next();

expect(logger.log).toHaveBeenCalledWith(expect.stringContaining("Fetched price from Hermes: 1000"));
expect(logger.log).toHaveBeenCalledWith(expect.stringContaining("Confidence: 10"));
expect(logger.log).toHaveBeenCalledWith(expect.stringContaining("VAA size:"));
});

it("polls repeatedly yielding updates", async () => {
const data1 = createHermesResponse();
const data2 = createHermesResponse({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ import https from "node:https";
import { performance } from "node:perf_hooks";
import { Readable } from "node:stream";
import { setTimeout as delay } from "node:timers/promises";
import { hermesFetchDuration } from "../metrics.ts";
import type { HermesResponse, PriceProducerFactoryOptions, PriceUpdate, PythPriceData } from "../types.ts";
import { validateEndpointUrl } from "../validation.ts";
import { hermesFetchDuration } from "../../metrics.ts";
import type { HermesResponse, PriceProducerFactoryOptions, PriceUpdate } from "../../types.ts";
import { validateEndpointUrl } from "../../validation.ts";
import { parsePriceUpdate } from "../utils.ts";

export async function *pollPriceStream(options: PollPriceStreamOptions): AsyncGenerator<PriceUpdate> {
if (!options.priceFeedId) {
Expand Down Expand Up @@ -37,7 +38,6 @@ export async function *pollPriceStream(options: PollPriceStreamOptions): AsyncGe
continue;
} finally {
hermesFetchDuration.record(performance.now() - fetchStart, { status });
console.log(`Fetch from Hermes completed with status ${status} in ${performance.now() - fetchStart} ms`);
}

if (!response.ok) {
Expand All @@ -48,32 +48,22 @@ export async function *pollPriceStream(options: PollPriceStreamOptions): AsyncGe
continue;
}

const data = await response.json() as HermesResponse;

if (!data.parsed || data.parsed.length === 0) {
options.logger?.error("No price data returned from Hermes");
let parsedData: HermesResponse;
try {
parsedData = await response.json() as HermesResponse;
} catch (error) {
options.logger?.error(`Error parsing JSON from Hermes: ${(error as Error).message}`);
continue;
}

if (!data.binary?.data || data.binary.data.length === 0) {
options.logger?.error("No VAA binary data returned from Hermes");
const priceUpdateResult = parsePriceUpdate(parsedData);

if (!priceUpdateResult.ok) {
options.logger?.error(priceUpdateResult.message);
continue;
}

const priceData: PythPriceData = data.parsed[0];
const vaa: string = data.binary.data[0];

options.logger?.log(
`Fetched price from Hermes: ${priceData.price.price} (expo: ${priceData.price.expo})`,
);
options.logger?.log(
` Confidence: ${priceData.price.conf}, Publish time: ${priceData.price.publish_time}`,
);
options.logger?.log(
` VAA size: ${vaa.length} bytes (base64)`,
);

yield { priceData, vaa };
yield priceUpdateResult.value;
if (options.pollingIntervalMs > 0) {
await delay(options.pollingIntervalMs, undefined, { signal: options.signal })
.catch((error) => options.logger?.warn(`Polling delay interrupted: ${(error as Error).message}`));
Expand Down
Loading
Loading