diff --git a/.gitignore b/.gitignore
index 987f054..1bb5ae1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,7 +5,7 @@ __pycache__/
# C extensions
*.so
-
+.tree*
# Distribution / packaging
.Python
build/
diff --git a/__main__.py b/__main__.py
index 8094fee..09df8e6 100644
--- a/__main__.py
+++ b/__main__.py
@@ -1,37 +1,14 @@
import os
-os.environ['no_proxy'] = '*' # 避免代理网络产生意外污染
+
import gradio as gr
from request_llm.bridge_chatgpt import predict
-from toolbox import format_io, find_free_port, on_file_uploaded, on_report_generated, get_user_upload, get_conf, \
- ArgsGeneralWrapper, DummyWith
-
-
-# 建议您复制一个config_private.py放自己的秘密, 如API和代理网址, 避免不小心传github被别人看到
-proxies, WEB_PORT, LLM_MODEL, CONCURRENT_COUNT, AUTHENTICATION, CHATBOT_HEIGHT, LAYOUT, API_KEY, AVAIL_LLM_MODELS = \
- get_conf('proxies', 'WEB_PORT', 'LLM_MODEL', 'CONCURRENT_COUNT', 'AUTHENTICATION', 'CHATBOT_HEIGHT', 'LAYOUT',
- 'API_KEY', 'AVAIL_LLM_MODELS')
-
-# 如果WEB_PORT是-1, 则随机选取WEB端口
-PORT = find_free_port() if WEB_PORT <= 0 else WEB_PORT
-if not AUTHENTICATION: AUTHENTICATION = None
-
-from check_proxy import get_current_version
-
-initial_prompt = "Serve me as a writing and programming assistant."
-title_html = f"
ChatGPT For Tester {get_current_version()}
"
-description = """代码开源和更新[地址🚀](https://github.com/binary-husky/chatgpt_academic),感谢热情的[开发者们❤️](https://github.com/binary-husky/chatgpt_academic/graphs/contributors)"""
+from toolbox import format_io, find_free_port, on_file_uploaded, on_report_generated, get_user_upload, \
+ get_user_download, get_conf, ArgsGeneralWrapper, DummyWith
# 问询记录, python 版本建议3.9+(越新越好)
import logging
-os.makedirs("gpt_log", exist_ok=True)
-try:
- logging.basicConfig(filename="gpt_log/chat_secrets.log", level=logging.INFO, encoding="utf-8")
-except:
- logging.basicConfig(filename="gpt_log/chat_secrets.log", level=logging.INFO)
-print("所有问询记录将自动保存在本地目录./gpt_log/chat_secrets.log, 请注意自我隐私保护哦!")
-
# 一些普通功能模块
from core_functional import get_core_functions
@@ -53,183 +30,235 @@ set_theme = adjust_theme()
# 代理与自动更新
from check_proxy import check_proxy, auto_update, warm_up_modules
+import func_box
+
+from check_proxy import get_current_version
+
+os.makedirs("gpt_log", exist_ok=True)
+try:
+ logging.basicConfig(filename="gpt_log/chat_secrets.log", level=logging.INFO, encoding="utf-8")
+except:
+ logging.basicConfig(filename="gpt_log/chat_secrets.log", level=logging.INFO)
+print("所有问询记录将自动保存在本地目录./gpt_log/chat_secrets.log, 请注意自我隐私保护哦!")
+
+# 建议您复制一个config_private.py放自己的秘密, 如API和代理网址, 避免不小心传github被别人看到
+proxies, WEB_PORT, LLM_MODEL, CONCURRENT_COUNT, AUTHENTICATION, CHATBOT_HEIGHT, LAYOUT, API_KEY, AVAIL_LLM_MODELS = \
+ get_conf('proxies', 'WEB_PORT', 'LLM_MODEL', 'CONCURRENT_COUNT', 'AUTHENTICATION', 'CHATBOT_HEIGHT', 'LAYOUT',
+ 'API_KEY', 'AVAIL_LLM_MODELS')
+
proxy_info = check_proxy(proxies)
+# 如果WEB_PORT是-1, 则随机选取WEB端口
+PORT = find_free_port() if WEB_PORT <= 0 else WEB_PORT
+if not AUTHENTICATION: AUTHENTICATION = None
+os.environ['no_proxy'] = '*' # 避免代理网络产生意外污染
-gr_L1 = lambda: gr.Row().style()
-gr_L2 = lambda scale: gr.Column(scale=scale)
-if LAYOUT == "TOP-DOWN":
- gr_L1 = lambda: DummyWith()
- gr_L2 = lambda scale: gr.Row()
- CHATBOT_HEIGHT /= 2
-cancel_handles = []
-with gr.Blocks(title="ChatGPT For Tester", theme=set_theme, analytics_enabled=False, css=advanced_css) as demo:
- gr.HTML(title_html)
- cookies = gr.State({'api_key': API_KEY, 'llm_model': LLM_MODEL})
- with gr_L1():
- with gr_L2(scale=2):
- with gr.Box():
- chatbot = gr.Chatbot()
- chatbot.style(height=CHATBOT_HEIGHT)
- history = gr.State([])
- with gr.Row(visible=False):
- assist = None
+class ChatBotFrame:
+
+ def __init__(self):
+ self.cancel_handles = []
+ self.initial_prompt = "Serve me as a writing and programming assistant."
+ self.title_html = f"ChatGPT For Tester {get_current_version()}
"
+ self.description = """代码开源和更新[地址🚀](https://github.com/binary-husky/chatgpt_academic),感谢热情的[开发者们❤️](https://github.com/binary-husky/chatgpt_academic/graphs/contributors)"""
+
+
+class ChatBot(ChatBotFrame):
+
+ def __init__(self):
+ super().__init__()
+ self.__url = f'http://{func_box.ipaddr()}:{PORT}'
+ # self.__gr_url = gr.State(self.__url)
+
+ def draw_title(self):
+ self.title = gr.HTML(self.title_html)
+ self.cookies = gr.State({'api_key': API_KEY, 'llm_model': LLM_MODEL, 'local': self.__url})
+
+ def draw_chatbot(self):
+ with gr.Box():
+ self.chatbot = gr.Chatbot()
+ self.chatbot.style(height=CHATBOT_HEIGHT)
+ self.history = gr.State([])
+ with gr.Row():
+ self.status = gr.Markdown(f"Tip: 按Enter提交, 按Shift+Enter换行。当前模型: {LLM_MODEL} \n {proxy_info}")
+
+ def draw_input_chat(self):
+ with gr.Accordion("输入区", open=True) as self.area_input_primary:
+ with gr.Row():
+ self.txt = gr.Textbox(show_label=False, placeholder="Input question here.").style(container=False)
+ with gr.Row():
+ self.submitBtn = gr.Button("提交", variant="primary")
+ with gr.Row():
+ self.resetBtn = gr.Button("重置", variant="secondary");
+ self.stopBtn = gr.Button("停止", variant="secondary");
+ self.resetBtn.style(size="sm")
+ self.stopBtn.style(size="sm")
+
+ def draw_function_chat(self):
+ with gr.Tab('Function'):
+ with gr.Accordion("基础功能区", open=True) as self.area_basic_fn:
+ gr.Markdown('> 以下功能依赖输入区内容')
with gr.Row():
- status = gr.Markdown(f"Tip: 按Enter提交, 按Shift+Enter换行。当前模型: {LLM_MODEL} \n {proxy_info}")
+ for k in functional:
+ variant = functional[k]["Color"] if "Color" in functional[k] else "secondary"
+ functional[k]["Button"] = gr.Button(k, variant=variant)
- with gr_L2(scale=1):
- with gr.Tab('对话模式'):
- with gr.Accordion("输入区", open=True) as area_input_primary:
- with gr.Row():
- txt = gr.Textbox(show_label=False, placeholder="Input question here.").style(container=False)
- with gr.Row():
- submitBtn = gr.Button("提交", variant="primary")
- with gr.Row():
- resetBtn = gr.Button("重置", variant="secondary");
- resetBtn.style(size="sm")
- stopBtn = gr.Button("停止", variant="secondary");
- stopBtn.style(size="sm")
-
- with gr.Tab('Function'):
- with gr.Accordion("基础功能区", open=True) as area_basic_fn:
- with gr.Row():
- for k in functional:
- variant = functional[k]["Color"] if "Color" in functional[k] else "secondary"
- functional[k]["Button"] = gr.Button(k, variant=variant)
- with gr.Tab('Public'):
- with gr.Box():
- with gr.Accordion("上传本地文件可供高亮函数插件调用",
- open=False) as area_file_up:
- file_upload = gr.Files(label="任何文件, 但推荐上传压缩文件(zip, tar)",
- file_count="multiple")
- file_upload.style()
- with gr.Row():
- upload_history = submitBtn = gr.Button("Get Upload History", variant="primary")
- with gr.Accordion("函数插件区", open=True) as area_crazy_fn:
- with gr.Row():
- gr.Markdown("注意:以下“高亮”标识的函数插件需从输入区读取路径作为参数.")
- with gr.Row():
- for k in crazy_fns:
- if not crazy_fns[k].get("AsButton", True): continue
- variant = crazy_fns[k]["Color"] if "Color" in crazy_fns[k] else "secondary"
- crazy_fns[k]["Button"] = gr.Button(k, variant=variant)
- crazy_fns[k]["Button"].style(size="sm")
- with gr.Row():
- with gr.Accordion("更多函数插件", open=True):
- dropdown_fn_list = [k for k in crazy_fns.keys() if
- not crazy_fns[k].get("AsButton", True)]
- with gr.Column(scale=1):
- dropdown = gr.Dropdown(dropdown_fn_list, value=r"打开插件列表", label="").style(
- container=False)
- with gr.Column(scale=1):
- switchy_bt = gr.Button(r"请先从插件列表中选择", variant="secondary")
-
- with gr.Tab('Setting'):
- with gr.Accordion("展开SysPrompt & 交互界面布局 & Github地址", open=True):
- system_prompt = gr.Textbox(show_label=True, placeholder=f"System Prompt", label="System prompt",
- value=initial_prompt)
- top_p = gr.Slider(minimum=-0, maximum=1.0, value=1.0, step=0.01, interactive=True,
- label="Top-p (nucleus sampling)", )
- temperature = gr.Slider(minimum=-0, maximum=2.0, value=1.0, step=0.01, interactive=True,
- label="Temperature", )
- max_length_sl = gr.Slider(minimum=256, maximum=4096, value=512, step=1, interactive=True,
- label="MaxLength", )
-
- models_box = gr.CheckboxGroup(["input加密"],
- value=["input加密"], label="对话模式")
- md_dropdown = gr.Dropdown(AVAIL_LLM_MODELS, value=LLM_MODEL, label="更换LLM模型/请求源").style(
+ def draw_public_chat(self):
+ with gr.Tab('Public'):
+ with gr.Tab('Public'):
+ with gr.Accordion("上传本地文件可供高亮函数插件调用", open=False) as self.area_file_up:
+ self.file_upload = gr.Files(label="任何文件, 但推荐上传压缩文件(zip, tar)",
+ file_count="multiple")
+ self.file_upload.style()
+ with gr.Row():
+ self.upload_history = gr.Button("Get Upload History", variant="secondary")
+ self.get_download = gr.Button('Get Download Link', variant='stop')
+ with gr.Accordion("函数插件区", open=True) as self.area_crazy_fn:
+ with gr.Row():
+ for k in crazy_fns:
+ if not crazy_fns[k].get("AsButton", True): continue
+ self.variant = crazy_fns[k]["Color"] if "Color" in crazy_fns[k] else "secondary"
+ crazy_fns[k]["Button"] = gr.Button(k, variant=self.variant)
+ crazy_fns[k]["Button"].style(size="sm")
+ with gr.Accordion("更多函数插件", open=True):
+ dropdown_fn_list = [k for k in crazy_fns.keys() if
+ not crazy_fns[k].get("AsButton", True)]
+ with gr.Column(scale=1):
+ self.dropdown = gr.Dropdown(dropdown_fn_list, value=r"打开插件列表", label="").style(
container=False)
+ with gr.Column(scale=1):
+ self.switchy_bt = gr.Button(r"请先从插件列表中选择", variant="secondary")
- gr.Markdown(description)
- with gr.Tab('Auto-GPT'):
- with gr.Row():
- ai_name = gr.Textbox(show_label=False, placeholder="Give AI a name.").style(container=False)
- with gr.Row():
- user_input = gr.Textbox(lines=5, show_label=False, placeholder="Describe your AI's role.").style(
- container=False)
- with gr.Box():
- with gr.Row() as goal_list:
- goal_array = []
- for text in range(4):
- goal_array.append(gr.Textbox(show_label=False, placeholder="Enter up to 1 goals.").style(container=False))
- with gr.Row():
- submit_add = gr.Button("Adding goals", variant="secondary")
- with gr.Row():
- __l = [str(i) for i in range(10, 101, 10)]
- __l.insert(0, '1')
- submit_numer = gr.Dropdown(__l, value='1', interactive=True, label='Number of Next').style(
- container=False)
- with gr.Row():
- submit_next = gr.Button("Next", variant="primary")
- submit_auto = gr.Button("Continuous", variant="secondary")
- submit_stop = gr.Button("Stop", variant="stop")
+ def draw_setting_chat(self):
+ with gr.Tab('Setting'):
+ with gr.Accordion("展开SysPrompt & 交互界面布局 & Github地址", open=True):
+ self.system_prompt = gr.Textbox(show_label=True, placeholder=f"System Prompt", label="System prompt", value=self.initial_prompt)
+ self.top_p = gr.Slider(minimum=-0, maximum=1.0, value=1.0, step=0.01, interactive=True, label="Top-p (nucleus sampling)", )
+ self.temperature = gr.Slider(minimum=-0, maximum=2.0, value=1.0, step=0.01, interactive=True, label="Temperature", )
+ self.max_length_sl = gr.Slider(minimum=256, maximum=4096, value=512, step=1, interactive=True, label="MaxLength", )
+ self.models_box = gr.CheckboxGroup(["input加密"], value=["input加密"], label="对话模式")
+ self.md_dropdown = gr.Dropdown(AVAIL_LLM_MODELS, value=LLM_MODEL, label="更换LLM模型/请求源").style(container=False)
+ gr.Markdown(self.description)
- # 整理反复出现的控件句柄组合,
- # submitBtn.info
- input_combo = [cookies, max_length_sl, md_dropdown, txt, top_p, temperature, chatbot, history, system_prompt,
- models_box]
- output_combo = [cookies, chatbot, history, status]
- predict_args = dict(fn=ArgsGeneralWrapper(predict), inputs=input_combo, outputs=output_combo)
- # 提交按钮、重置按钮
- cancel_handles.append(txt.submit(**predict_args))
- cancel_handles.append(submitBtn.click(**predict_args))
- resetBtn.click(lambda: ([], [], "已重置"), None, [chatbot, history, status])
- # 基础功能区的回调函数注册
- for k in functional:
- click_handle = functional[k]["Button"].click(fn=ArgsGeneralWrapper(predict),
- inputs=[*input_combo, gr.State(True), gr.State(k)],
- outputs=output_combo)
- cancel_handles.append(click_handle)
- # 文件上传区,接收文件后与chatbot的互动
- file_upload.upload(on_file_uploaded, [file_upload, chatbot, txt], [chatbot, txt])
- upload_history.click(get_user_upload, [chatbot], outputs=[])
- # 函数插件-固定按钮区
- for k in crazy_fns:
- if not crazy_fns[k].get("AsButton", True): continue
- click_handle = crazy_fns[k]["Button"].click(ArgsGeneralWrapper(crazy_fns[k]["Function"]),
- [*input_combo, gr.State(PORT)], output_combo)
- click_handle.then(on_report_generated, [file_upload, chatbot], [file_upload, chatbot])
- cancel_handles.append(click_handle)
+ def draw_goals_auto(self):
+ with gr.Row():
+ self.ai_name = gr.Textbox(show_label=False, placeholder="Give AI a name.").style(container=False)
+ with gr.Row():
+ self.user_input = gr.Textbox(lines=5, show_label=False, placeholder="Describe your AI's role.").style(container=False)
+ with gr.Box():
+ with gr.Row() as self.goal_list:
+ self.goal_array = []
+ for text in range(4):
+ self.goal_array.append(gr.Textbox(show_label=False, placeholder="Enter up to 1 goals.").style(container=False))
+ with gr.Row():
+ self.submit_add = gr.Button("Adding goals", variant="secondary")
+ with gr.Row():
+ __l = [str(i) for i in range(10, 101, 10)]
+ __l.insert(0, '1')
+ self.submit_numer = gr.Dropdown(__l, value='1', interactive=True, label='Number of Next').style(
+ container=False)
+
+ def draw_next_auto(self):
+ with gr.Row():
+ self.submit_next = gr.Button("Next", variant="primary")
+ self.submit_auto = gr.Button("Continuous", variant="secondary")
+ self.submit_stop = gr.Button("Stop", variant="stop")
+
+ def signals_input_setting(self):
+ # 注册input
+ self.input_combo = [self.cookies, self.max_length_sl, self.md_dropdown,
+ self.txt, self.top_p, self.temperature, self.chatbot, self.history,
+ self.system_prompt, self.models_box]
+ self.output_combo = [self.cookies, self.chatbot, self.history, self.status]
+ self.predict_args = dict(fn=ArgsGeneralWrapper(predict), inputs=self.input_combo, outputs=self.output_combo)
+ # 提交按钮、重置按钮
+ self.cancel_handles.append(self.txt.submit(**self.predict_args))
+ self.cancel_handles.append(self.submitBtn.click(**self.predict_args))
+ self.resetBtn.click(lambda: ([], [], "已重置"), None, [self.chatbot, self.history, self.status])
+
+ def signals_function(self):
+ # 基础功能区的回调函数注册
+ for k in functional:
+ self.click_handle = functional[k]["Button"].click(fn=ArgsGeneralWrapper(predict),
+ inputs=[*self.input_combo, gr.State(True), gr.State(k)],
+ outputs=self.output_combo)
+ self.cancel_handles.append(self.click_handle)
+
+ def signals_public(self):
+ # 文件上传区,接收文件后与chatbot的互动
+ self.file_upload.upload(on_file_uploaded, [self.file_upload, self.chatbot, self.txt], [self.chatbot, self.txt])
+ self.upload_history.click(get_user_upload, [self.chatbot], outputs=[self.chatbot])
+ self.get_download.click(get_user_download, [self.chatbot, self.cookies, self.txt], outputs=[self.chatbot, self.txt])
+ # 函数插件-固定按钮区
+ for k in crazy_fns:
+ if not crazy_fns[k].get("AsButton", True): continue
+ self.click_handle = crazy_fns[k]["Button"].click(
+ ArgsGeneralWrapper(crazy_fns[k]["Function"]), [*self.input_combo, gr.State(PORT)], self.output_combo)
+ self.click_handle.then(on_report_generated, [self.file_upload, self.chatbot], [self.file_upload, self.chatbot])
+ self.cancel_handles.append(self.click_handle)
+
+ # 函数插件-下拉菜单与随变按钮的互动
+ def on_dropdown_changed(k):
+ variant = crazy_fns[k]["Color"] if "Color" in crazy_fns[k] else "secondary"
+ return {self.switchy_bt: gr.update(value=k, variant=variant)}
+ self.dropdown.select(on_dropdown_changed, [self.dropdown], [self.switchy_bt])
+
+ # 随变按钮的回调函数注册
+ def route(k, *args, **kwargs):
+ if k in [r"打开插件列表", r"请先从插件列表中选择"]: return
+ yield from ArgsGeneralWrapper(crazy_fns[k]["Function"])(*args, **kwargs)
+ self.click_handle = self.switchy_bt.click(route, [self.switchy_bt, *self.input_combo, gr.State(PORT)], self.output_combo)
+ self.click_handle.then(on_report_generated, [self.file_upload, self.chatbot], [self.file_upload, self.chatbot])
+ self.cancel_handles.append(self.click_handle)
+ # 终止按钮的回调函数注册
+ self.stopBtn.click(fn=None, inputs=None, outputs=None, cancels=self.cancel_handles)
+
+ # gradio的inbrowser触发不太稳定,回滚代码到原始的浏览器打开函数
+ def auto_opentab_delay(self):
+ import threading, webbrowser, time
+
+ print(f"如果浏览器没有自动打开,请复制并转到以下URL:")
+ print(f"\t(亮色主题): {self.__url}")
+ print(f"\t(暗色主题): {self.__url}/?__dark-theme=true")
+
+ def open():
+ time.sleep(2) # 打开浏览器
+ webbrowser.open_new_tab(f"http://localhost:{PORT}/?__dark-theme=true")
+
+ threading.Thread(target=open, name="open-browser", daemon=True).start()
+ threading.Thread(target=auto_update, name="self-upgrade", daemon=True).start()
+ # threading.Thread(target=warm_up_modules, name="warm-up", daemon=True).start()
+
+ def main(self):
+ with gr.Blocks(title="ChatGPT For Tester", theme=set_theme, analytics_enabled=False, css=advanced_css) as demo:
+ # 绘制页面title
+ self.draw_title()
+ # 绘制一个ROW,row会让底下的元素自动排部
+ with gr.Row():
+ # 绘制列2
+ with gr.Column(scale=2):
+ self.draw_chatbot()
+ # 绘制列1
+ with gr.Column(scale=1):
+ # 绘制对话模组
+ with gr.Tab('对话模式'):
+ self.draw_input_chat()
+ self.draw_function_chat()
+ self.draw_public_chat()
+ self.draw_setting_chat()
+ # 绘制autogpt模组
+ with gr.Tab('Auto-GPT'):
+ self.draw_goals_auto()
+ self.draw_next_auto()
+ # 函数注册,需要在Blocks下进行
+ self.signals_input_setting()
+ self.signals_function()
+ self.signals_public()
+ # Start
+ self.auto_opentab_delay()
+ demo.queue(concurrency_count=CONCURRENT_COUNT).launch(server_name="0.0.0.0", server_port=PORT, auth=AUTHENTICATION)
- # 函数插件-下拉菜单与随变按钮的互动
- def on_dropdown_changed(k):
- variant = crazy_fns[k]["Color"] if "Color" in crazy_fns[k] else "secondary"
- return {switchy_bt: gr.update(value=k, variant=variant)}
- dropdown.select(on_dropdown_changed, [dropdown], [switchy_bt])
+if __name__ == '__main__':
+ tester = ChatBot()
+ tester.main()
-
- # 随变按钮的回调函数注册
- def route(k, *args, **kwargs):
- if k in [r"打开插件列表", r"请先从插件列表中选择"]: return
- yield from ArgsGeneralWrapper(crazy_fns[k]["Function"])(*args, **kwargs)
-
-
- click_handle = switchy_bt.click(route, [switchy_bt, *input_combo, gr.State(PORT)], output_combo)
- click_handle.then(on_report_generated, [file_upload, chatbot], [file_upload, chatbot])
- # def expand_file_area(file_upload, area_file_up):
- # if len(file_upload)>0: return {area_file_up: gr.update(open=True)}
- # click_handle.then(expand_file_area, [file_upload, area_file_up], [area_file_up])
- cancel_handles.append(click_handle)
- # 终止按钮的回调函数注册
- stopBtn.click(fn=None, inputs=None, outputs=None, cancels=cancel_handles)
-
-
-# gradio的inbrowser触发不太稳定,回滚代码到原始的浏览器打开函数
-def auto_opentab_delay():
- import threading, webbrowser, time, func_box
- print(f"如果浏览器没有自动打开,请复制并转到以下URL:")
- print(f"\t(亮色主题): http://{func_box.ipaddr()}:{PORT}")
- print(f"\t(暗色主题): http://{func_box.ipaddr()}:{PORT}/?__dark-theme=true")
-
- def open():
- time.sleep(2) # 打开浏览器
- webbrowser.open_new_tab(f"http://localhost:{PORT}/?__dark-theme=true")
-
- threading.Thread(target=open, name="open-browser", daemon=True).start()
- threading.Thread(target=auto_update, name="self-upgrade", daemon=True).start()
- # threading.Thread(target=warm_up_modules, name="warm-up", daemon=True).start()
-
-
-auto_opentab_delay()
-demo.queue(concurrency_count=CONCURRENT_COUNT).launch(server_name="0.0.0.0", server_port=PORT, auth=AUTHENTICATION)
diff --git a/func_box.py b/func_box.py
index 89a73c4..e14cb8d 100644
--- a/func_box.py
+++ b/func_box.py
@@ -4,8 +4,107 @@
# @Author : Spike
# @Descr :
import hashlib
+import os.path
+import subprocess
import psutil
import re
+import tempfile
+import shutil
+from contextlib import ExitStack
+import logging
+logger = logging
+"""contextlib 是 Python 标准库中的一个模块,提供了一些工具函数和装饰器,用于支持编写上下文管理器和处理上下文的常见任务,例如资源管理、异常处理等。
+官网:https://docs.python.org/3/library/contextlib.html"""
+
+class Shell(object):
+ def __init__(self, args, stream=False):
+ self.args = args
+ self.subp = subprocess.Popen(args, shell=True,
+ stdin=subprocess.PIPE, stderr=subprocess.PIPE,
+ stdout=subprocess.PIPE, encoding='utf-8',
+ errors='ignore', close_fds=True)
+ self.__stream = stream
+ self.__temp = ''
+
+ def read(self):
+ logger.debug(f'The command being executed is: "{self.args}"')
+ if self.__stream:
+ sysout = self.subp.stdout
+ try:
+ with sysout as std:
+ for i in std:
+ logger.info(i.rstrip())
+ self.__temp += i
+ except KeyboardInterrupt as p:
+ return 3, self.__temp+self.subp.stderr.read()
+ finally:
+ return 3, self.__temp+self.subp.stderr.read()
+ else:
+ sysout = self.subp.stdout.read()
+ syserr = self.subp.stderr.read()
+ if sysout:
+ logger.debug(f"{self.args} \n{sysout}")
+ return 1, sysout
+ elif syserr:
+ logger.error(f"{self.args} \n{syserr}")
+ return 0, syserr
+ else:
+ logger.debug(f"{self.args} \n{[sysout], [sysout]}")
+ return 2, '\n{}\n{}'.format(sysout, sysout)
+
+ def sync(self):
+ logger.debug('The command being executed is: "{}"'.format(self.args))
+ for i in self.subp.stdout:
+ logger.debug(i.rstrip())
+ self.__temp += i
+ yield self.__temp
+ for i in self.subp.stderr:
+ logger.debug(i.rstrip())
+ self.__temp += i
+ yield self.__temp
+
+def context_with(*parms):
+ """
+ 一个装饰器,根据传递的参数列表,在类方法上下文中嵌套多个 with 语句。
+ Args:
+ *parms: 参数列表,每个参数都是一个字符串,表示类中的一个属性名。
+ Returns:
+ 一个装饰器函数。
+ """
+ def decorator(cls_method):
+ """
+ 装饰器函数,用于将一个类方法转换为一个嵌套多个 with 语句的方法。
+ Args:
+ cls_method: 要装饰的类方法。
+ Returns:
+ 装饰后的类方法。
+ """
+ def wrapper(cls='', *args, **kwargs):
+ """
+ 装饰后的方法,用于嵌套多个 with 语句,并调用原始的类方法。
+ Args:
+ cls: 类的实例对象。
+ *args: 位置参数。
+ **kwargs: 关键字参数。
+ Returns:
+ 原始的类方法返回的结果。
+ """
+ with_list = [getattr(cls, arg) for arg in parms]
+ with ExitStack() as stack:
+ for context in with_list:
+ stack.enter_context(context)
+ return cls_method(cls, *args, **kwargs)
+ return wrapper
+ return decorator
+
+
+def copy_temp_file(file):
+ if os.path.exists(file):
+ exdir = tempfile.mkdtemp()
+ temp_ = shutil.copy(file, os.path.join(exdir, os.path.basename(file)))
+ return temp_
+ else:
+ return None
def md5_str(st):
# 创建一个 MD5 对象
@@ -29,12 +128,26 @@ def encryption_str(txt: str):
result = pattern.sub(lambda x: x.group(1) + ": XXXXXXXX", txt)
return result
+def tree_out(dir=os.path.dirname(__file__), line=2, more=''):
+ out = Shell(f'tree {dir} -F -I "__*|.*|venv|*.png|*.xlsx" -L {line} {more}').read()[1]
+ localfile = os.path.join(os.path.dirname(__file__), '.tree.md')
+ with open(localfile, 'w') as f:
+ f.write('```\n')
+ ll = out.splitlines()
+ for i in range(len(ll)):
+ if i == 0:
+ f.write(ll[i].split('/')[-2]+'\n')
+ else:
+ f.write(ll[i]+'\n')
+ f.write('```\n')
+
if __name__ == '__main__':
txt = "Authorization: WPS-2:AqY7ik9XQ92tvO7+NlCRvA==:b2f626f496de9c256605a15985c855a8b3e4be99\nwps-Sid: V02SgISzdeWrYdwvW_xbib-fGlqUIIw00afc5b890008c1976f\nCookie: wpsua=V1BTVUEvMS4wIChhbmRyb2lkLW9mZmljZToxNy41O2FuZHJvaWQ6MTA7ZjIwZDAyNWQzYTM5MmExMDBiYzgxNWI2NmI3Y2E5ODI6ZG1sMmJ5QldNakF5TUVFPSl2aXZvL1YyMDIwQQ=="
txt = "Authorization: WPS-2:AqY7ik9XQ92tvO7+NlCRvA==:b2f626f496de9c256605a15985c855a8b3e4be99"
print(encryption_str(txt))
+ tree_out()
diff --git a/test.py b/test.py
index 29ad5f5..9534a6e 100644
--- a/test.py
+++ b/test.py
@@ -46,6 +46,26 @@ class ChatGPTForTester:
self.book()
self.demo.launch()
-if __name__ == "__main__":
- ChatGPTForTester().main()
+
+
+class MyClass:
+
+ def __init__(self):
+ self.my_attribute1 = ''
+
+ def __getattribute__(self, name):
+ try:
+ return object.__getattribute__(self, name)
+ except AttributeError:
+ return []
+
+ def my_method(self):
+ self.test = '12312312312'
+ print("This is my method.")
+
+if __name__ == "__main__":
+ __url = gr.State(f'https://')
+ print(__url)
+
+
diff --git a/toolbox.py b/toolbox.py
index e7ecf75..c19f818 100644
--- a/toolbox.py
+++ b/toolbox.py
@@ -382,16 +382,41 @@ def find_recent_files(directory):
return recent_files
+
def get_user_upload(chatbot, ipaddr: gr.Request):
private_upload = './private_upload'
user_history = os.path.join(private_upload, ipaddr.client.host)
history = ''
- for root, d, file in os.walk(private_upload):
- history += f'目录:{root} 文件: {file}\n'
+ for root, d, file in os.walk(user_history):
+ for f in file:
+ history += f'{os.path.join(root, f)}\n\n'
chatbot.append(['我检查了之前上传的文件: ',
- '[Local Message] 请自行复制以下目录or目录/文件, 供以高亮插件使用\n'
- f'{history}'
+ '[Local Message] 请自行复制以下目录or目录/文件, 供以高亮插件使用\n\n'
+ f'> {history}'
])
+ return chatbot
+
+
+def get_user_download(chatbot, link, file):
+ for file_handle in str(file).split('\n'):
+ if os.path.isfile(file_handle):
+ # temp_file = func_box.copy_temp_file(file_handle) 无法使用外部的临时目录
+ temp_file = os.path.abspath(file_handle)
+ if temp_file:
+ dir_file, file_name = ('/'.join(str(file_handle).split('/')[-2:]), os.path.basename(file_handle))
+ chatbot.append(['Convert the file address to a download link at:',
+ f'[Local Message] Successful conversion\n\n '
+ f'{file_name}'])
+ else:
+ chatbot.append(['Convert the file address to a download link at:',
+ f'[Local Message] Conversion failed, file or not exist.'])
+ elif os.path.isdir(file_handle):
+ chatbot.append(['Convert the file address to a download link at:',
+ '[Local Message] Cannot convert directory to download link, please try again.'])
+ elif file_handle == '':
+ pass
+ return chatbot, file
+
def on_file_uploaded(files, chatbot, txt, ipaddr: gr.Request):
if len(files) == 0: