-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathipython.py
More file actions
323 lines (258 loc) · 9.8 KB
/
ipython.py
File metadata and controls
323 lines (258 loc) · 9.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
import asyncio
from typing import Dict, Any, Optional
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from jupyter_client.manager import AsyncKernelManager
from ..workspace import get_workspace_dir, WORKSPACE_ROOT
router = APIRouter()
# 单例内核管理器
_kernel_manager: Optional[AsyncKernelManager] = None
class ExecuteCodeRequest(BaseModel):
code: str
timeout: int = 30
silent: bool = False
class ExecuteCodeResponse(BaseModel):
success: bool
execution_count: Optional[int] = None
output: dict = {}
error: Optional[str] = None
class KernelStatusResponse(BaseModel):
status: str
has_kernel: bool
workspace: str
async def get_or_create_kernel() -> AsyncKernelManager:
"""获取或创建单例内核管理器"""
global _kernel_manager
if _kernel_manager is None:
# 确保 workspace 目录存在
workspace_dir = get_workspace_dir()
# 创建新的内核管理器,在启动时设置工作目录
km: AsyncKernelManager = AsyncKernelManager()
# 通过 cwd 参数在启动时设置工作目录
await km.start_kernel(cwd=str(workspace_dir))
_kernel_manager = km
# 执行静态初始化代码(字体配置等)
await _init_kernel_matplotlib(km)
return _kernel_manager
async def ensure_kernel_running(km: AsyncKernelManager):
"""确保内核正在运行"""
if not km.has_kernel or not await km.is_alive():
workspace_dir = get_workspace_dir()
await km.start_kernel(cwd=str(workspace_dir))
await _init_kernel_matplotlib(km)
def _load_env_file() -> dict[str, str]:
"""从 .bay_env.sh 加载环境变量"""
env_file = WORKSPACE_ROOT / ".bay_env.sh"
if not env_file.exists():
return {}
env_vars = {}
try:
with open(env_file, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line.startswith("export "):
line = line[7:] # 去掉 "export "
if "=" in line:
key, _, value = line.partition("=")
# 去掉单引号和双引号
value = value.strip().strip('"').strip("'")
env_vars[key.strip()] = value
except Exception as e:
print(f"Warning: Failed to load env file: {e}")
return env_vars
# 静态初始化代码(matplotlib 字体配置等,不包含任何动态内容)
# 注意:字体缓存已在 Docker 构建阶段预热,这里不再清理/重建
_KERNEL_INIT_CODE = """
import matplotlib.pyplot as plt
import matplotlib.font_manager as fm
import warnings
warnings.filterwarnings('ignore', category=UserWarning, module='matplotlib')
# 使用构建时预热的字体缓存(不再清理和重建)
# 如果缓存不存在,才会自动重建
# 配置中文字体 + Symbola 作为 emoji fallback
# Symbola 是矢量字体,支持任意缩放的 emoji 符号
font_candidates = ['Noto Sans CJK SC', 'Noto Sans CJK JP', 'Noto Sans CJK TC']
# 检查 Symbola 字体是否可用
symbola_available = any('Symbola' in f.name for f in fm.fontManager.ttflist)
if symbola_available:
# 将 Symbola 加入 fallback 列表(用于 emoji)
font_candidates.append('Symbola')
font_candidates.append('DejaVu Sans')
plt.rcParams['font.sans-serif'] = font_candidates
plt.rcParams['axes.unicode_minus'] = False
"""
async def _init_kernel_matplotlib(km: AsyncKernelManager):
"""初始化内核的 matplotlib 配置
执行静态初始化代码来配置中文字体等。
工作目录已在 start_kernel(cwd=...) 时设置。
"""
kc = km.client()
try:
# 组装环境变量注入代码
env_vars = _load_env_file()
env_setup_code = ""
if env_vars:
env_setup_code = "import os\n"
for k, v in env_vars.items():
# 使用 repr 处理转义字符
env_setup_code += f"os.environ[{repr(k)}] = {repr(v)}\n"
# 组合完整初始化代码
full_init_code = env_setup_code + _KERNEL_INIT_CODE
# 执行初始化代码
kc.execute(full_init_code, silent=True, store_history=False)
# 等待执行完成
timeout = 10
while True:
try:
msg = await asyncio.wait_for(kc.get_iopub_msg(), timeout=timeout)
if (
msg["msg_type"] == "status"
and msg["content"].get("execution_state") == "idle"
):
break
except asyncio.TimeoutError:
break
except Exception as e:
print(f"Warning: Failed to initialize matplotlib: {e}")
async def execute_code_in_kernel(
km: AsyncKernelManager, code: str, timeout: int = 30, silent: bool = False
) -> Dict[str, Any]:
"""在内核中执行代码"""
await ensure_kernel_running(km)
kc = km.client()
try:
# 执行代码
kc.execute(code, silent=silent, store_history=not silent)
outputs = {
"text": "",
"images": [],
}
plains = []
execution_count = None
error = None
# 等待执行完成
while True:
try:
msg = await asyncio.wait_for(kc.get_iopub_msg(), timeout=timeout)
msg_type = msg["msg_type"]
content = msg["content"]
if msg_type == "execute_input":
execution_count = content.get("execution_count")
elif msg_type == "execute_result":
data = content.get("data", {})
if isinstance(data, dict):
if "text/plain" in data:
plains.append(data["text/plain"])
if "image/png" in data:
outputs["images"].append({"image/png": data["image/png"]})
elif msg_type == "display_data":
data = content.get("data", {})
if isinstance(data, dict) and "image/png" in data:
outputs["images"].append({"image/png": data["image/png"]})
elif "text/plain" in data:
plains.append(data["text/plain"])
elif msg_type == "stream":
plains.append(content.get("text", ""))
elif msg_type == "error":
error = "\n".join(content.get("traceback", []))
elif msg_type == "status" and content.get("execution_state") == "idle":
# 执行完成
break
except asyncio.TimeoutError:
error = f"Code execution timed out after {timeout} seconds"
break
outputs["text"] = "".join(plains).strip()
return {
"success": error is None,
"execution_count": execution_count,
"output": outputs,
"error": error,
}
except Exception as e:
print(f"Error during code execution: {e}")
return {
"success": False,
"execution_count": None,
"output": {},
"error": f"Execution error: {str(e)}",
}
@router.post("/exec", response_model=ExecuteCodeResponse)
async def execute_code(request: ExecuteCodeRequest):
"""执行 IPython 代码"""
try:
km = await get_or_create_kernel()
result = await execute_code_in_kernel(
km, request.code, timeout=request.timeout, silent=request.silent
)
return ExecuteCodeResponse(
success=result["success"],
execution_count=result["execution_count"],
output=result["output"],
error=result["error"],
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to execute code: {str(e)}")
@router.get("/kernel/status", response_model=KernelStatusResponse)
async def get_kernel_status():
"""获取内核状态"""
try:
global _kernel_manager
if _kernel_manager is None:
return KernelStatusResponse(
status="not_started",
has_kernel=False,
workspace=str(WORKSPACE_ROOT),
)
km = _kernel_manager
status = "unknown"
if km.has_kernel:
if await km.is_alive():
status = "alive"
else:
status = "dead"
return KernelStatusResponse(
status=status,
has_kernel=km.has_kernel,
workspace=str(WORKSPACE_ROOT),
)
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to get kernel status: {str(e)}"
)
@router.post("/kernel/restart")
async def restart_kernel():
"""重启内核"""
try:
global _kernel_manager
if _kernel_manager is not None:
await _kernel_manager.shutdown_kernel()
_kernel_manager = None
# 创建新的内核
await get_or_create_kernel()
return {
"success": True,
"message": "Kernel restarted successfully",
}
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to restart kernel: {str(e)}"
)
@router.delete("/kernel")
async def shutdown_kernel():
"""关闭内核"""
try:
global _kernel_manager
if _kernel_manager is None:
raise HTTPException(status_code=404, detail="Kernel not found")
await _kernel_manager.shutdown_kernel()
_kernel_manager = None
return {
"success": True,
"message": "Kernel shutdown successfully",
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500, detail=f"Failed to shutdown kernel: {str(e)}"
)