-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathThinkingInLangGraph.py
More file actions
381 lines (285 loc) · 11.2 KB
/
Copy pathThinkingInLangGraph.py
File metadata and controls
381 lines (285 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
# 1.Read incoming customer emails
# 2.Classify them by urgency and topic
# 3.Search relevant documentation to answer question
# 4.Draft appropriate responses
# 5.Escalate complex issues to human agents
# 6.Schedule follow-ups when needed
import sys
from pathlib import Path
# 1.设计state
from typing import TypedDict, Literal
from langgraph.graph import MessagesState, START, END, StateGraph
from langgraph.types import Command, interrupt, RetryPolicy
from pydantic import BaseModel, Field
from QQEmailListener import QQEmailListener
import smtplib
from email.mime.text import MIMEText
from email.header import Header
import httpx
from langgraph.runtime import get_runtime
# 添加BeautifulSoup和re导入
from bs4 import BeautifulSoup
import re
import os
from feishu_api.feishu import FeishuAPI
import logging
# === 关键:正确获取项目根目录 ===
if getattr(sys, 'frozen', False):
# PyInstaller 打包后:可执行文件所在目录 = 项目根目录
PROJECT_ROOT = Path(sys.executable).parent
else:
# 开发模式:脚本所在目录
PROJECT_ROOT = Path(__file__).parent
# 强制加载 PROJECT_ROOT/.env
from dotenv import load_dotenv
load_dotenv(PROJECT_ROOT / ".env", override=True)
# 配置日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger(__name__)
class EmailClassification(TypedDict):
intent: Literal['question', 'bug', 'building', 'feature', 'complex_request']
urgency: Literal['low', 'medium', 'high', 'critical']
terminal: Literal['Web', 'Windows', 'Android', 'Mac', 'iOS', 'Not provided']
topic: str
summary: str
class EmailAgentState(MessagesState):
email_content: str
sender_email: str
email_id: str
classification: EmailClassification | None
handle_results: list[str] | None
customer_history: dict | None
draft_response: str | None
class QQEmail(TypedDict):
sender: str
password: str
# receiver:str
class SearchAPIError(Exception):
"""搜索API相关异常"""
pass
feishu = FeishuAPI(app_id=os.getenv('APP_ID'), app_secret=os.getenv('APP_SECRET'), app_token=os.getenv('APP_TOKEN'))
# 添加清理HTML内容的函数
def clean_html_content(html_content) -> str:
"""
使用BeautifulSoup清除HTML标签,并结合正则表达式提取正文内容
"""
# 确保输入是字符串类型
if not isinstance(html_content, str):
html_content = str(html_content)
# 如果内容为空,直接返回
if not html_content.strip():
return ""
try:
# 使用BeautifulSoup解析HTML并提取文本
soup = BeautifulSoup(html_content, 'html.parser')
# 移除script和style标签及其内容
for script in soup(["script", "style"]):
script.decompose()
# 获取纯文本
text = soup.get_text()
# 使用正则表达式进一步清理文本
# 将多个连续的空白字符替换为单个空格
text = re.sub(r'\s+', ' ', text)
# 去除首尾空白字符
text = text.strip()
return text
except Exception as e:
# 如果解析失败,返回原始内容(清理后的)
print(f"HTML解析失败: {e}")
# 简单地移除HTML标签
clean_text = re.sub(r'<[^>]+>', '', html_content)
clean_text = re.sub(r'\s+', ' ', clean_text)
return clean_text.strip()
# 2.build node
from langchain_openai import ChatOpenAI
import os
model = os.getenv('MODEL')
base_url = os.getenv('BASE_URL')
api_key = os.getenv('API_KEY')
receive_id=os.getenv('RECEIVE_ID')
table_id=os.getenv('TABLE_ID')
model = ChatOpenAI(
model=model,
base_url=base_url,
api_key=api_key
)
classification_model = model.with_structured_output(EmailClassification)
def classify_intent(state: EmailAgentState) -> Command[
Literal["search_documentation", "to_human", "draft_response", "bug_tracking"]]:
print('开始分类邮件')
# 清理邮件内容中的HTML标签
cleaned_email_content = clean_html_content(state['email_content'])
classification_prompt = f"""
Analyze this customer email and classify it:
Email:{cleaned_email_content}
From:{state['sender_email']}
Provide classification including intent,urgency,topic,and summary
and return json format
intent:Literal['question','bug','building','feature','complex_request']
urgency:Literal['low','medium','high','critical']
terminal:Literal['Web','Windows','Android','Mac','iOS','Not provided']
"""
classification = classification_model.invoke(classification_prompt)
print(f'分类完成:{classification}')
intent = classification['intent']
urgency = classification['urgency']
if intent not in ['question', 'bug', 'building', 'feature', 'complex_request']:
raise ValueError
if urgency not in ['low', 'medium', 'high', 'critical']:
raise ValueError
if classification['terminal'] not in ['Web', 'Windows', 'Android', 'Mac', 'iOS', 'Not provided']:
raise ValueError
if intent in ['billing', 'complex_request'] or urgency in ['critical', 'high']:
goto = 'to_human'
elif intent in ['question', 'feature']:
goto = 'search_documentation'
elif intent == 'bug':
goto = 'bug_tracking'
else:
goto = 'draft_response'
print(f'进入 {goto}')
return Command(
update={
'classification': classification,
'email_content': cleaned_email_content
},
goto=goto
)
def search_documentation(state: EmailAgentState) -> Command['draft_response']:
print('开始搜索文档')
classification = state.get('classification', {})
query = f'{classification.get("intent", "")} {classification.get("topic", "")}'
try:
search_result = [
"The user might be just chatting, let's chat casually. Keep the reply short."
# "Reset password via Settings > Security > Change Password.Only choose this when the user explicitly mentions it.",
# "Password must be at least 12 characters",
"Include uppercase, lowercase, numbers, and symbols"
]
print('搜索完成')
except SearchAPIError:
# 处理搜索API错误
search_result = ["暂时无法获取相关文档,请稍后再试。"]
# 更新状态并返回命令
return Command(
update={'handle_results': search_result},
goto='draft_response'
)
def bug_tracking(state: EmailAgentState) -> Command['draft_response']:
classification = state['classification']
urgency = classification['urgency']
if urgency in ['critical', 'high']:
priority = 'P0'
elif urgency == 'medium':
priority = 'P1'
else:
priority = 'P2'
print('正在提交bug')
bug_description = state['email_content']
submitter = state['sender_email']
terminal = classification['terminal']
feishu.to_feishu(table_id=table_id, data={
"Bug 描述": bug_description,
"提交人": submitter,
"终端": terminal,
"优先级": priority
})
print('Bug 已提交')
return Command(
update={'handle_results': f'Bug ticket created'},
goto='draft_response'
)
def to_human(state: EmailAgentState) -> Command[Literal['send_reply', END]]:
# classification = state.get('classification', {})
# 清理邮件内容中的HTML标签
cleaned_email_content = clean_html_content(state['email_content'])
feishu.send_message(receive_id, cleaned_email_content)
return Command(update={'handle_results': "已经将此紧急问题发送给专业团队,静待即可"}, goto='draft_response')
def draft_response(state: EmailAgentState) -> Command[Literal['send_reply']]:
print('开始拟写回信')
# 清理邮件内容中的HTML标签
cleaned_email_content = clean_html_content(state['email_content'])
classification = state.get('classification', {})
context_sections = []
if state.get('handle_results', None):
formatted_docs = '\n'.join([f'- {doc}' for doc in state["handle_results"]])
context_sections.append(f'Relevant documentation:\n{formatted_docs}')
draft_prompt = f"""
Draft a response to this email:
{cleaned_email_content}
Email intent: {classification.get('intent', 'unknown')}
Urgency level: {classification.get('urgency', 'medium')}
searched_documentation: {context_sections}
Guidelines:
- The user might be just chatting, let's chat casually. Keep the reply short.
- Be professional and helpful
- Address their specific concern
- Use the provided documentation when relevant
"""
# print(draft_prompt)
response = model.invoke(draft_prompt)
print('拟写完成,准备发送')
return Command(
update={'draft_response': response.content},
goto='send_reply'
)
def send_reply(state: EmailAgentState) -> dict:
subject = f"reply about {state['classification'].get('topic', '')}"
body = state['draft_response']
my_email = get_runtime(QQEmail).context['sender']
msg = MIMEText(body, 'plain', 'utf-8')
msg["Subject"] = Header(subject, 'utf-8')
msg['From'] = my_email
msg["To"] = state['sender_email']
smtp_server = 'smtp.qq.com'
smtp_port = 587
sender_email = my_email
password = get_runtime(QQEmail).context['password']
server = None
try:
server = smtplib.SMTP(smtp_server, smtp_port)
server.starttls()
server.login(sender_email, password)
server.sendmail(sender_email, [msg["To"]], msg.as_string())
print('邮件已发送')
except smtplib.SMTPException as e:
print('邮件发送失败:', str(e))
finally:
if server:
server.close()
return {}
workflow = StateGraph(EmailAgentState)
workflow.add_node(classify_intent, retry_policy=RetryPolicy(max_attempts=3))
workflow.add_node(search_documentation, retry_policy=RetryPolicy(max_attempts=3))
workflow.add_node(bug_tracking).add_node(to_human).add_node(draft_response).add_node(send_reply)
workflow.add_edge(START, 'classify_intent')
workflow.add_edge('draft_response', 'send_reply')
workflow.add_edge('send_reply', END)
app = workflow.compile()
def main(test: bool = True):
"""The default is test mode. If you need to launch formally, pass in test=False."""
email_address = os.getenv('QQEMAIL')
password = os.getenv('EMAIL_PASSWORD') # QQ邮箱需要使用授权码而非密码
if not email_address or not password:
print("请设置环境变量 QQEMAIL 和 EMAIL_PASSWORD")
listener = QQEmailListener(email_address, password)
if not test:
while True:
try:
email = next(listener.listen_for_emails(check_interval=30))
if email:
app.invoke({
'email_content': email['email_content'],
'sender_email': email['sender_email'],
'email_id': email['email_id']
},
context=QQEmail(sender=email_address, password=password)
)
except:
continue
if __name__ == '__main__':
main(False)