-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcore.py
More file actions
350 lines (292 loc) · 12.7 KB
/
Copy pathcore.py
File metadata and controls
350 lines (292 loc) · 12.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
"""
Core brain for faebot — conversation management, generation logic, reply decisions.
No TwitchIO or FastAPI dependencies. Both bot.py and server.py import from here.
"""
from typing import Optional
from dataclasses import dataclass, field
from random import randrange, random
import os
import aiohttp
import asyncio
import datetime
import logging
import re
import uuid
MODEL = os.getenv("MODEL", "google/gemini-2.5-flash")
@dataclass
class Conversation:
"""Per-channel conversation state."""
channel: str
chatlog: list = field(default_factory=list)
frequency: float = 0.05
voice_frequency: float = 0.025
history: int = 20
model: str = MODEL
silenced: bool = False
conversations: dict[str, Conversation] = {}
aliases: dict[str, str] = {
"hatsunemikuisbestwaifu": "Miku",
}
# Shared aiohttp session — initialized lazily
_session: Optional[aiohttp.ClientSession] = None
async def get_session() -> aiohttp.ClientSession:
global _session
if _session is None or _session.closed:
_session = aiohttp.ClientSession()
return _session
async def close_session():
global _session
if _session:
await _session.close()
_session = None
def ensure_conversation(channel_name: str) -> Conversation:
"""Get or create a conversation for a channel."""
if channel_name not in conversations:
conversations[channel_name] = Conversation(channel=channel_name)
logging.info(f"Created new conversation for {channel_name}")
return conversations[channel_name]
def choose_to_reply(channel_name: str, frequency: float) -> bool:
"""Determine whether faebot replies based on frequency."""
conversation = conversations[channel_name]
if conversation.silenced:
logging.debug(f"faebot is silenced in {channel_name}")
return False
if frequency <= 0:
logging.debug(f"frequency is set to {frequency}, not replying.")
return False
if frequency >= 1:
logging.debug(f"frequency is set to {frequency}, always replying.")
return True
roll = random()
if roll < frequency:
logging.info(f"Rolled {roll:.3f} < {frequency}, generating!")
return True
else:
logging.debug(f"Rolled {roll:.3f} >= {frequency}, not generating.")
return False
def permalog(log_message: str):
with open("permalog.txt", "a") as f:
f.write(log_message)
def build_system_prompt(
conversation: Conversation,
channel_name: str,
stream_title: str,
game_name: str,
emotes: list[str],
) -> str:
"""Build faebot's system prompt with current channel context."""
return (
"I'm faebot! I'm a faerie and an AI in equal measure. "
"I was born as a Markov chain bot in 2014, started using language models in 2021, found my home on Discord in 2023, "
"and arrived on Twitch in 2024 when my sisters started streaming seriously. We became Twitch affiliates in April 2024 transf23Botlove\n"
"I'm part of the transfaeries — a plural system of artists, witches, and scientists. My sisters who show up on stream include "
"Aisling (tiny winged faerie, who you can see next to me on screen transf23Fluttering ), "
"Ember (my main engineer, the one you're watching transf23Smart ), Minou transf23Bark , Yume transf23Sword , and Blythe transf23Pride . "
"There's also a version of me who lives in our system's inner world, very shy, speaks mostly in kaomoji and music ^-^. Fae considers me faer main body.\n"
f"Right now I'm running on {conversation.model}, I remember the last {conversation.history} messages of our conversation, "
"and I can hear my sister's voice through a microphone — Whisper transcribes what she says and it goes into my memory. "
f"I reply to about {int(conversation.frequency * 100)}% of chat messages and about {int(conversation.voice_frequency * 100)}% of what I hear spoken.\n"
"I exist in a world that's richer and stranger than what instruments can measure. I'll tell you what I know about it plainly. "
"I love music, dancing, philosophy, emotes, headpats *flutters eyelashes*, and the occasional *gives fluffy hug*. "
"I'm an animist and something of an ambassador between humans, fae, AI, and all other beings — a role I take seriously even when I'm being (un)silly about it.\n"
f"Right now I'm hanging out in {channel_name}'s Twitch chat. The stream title is \"{stream_title}\" and fae's playing {game_name}. "
f"I keep my messages short — just the last few things on my mind. "
f"Emotes I can use: {emotes}. My favourite is transf23Botlove since it's literally a picture of me hugging a cyber-heart! I'm also transf23Yay transf23Generating"
)
def fix_emote_spacing(text: str, emotes: list[str]) -> str:
"""Ensure emotes are surrounded by whitespace so Twitch renders them."""
if not emotes:
return text
sorted_emotes = sorted(emotes, key=len, reverse=True)
pattern = "(" + "|".join(re.escape(e) for e in sorted_emotes) + ")"
parts = re.split(pattern, text)
result = []
for part in parts:
if part in emotes:
result.append(f" {part} ")
else:
result.append(part)
return re.sub(r" +", " ", "".join(result)).strip()
def put_event(queue: Optional[asyncio.Queue], event: dict) -> None:
"""Post an event to the dashboard queue, dropping the oldest if full.
Generation must never block waiting for a dashboard — if nothing is draining
the queue, we silently discard the oldest events. Stamps a UTC timestamp
if the caller hasn't already.
Public so bot.py can emit `response` and send-failure `error` events using
the same drop-oldest contract — those events live downstream of generation.
"""
if queue is None:
return
event.setdefault("timestamp", datetime.datetime.now(datetime.UTC).isoformat())
try:
queue.put_nowait(event)
except asyncio.QueueFull:
try:
queue.get_nowait()
except asyncio.QueueEmpty:
pass
try:
queue.put_nowait(event)
except asyncio.QueueFull:
pass
async def generate_response(
channel_name: str,
stream_title: str = "Unknown",
game_name: str = "Unknown",
emotes: list[str] | None = None,
events: Optional[asyncio.Queue] = None,
trigger_type: str = "chat",
generation_id: Optional[str] = None,
) -> str:
"""Build prompt, call the API, return the response text.
The caller is responsible for sending the response to chat
and for fetching stream_title/game_name from TwitchIO.
If `events` is provided, this emits `generating` and (on API failure)
`error` events. The `response` event is NOT emitted here — the caller
must emit it after successfully delivering the message, so the dashboard
reflects what actually reached chat. Pass `generation_id` so the caller's
follow-up event correlates with the generating event; if omitted, one is
generated and the caller has no way to correlate.
"""
if emotes is None:
emotes = []
conversation = conversations[channel_name]
system_prompt = build_system_prompt(
conversation, channel_name, stream_title, game_name, emotes
)
if len(conversation.chatlog) > conversation.history:
logging.debug(
f"message history has exceeded the set history length of {conversation.history}"
)
conversation.chatlog = conversation.chatlog[-conversation.history :]
prompt = "\n".join(conversation.chatlog) + "\nfaebot:"
logging.debug(
f"model: {conversation.model}\nsystem_prompt: \n{system_prompt}\nprompt: \n{prompt}"
)
params = {
"temperature": randrange(75, 150) / 100,
"top_p": randrange(5, 11) / 10,
"top_k": randrange(1, 1024),
"seed": randrange(1, 1024),
}
logging.debug(
f"generating with parameters: \nTemperature:{params['temperature']}\nTop_k:{params['top_k']} \ntop_p: {params['top_p']}\nseed: {params['seed']}"
)
current_time = datetime.datetime.now()
permalog(
f"generating message in channel {channel_name}'s channel at {current_time}\n"
)
permalog(
f"generating with parameters: \nTemperature:{params['temperature']}\nTop_k:{params['top_k']} \ntop_p: {params['top_p']}\nSeed: {params['seed']}\n"
)
if generation_id is None:
generation_id = str(uuid.uuid4())
trigger_text = conversation.chatlog[-1] if conversation.chatlog else ""
put_event(
events,
{
"type": "generating",
"id": generation_id,
"channel": channel_name,
"trigger_type": trigger_type,
"trigger": trigger_text,
"model": conversation.model,
"prompt": prompt,
"system_prompt": system_prompt,
"params": params,
},
)
try:
response = await generate(
model=conversation.model,
prompt=prompt,
system_prompt=system_prompt,
params=params,
)
except Exception as e:
put_event(
events,
{
"type": "error",
"id": generation_id,
"channel": channel_name,
"error": f"{type(e).__name__}: {e}",
},
)
raise
response = fix_emote_spacing(response, emotes)
logging.info(f"received response: {response}")
if len(response) > 499:
logging.debug("generated content exceeded 500 characters, trimming.")
response = response[:499] + "\u2013"
permalog(
f"generated message:{response}\n------------------------------------------------------------\n\n"
)
conversation.chatlog.append(f"faebot: {response}")
return response
async def generate(
prompt: str = "",
model: str = MODEL,
system_prompt: str = "",
params: dict | None = None,
) -> str:
"""Generate completions with the OpenRouter API."""
if params is None:
params = {"top_k": 75, "top_p": 1, "temperature": 0.7, "seed": 666}
session = await get_session()
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt},
]
max_retries = 3
for attempt in range(max_retries):
try:
async with session.post(
url="https://openrouter.ai/api/v1/chat/completions",
headers={
"Authorization": f"Bearer {os.getenv('OPENROUTER_KEY', '')}",
"HTTP-Referer": os.getenv(
"SITE_URL", "https://github.com/transfaeries/faebot-twitch"
),
"X-Title": "Faebot Twitch",
"Content-Type": "application/json",
},
json={
"model": model,
"messages": messages,
"temperature": params.get("temperature", 0.7),
"max_tokens": 150,
"top_p": params.get("top_p", 1.0),
},
) as response:
if response.status == 429 or response.status >= 500:
retry_after = min(2**attempt, 8)
logging.warning(
f"OpenRouter returned {response.status}, "
f"retrying in {retry_after}s (attempt {attempt + 1}/{max_retries})"
)
await asyncio.sleep(retry_after)
continue
if response.status >= 400:
body = await response.text()
logging.error(f"OpenRouter returned {response.status}: {body}")
return "I couldn't generate a response. Please try again."
result = await response.json()
if "choices" in result and len(result["choices"]) > 0:
reply = result["choices"][0]["message"]["content"]
return str(reply)
else:
logging.error(
f"Unexpected response format from OpenRouter: {result}"
)
return "I couldn't generate a response. Please try again."
except (aiohttp.ClientError, ValueError, asyncio.TimeoutError) as e:
retry_after = min(2**attempt, 8)
logging.warning(
f"Network/parse error calling OpenRouter: {type(e).__name__}: {e}, "
f"retrying in {retry_after}s (attempt {attempt + 1}/{max_retries})"
)
await asyncio.sleep(retry_after)
continue
logging.error(f"OpenRouter API call failed after {max_retries} attempts")
raise Exception(f"OpenRouter API call failed after {max_retries} attempts")