-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathservice.py
More file actions
243 lines (214 loc) · 8.47 KB
/
Copy pathservice.py
File metadata and controls
243 lines (214 loc) · 8.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
import json
import logging
import os
import subprocess
import threading
from threading import Thread
log = logging.getLogger('service')
dir_path = os.path.dirname(os.path.realpath(__file__))
user_info_path = os.path.join(dir_path, 'data/users.json')
# 查询用户信息,参数为用户名
def get_user_info(username):
with open(user_info_path, 'r') as f:
users = json.load(f)
if username in users:
data = {
"code": 200,
"msg": "用户信息查询成功",
"username": username,
"password": users[username][0]
}
return json.dumps(data, indent=4, separators=(',', ': '), ensure_ascii=False)
else:
data = {
"code": 201,
"msg": "用户不存在"
}
return json.dumps(data, indent=4, separators=(',', ': '), ensure_ascii=False)
# 删除用户信息,参数为列表
def delete_user_info(username_list):
with open(user_info_path, 'r') as f:
users = json.load(f)
for username in username_list:
if username in users:
users.pop(username)
else:
log.info(f"{username}->用户不存在")
with open(user_info_path, 'w') as f:
json.dump(users, f, indent=4, separators=(',', ': '), ensure_ascii=False)
data = {
"code": 200,
"msg": "用户信息删除成功"
}
return json.dumps(data, indent=4, separators=(',', ': '), ensure_ascii=False)
# 增加或修改用户信息,参数为json,如果已存在则修改密码,如果不存在,则增加
def add_user_info(user_info):
with open(user_info_path, 'r') as f:
users = json.load(f)
for username in user_info:
if username in users:
users[username][0] = user_info[username]
else:
users[username] = [user_info[username], None]
with open(user_info_path, 'w') as f:
json.dump(users, f, indent=4, separators=(',', ': '), ensure_ascii=False)
data = {
"code": 200,
"msg": "用户信息增加或修改成功"
}
return json.dumps(data, indent=4, separators=(',', ': '), ensure_ascii=False)
# 替换所有用户信息,参数为json
def replace_all_user_info(json_data):
with open(user_info_path, 'w') as f:
json.dump(json_data, f, indent=4, separators=(',', ': '), ensure_ascii=False)
data = {
"code": 200,
"msg": "用户信息替换成功"
}
return json.dumps(data, indent=4, separators=(',', ': '), ensure_ascii=False)
"""
执行shell命令,参数为json
解析json后执行的命令为: python main.py -u -a app_id -U username
如果,传入的app_id的进程没有结束,则不执行重复的app_id的命令,需要写记录执行
进程结束后,从app_id_list中删除app_id
"""
shell_json = {
"username": "wt6do7iu4ff1",
"app_id": "1868140"
}
# 初始的 app_id_list
app_id_list = []
# 待执行的 app_id 列表
pending_app_id_list = []
# 解析 JSON 输入并执行命令
def execute_shell_command(shell_json):
# 创建一个计时器,在超时后检查并删除进程 ID
def timeout_handler():
if app_id in app_id_list:
app_id_list.remove(app_id)
log.info(f'App ID {app_id} is forcefully removed from running list due to timeout.')
# 如果存在待处理的 app_id,则启动一个
if len(pending_app_id_list) > 0:
next_app_id = pending_app_id_list.pop(0)
log.info(f'App ID {next_app_id} is taken from pending list to run.')
shell_json["app_id"] = next_app_id
execute_shell_command(shell_json) # 递归调用处理 next_app_id
# 解析 JSON 字符串
username = shell_json["username"]
app_id = shell_json["app_id"]
# 检测是否有进程在运行
if app_id in app_id_list:
log.info(f'App ID {app_id} is already running. Command will not be executed.')
data = {
"code": 201,
"msg": f"当前{app_id}的进程正在运行,请等待进程结束后再执行命令"
}
return json.dumps(data, indent=4, separators=(',', ': '), ensure_ascii=False)
# 检查 app_id_list 是否有空间,如果没有,则加入 pending_app_id_list
if len(app_id_list) >= 5:
pending_app_id_list.append(app_id)
log.info(f'App ID {app_id} is added to pending list. Waiting for space to run.')
data = {
"code": 202,
"msg": f"系统繁忙,您的{app_id}进程已加入等待队列"
}
return json.dumps(data, indent=4, separators=(',', ': '), ensure_ascii=False)
else:
# 在 app_id_list 中记录当前 app_id
app_id_list.append(app_id)
log.info(f'App ID {app_id} is added to running list.')
# 启动新线程执行命令
def run_command():
try:
command = f'python main.py -u -a {app_id} -U {username}'
process = subprocess.Popen(command, shell=True) # use shell=True to handle command as string
log.info(f'Executing command: {command}')
process.wait() # 等待命令执行完成
log.info(f'Command execution completed.')
except Exception as e:
print(f'An error occurred: {e}')
finally:
# 进程结束后应该删除 app_id
if app_id in app_id_list:
app_id_list.remove(app_id)
log.info(f'App ID {app_id} is removed from running list.')
# 如果存在待处理的app_id,启动一个
if len(pending_app_id_list) > 0:
next_app_id = pending_app_id_list.pop(0)
log.info(f'App ID {next_app_id} is taken from pending list to run.')
shell_json["app_id"] = next_app_id
execute_shell_command(shell_json) # 递归调用处理 next_app_id
thread = Thread(target=run_command)
thread.start()
# 创建一个计时器,规定超时时间为10分钟
timeout_timer = threading.Timer(600, timeout_handler) # 10 minutes = 600 seconds
timeout_timer.start()
data = {
"code": 200,
"msg": f"{app_id} 进程已启动, Thanks!",
}
return json.dumps(data, indent=4, separators=(',', ': '), ensure_ascii=False)
# 初始的 app_id_list
# app_id_list = []
#
#
# # 解析 JSON 输入并执行命令
# def execute_shell_command(shell_json):
# # 解析 JSON 字符串
# # shell = json.loads(shell_json)
# username = shell_json["username"]
# app_id = shell_json["app_id"]
#
# # 检测是否有进程在运行
# if app_id in app_id_list:
# log.info(f'App ID {app_id} is already running. Command will not be executed.')
# data = {
# "code": 201,
# "msg": "当前app_id的进程正在运行,请等待进程结束后再执行命令"
# }
# return json.dumps(data, indent=4, separators=(',', ': '), ensure_ascii=False)
#
# # 在 app_id_list 中记录当前 app_id
# app_id_list.append(app_id)
#
# # 启动新线程执行命令
# def run_command():
# try:
# command = f'python main.py -u -a {app_id} -U {username}'
# process = subprocess.Popen(command, shell=True) # use shell=True to handle command as string
# log.info(f'Executing command: {command}')
# process.wait() # 等待命令执行完成
# log.info(f'Command execution completed.')
# except Exception as e:
# print(f'An error occurred: {e}')
# finally:
# # 进程结束后应该删除 app_id
# if app_id in app_id_list:
# app_id_list.remove(app_id)
#
# thread = Thread(target=run_command)
# thread.start()
#
# data = {
# "code": 200,
# "msg": f"{app_id} 进程已启动,Thanks!"
# }
# return json.dumps(data, indent=4, separators=(',', ': '), ensure_ascii=False)
if __name__ == '__main__':
# post列表格式
username_list = [
"vlcq89982",
"ojzo36629",
"irq85580"
]
user_info = {
"ueru69941": "123456",
"username1": "password1",
"username2": "password2"
}
# 测试运行
# process = execute_shell_command(shell_json=shell_json)
# print(get_user_info('vlcq89982'))
# print(delete_user_info(username_list))
# print(add_user_info(user_info))
# print(replace_all_user_info({"admin": ["1234567", None]}))