-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathws_client.py
More file actions
122 lines (120 loc) · 4.6 KB
/
Copy pathws_client.py
File metadata and controls
122 lines (120 loc) · 4.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import asyncio
import json
from pprint import pprint
import websockets
from datetime import datetime
import csv
import os.path
#
def to_csv(data, filename='output.csv'):
    """Append one or more dict rows to a CSV file, writing a header if needed.

    Args:
        data: Either a single dict (one row) or a sequence of dicts (several
            rows). Column names are taken from the keys of the first row.
        filename: Path of the CSV file to append to. A header row is written
            first when the file does not exist yet or is empty.

    Returns:
        True when at least one row was written, False when ``data`` contains
        no rows (nothing is written in that case).

    Raises:
        ValueError: Raised by ``csv.DictWriter`` when a row contains a key
            that is not among the first row's keys.
    """
    print("to_csv", filename)

    # Normalise both accepted shapes to a list of row dicts so that a list
    # input is written row by row instead of being handed to writerow().
    rows = [data] if isinstance(data, dict) else list(data)
    if not rows:
        print(f"No data to write to {filename}")
        return False

    fieldnames = list(rows[0].keys())
    needs_header = (
        not os.path.isfile(filename) or os.path.getsize(filename) == 0
    )

    # newline='' is what the csv module documents for files it writes to; the
    # writer emits its own line terminators.
    with open(filename, mode='a', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        if needs_header:
            writer.writeheader()
        writer.writerows(rows)

    print(f"Data successfully written to {filename}")
    return True
#
async def recv_with_timeout(websocket, timeout):
    """Wait at most ``timeout`` seconds for one message from ``websocket``.

    Returns the value produced by ``websocket.recv()``. If the wait times
    out, the connection is closed, or any other exception is raised, a short
    message is printed and ``None`` is returned instead.
    """
    message = None
    try:
        pending = websocket.recv()
        message = await asyncio.wait_for(pending, timeout=timeout)
    except asyncio.TimeoutError:
        print("Timeout occurred while waiting for data")
    except websockets.ConnectionClosed:
        print("WebSocket connection closed")
    except Exception as exc:
        print(f"An error occurred: {exc}")
    return message
#
async def send_ping_with_timeout(websocket, timeout):
    """Send an application-level PING on ``websocket`` within ``timeout`` seconds.

    The payload is the JSON text ``{"request_type": "PING"}``, the same
    message the subscription loop in this file sends as a keep-alive.

    Args:
        websocket: Connection object exposing an awaitable ``send(message)``.
        timeout: Maximum number of seconds to wait for the send to complete.

    Returns:
        True if the PING was sent, False if the send timed out, the
        connection was closed, or any other exception was raised (a short
        message is printed in each failure case).
    """
    try:
        # send() requires a payload; without one the call raises TypeError
        # and the ping is never transmitted.
        await asyncio.wait_for(
            websocket.send('{"request_type": "PING"}'), timeout=timeout
        )
    except asyncio.TimeoutError:
        print("Timeout occurred while sending PING")
    except websockets.ConnectionClosed:
        print("WebSocket connection closed")
    except Exception as e:
        print(f"An error occurred: {e}")
    else:
        return True
    return False
#
async def subscribe(channel, target_currency="BTC", quote_currency="KRW"):
    """Subscribe to a Coinone public WebSocket channel and log ticker rows to CSV.

    API reference: https://docs.coinone.co.kr/reference/public-websocket-1

    Opens a connection to the stream endpoint, sends one SUBSCRIBE request
    for the given channel and market, then loops forever:

    * sends a ``{"request_type": "PING"}`` keep-alive when more than
      ``ping_interval`` seconds have passed since the last one, or when no
      message arrived within that interval;
    * for every message whose ``data`` object contains a ``last`` field,
      prints it and appends ``data`` as one row to
      ``{channel}_{target_currency}_{quote_currency}.csv``;
    * prints the ``error_code`` of ERROR responses and echoes PONG responses.

    Args:
        channel: Channel name placed in the request (TICKER, TRADE or
            ORDERBOOK).
        target_currency: Value sent as ``topic.target_currency``.
        quote_currency: Value sent as ``topic.quote_currency``.

    This coroutine does not return normally; it ends only when an exception
    propagates (for example when sending on a closed connection) or when the
    task is cancelled.
    """
    uri = "wss://stream.coinone.co.kr"
    ping_message = '{"request_type": "PING"}'
    # Seconds between keep-alive PINGs; also used as the receive timeout.
    ping_interval = 60
    csv_filename = f"{channel}_{target_currency}_{quote_currency}.csv"
    request = json.dumps({
        "request_type": "SUBSCRIBE",
        "channel": channel,  # TICKER, TRADE, ORDERBOOK
        "topic": {
            "quote_currency": quote_currency,
            "target_currency": target_currency,
        },
    })

    print("Connecting to WebSocket...")
    async with websockets.connect(uri) as websocket:
        await websocket.send(request)

        # The event loop clock is monotonic, so the interval check is not
        # disturbed by wall-clock adjustments.
        clock = asyncio.get_running_loop().time
        last_ping = clock()

        while True:
            diff_secs = int(clock() - last_ping)
            if diff_secs > ping_interval:
                print("PING", diff_secs)
                await websocket.send(ping_message)
                last_ping = clock()

            response = await recv_with_timeout(websocket, ping_interval)
            if response is None:
                # Nothing usable was received: send a PING and try again.
                await websocket.send(ping_message)
                print("No response received, sending PING")
                last_ping = clock()
                await asyncio.sleep(0.1)
                continue

            try:
                response_json = json.loads(response)
            except (TypeError, ValueError) as exc:
                # One malformed frame should not end the whole subscription.
                print(f"Ignoring undecodable message: {exc}")
                continue
            if not isinstance(response_json, dict):
                print("Ignoring unexpected message:", response_json)
                continue

            data = response_json.get("data")
            if isinstance(data, dict) and "last" in data:
                print(diff_secs, 'last:',
                      json.dumps(data['last'], indent=2, ensure_ascii=False))
                print(json.dumps(data, indent=2, ensure_ascii=False))
                to_csv(data, filename=csv_filename)

            # .get() because not every message is guaranteed to carry a
            # response_type key.
            response_type = response_json.get("response_type")
            if response_type == "ERROR":
                print("error_code", response_json.get("error_code"))
            elif response_type == "PONG":
                print("PONG received", response_json)
#
#
#
#
async def gather():
    """Run every enabled subscription concurrently and wait for all of them."""
    subscriptions = [
        # subscribe("TRADE"),
        subscribe("TICKER", target_currency="XLM", quote_currency="KRW"),
        # subscribe("ORDERBOOK"),
    ]
    await asyncio.gather(*subscriptions)
#
# Script entry point: start the event loop only when executed directly.
if "__main__" == __name__:
    main_coroutine = gather()
    asyncio.run(main_coroutine)
#
# Source: https://comdoc.tistory.com/entry/파이썬-코인원-웹-소켓-데모 [ComDoc: Tistory]
#