From c725d41bd3e0e22da8c80c005ecc9476bf3798ae Mon Sep 17 00:00:00 2001 From: sean1832 Date: Tue, 21 Feb 2023 00:13:18 +1100 Subject: [PATCH] build: create streamlit_toolkit module --- GPT/__init__.py | 2 +- GPT/{model_param.py => model.py} | 12 +++ Seanium_Brain.py | 139 +++++-------------------------- streamlit_toolkit/__init__.py | 1 + streamlit_toolkit/tools.py | 124 +++++++++++++++++++++++++++ 5 files changed, 157 insertions(+), 121 deletions(-) rename GPT/{model_param.py => model.py} (53%) create mode 100644 streamlit_toolkit/__init__.py create mode 100644 streamlit_toolkit/tools.py diff --git a/GPT/__init__.py b/GPT/__init__.py index e9522b0..5a13bef 100644 --- a/GPT/__init__.py +++ b/GPT/__init__.py @@ -1,3 +1,3 @@ from GPT import query from GPT import toolkit -from GPT import model_param \ No newline at end of file +from GPT import model \ No newline at end of file diff --git a/GPT/model_param.py b/GPT/model.py similarity index 53% rename from GPT/model_param.py rename to GPT/model.py index fead8d6..72c2a8b 100644 --- a/GPT/model_param.py +++ b/GPT/model.py @@ -7,3 +7,15 @@ class param: self.present_penalty = present_penalty self.chunk_count = chunk_count self.chunk_size = chunk_size + + +class Model: + def __init__(self, question_model, other_models): + self.question_model = question_model + self.other_models = other_models + + +class Operation: + def __init__(self, operations, operations_no_question): + self.operations = operations + self.operations_no_question = operations_no_question diff --git a/Seanium_Brain.py b/Seanium_Brain.py index 094d7f3..7b924d8 100644 --- a/Seanium_Brain.py +++ b/Seanium_Brain.py @@ -1,4 +1,3 @@ -import time import os import streamlit as st @@ -7,6 +6,7 @@ import modules.INFO as INFO import modules as mod import GPT import modules.utilities as util +import streamlit_toolkit.tools as st_tool SESSION_LANG = st.session_state['SESSION_LANGUAGE'] PROMPT_PATH = f'.user/prompt/{SESSION_LANG}' @@ -16,65 +16,6 @@ util.remove_oldest_file(INFO.LOG_PATH, 10) header = st.container() body = st.container() - -def create_log(): - if not os.path.exists(INFO.CURRENT_LOG_FILE): - util.write_file(f'Session {INFO.SESSION_TIME}\n\n', INFO.CURRENT_LOG_FILE) - return INFO.CURRENT_LOG_FILE - - -def log(content, delimiter=''): - log_file = create_log() - if delimiter != '': - delimiter = f'\n\n=============={delimiter}==============\n' - util.write_file(f'\n{delimiter + content}', log_file, 'a') - - -def clear_log(): - log_file_name = f'log_{INFO.SESSION_TIME}.log' - for root, dirs, files in os.walk(INFO.LOG_PATH): - for file in files: - if not file == log_file_name: - os.remove(os.path.join(root, file)) - - -def save_as(): - # download log file - with open(INFO.CURRENT_LOG_FILE, 'rb') as f: - content = f.read() - st.download_button( - label=_("📥download log"), - data=content, - file_name=f'log_{INFO.SESSION_TIME}.txt', - mime='text/plain' - ) - - -def process_response(query, target_model, prompt_file: str, data: GPT.model_param.param): - # check if exclude model is not target model - file_name = util.get_file_name(prompt_file) - with st.spinner(_('Thinking on ') + f"{file_name}..."): - results = GPT.query.run(query, target_model, prompt_file, - data.temp, - data.max_tokens, - data.top_p, - data.frequency_penalty, - data.present_penalty) - # displaying results - st.header(f'📃{file_name}') - st.info(f'{results}') - time.sleep(1) - log(results, delimiter=f'{file_name.upper()}') - - -def message(msg, condition=None): - if condition is not None: - if condition: - st.warning("⚠️" + msg) - else: - st.warning("⚠️" + msg) - - # sidebar with st.sidebar: _ = mod.language.set_language() @@ -121,15 +62,21 @@ with st.sidebar: value=util.read_json_at(INFO.BRAIN_MEMO, 'chunk_size', 4000)) chunk_count = st.slider(_('Answer count'), 1, 5, value=util.read_json_at(INFO.BRAIN_MEMO, 'chunk_count', 1)) - param = GPT.model_param.param(temp=temp, - max_tokens=max_tokens, - top_p=top_p, - frequency_penalty=freq_panl, - present_penalty=pres_panl, - chunk_size=chunk_size, - chunk_count=chunk_count) + param = GPT.model.param(temp=temp, + max_tokens=max_tokens, + top_p=top_p, + frequency_penalty=freq_panl, + present_penalty=pres_panl, + chunk_size=chunk_size, + chunk_count=chunk_count) - if st.button(_('Clear Log'), on_click=clear_log): + op = GPT.model.Operation(operations=operations, + operations_no_question=operations_no_question) + + models = GPT.model.Model(question_model=question_model, + other_models=other_models) + + if st.button(_('Clear Log'), on_click=st_tool.clear_log): st.success(_('Log Cleared')) # info @@ -144,56 +91,8 @@ with header: st.title(_('🧠GPT-Brain')) st.text(_('This is my personal AI powered brain feeding my own Obsidian notes. Ask anything.')) - message(_("This is a beta version. Please [🪲report bugs](") + util.read_json_at(INFO.MANIFEST, 'bugs') + _( - ") if you find any.")) - - -def execute_brain(q): - # log question - log(f'\n\n\n\n[{str(time.ctime())}] - QUESTION: {q}') - - if mod.check_update.isUpdated(): - st.success(_('Building Brain...')) - # if brain-info is updated - GPT.query.build(chunk_size) - st.success(_('Brain rebuild!')) - time.sleep(2) - - # thinking on answer - with st.spinner(_('Thinking on Answer')): - answer = GPT.query.run_answer(q, question_model, temp, max_tokens, top_p, freq_panl, pres_panl, - chunk_count=chunk_count) - if util.contains(operations, _('question')): - # displaying results - st.header(_('💬Answer')) - st.info(f'{answer}') - time.sleep(1) - log(answer, delimiter='ANSWER') - - # thinking on other outputs - if len(operations_no_question) > 0: - for i in range(len(operations_no_question)): - prompt_path = prompt_dictionary[operations_no_question[i]] - other_model = other_models[i] - process_response(answer, other_model, prompt_path, param) - # convert param to dictionary - param_dict = vars(param) - - # write param to json - for key in param_dict: - value = param_dict[key] - util.update_json(INFO.BRAIN_MEMO, key, value) - - # write operation to json - util.update_json(INFO.BRAIN_MEMO, f'operations_{SESSION_LANG}', operations) - - # write question model to json - util.update_json(INFO.BRAIN_MEMO, 'question_model', question_model) - - # write other models to json - for i in range(len(operations_no_question)): - util.update_json(INFO.BRAIN_MEMO, f'{operations_no_question[i]}_model', other_models[i]) - + st_tool.message(_("This is a beta version. Please [🪲report bugs](") + + util.read_json_at(INFO.MANIFEST, 'bugs') + _(") if you find any.")) # main with body: @@ -203,7 +102,7 @@ with body: send = st.button(_('📩Send')) with col2: if os.path.exists(INFO.CURRENT_LOG_FILE): - save_as() + st_tool.save_as() # execute brain calculation if not question == '' and send: - execute_brain(question) + st_tool.execute_brain(question, param, op, models, prompt_dictionary, SESSION_LANG) diff --git a/streamlit_toolkit/__init__.py b/streamlit_toolkit/__init__.py new file mode 100644 index 0000000..9ec5d44 --- /dev/null +++ b/streamlit_toolkit/__init__.py @@ -0,0 +1 @@ +from streamlit_toolkit import tools diff --git a/streamlit_toolkit/tools.py b/streamlit_toolkit/tools.py new file mode 100644 index 0000000..18d7ac0 --- /dev/null +++ b/streamlit_toolkit/tools.py @@ -0,0 +1,124 @@ +import os +import time +import streamlit as st + +import modules.utilities as util +import modules.INFO as INFO +import modules as mod +import GPT + +_ = mod.language.set_language() + + +def create_log(): + if not os.path.exists(INFO.CURRENT_LOG_FILE): + util.write_file(f'Session {INFO.SESSION_TIME}\n\n', INFO.CURRENT_LOG_FILE) + return INFO.CURRENT_LOG_FILE + + +def log(content, delimiter=''): + log_file = create_log() + if delimiter != '': + delimiter = f'\n\n=============={delimiter}==============\n' + util.write_file(f'\n{delimiter + content}', log_file, 'a') + + +def clear_log(): + log_file_name = f'log_{INFO.SESSION_TIME}.log' + for root, dirs, files in os.walk(INFO.LOG_PATH): + for file in files: + if not file == log_file_name: + os.remove(os.path.join(root, file)) + + +def save_as(): + # download log file + with open(INFO.CURRENT_LOG_FILE, 'rb') as f: + content = f.read() + st.download_button( + label=_("📥download log"), + data=content, + file_name=f'log_{INFO.SESSION_TIME}.txt', + mime='text/plain' + ) + + +def process_response(query, target_model, prompt_file: str, data: GPT.model.param): + # check if exclude model is not target model + file_name = util.get_file_name(prompt_file) + with st.spinner(_('Thinking on ') + f"{file_name}..."): + results = GPT.query.run(query, target_model, prompt_file, + data.temp, + data.max_tokens, + data.top_p, + data.frequency_penalty, + data.present_penalty) + # displaying results + st.header(f'📃{file_name}') + st.info(f'{results}') + time.sleep(1) + log(results, delimiter=f'{file_name.upper()}') + + +def execute_brain(q, params: GPT.model.param, + op: GPT.model.Operation, + model: GPT.model.Model, + prompt_dictionary: dict, + session_language): + # log question + log(f'\n\n\n\n[{str(time.ctime())}] - QUESTION: {q}') + + if mod.check_update.isUpdated(): + st.success(_('Building Brain...')) + # if brain-info is updated + GPT.query.build(params.chunk_size) + st.success(_('Brain rebuild!')) + time.sleep(2) + + # thinking on answer + with st.spinner(_('Thinking on Answer')): + answer = GPT.query.run_answer(q, model.question_model, + params.temp, + params.max_tokens, + params.top_p, + params.frequency_penalty, + params.present_penalty, + chunk_count=params.chunk_count) + if util.contains(op.operations, _('question')): + # displaying results + st.header(_('💬Answer')) + st.info(f'{answer}') + time.sleep(1) + log(answer, delimiter='ANSWER') + + # thinking on other outputs + if len(op.operations_no_question) > 0: + for i in range(len(op.operations_no_question)): + prompt_path = prompt_dictionary[op.operations_no_question[i]] + other_model = model.other_models[i] + process_response(answer, other_model, prompt_path, params) + # convert param to dictionary + param_dict = vars(params) + + # write param to json + for key in param_dict: + value = param_dict[key] + util.update_json(INFO.BRAIN_MEMO, key, value) + + # write operation to json + util.update_json(INFO.BRAIN_MEMO, f'operations_{session_language}', op.operations) + + # write question model to json + util.update_json(INFO.BRAIN_MEMO, 'question_model', model.question_model) + + # write other models to json + for i in range(len(op.operations_no_question)): + util.update_json(INFO.BRAIN_MEMO, f'{op.operations_no_question[i]}_model', model.other_models[i]) + + +def message(msg, condition=None): + if condition is not None: + if condition: + st.warning("⚠️" + msg) + else: + st.warning("⚠️" + msg)