-
Notifications
You must be signed in to change notification settings - Fork 31
Expand file tree
/
Copy pathstream-positions.py
More file actions
88 lines (69 loc) · 2.97 KB
/
stream-positions.py
File metadata and controls
88 lines (69 loc) · 2.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
#!/usr/bin/python3
###############################################################################
# S T R E A M P O S I T I O N S #
###############################################################################
# How to stream position information from a Data Node using Websockets:
# ----------------------------------------------------------------------
# Pagination is not supported, but the initial snapshot may contain
# multiple pages. Date Range is not supported, this is a realtime stream.
# ----------------------------------------------------------------------
# The stream can be filtered by various parameters, including:
# partyId: Vega party id (public key)
# marketId: Vega market id
# > Include none, one or both to refine the stream of data from Vega
# ----------------------------------------------------------------------
# For full details see the REST Reference API docs at https://docs.vega.xyz
import websocket
import threading
import json
import helpers
# Load Vega node API v2 URL, this is set using 'source vega-config'
# located in the root folder of the sample-api-scripts repository
data_node_url_rest = helpers.get_from_env("DATA_NODE_URL_REST")
# Load Vega market id
market_id = helpers.env_market_id()
assert market_id != ""
# Connect to the data node with a WSS based endpoint, this is not a HTTPS:// url
# Hint: to include/filter data from a party add the param `partyId` and same
# for a market id:
# e.g. ?marketId=xxx&partyId=yyy
url = f"{data_node_url_rest}/stream/positions".replace("https://", "wss://")
res = []
event = threading.Event()
# __stream_positions:
# Request a stream of positions for a market id on a Vega network
def on_message(wsa, line):
# Vega data-node v2 returns the json line by line so we need to wait
# for a full structure before we can parse to valid JSON in python
if line == "{":
del res[:]
res.append(line)
elif line == "}":
res.append(line)
obj = json.loads(''.join(res))
if "snapshot" in obj["result"]:
# An 'initial image' snapshot containing current positions (may be multiple pages)
print("Snapshot found:")
print(obj["result"]["snapshot"]["positions"])
if "updates" in obj["result"]:
# A list of position updates typically from the last block
print("Updates found:")
print(obj["result"]["updates"]["positions"])
else:
res.append(line)
def on_error(wsa, error):
print(error)
def on_close(wsa, close_status_code, close_msg):
print(f"Positions stream closed: {url}")
def on_open(wsa):
print(f"Positions stream open: {url}")
def timeout():
while not event.wait(timeout=30):
ws.close()
exit(1)
thread = threading.Thread(target=timeout)
thread.start()
ws = websocket.WebSocketApp(url, on_message=on_message, on_error=on_error, on_close=on_close)
ws.on_open = on_open
ws.run_forever()
# :stream_positions__