build: create streamlit_toolkit module

Code_Style
sean1832 1 year ago
parent 7506ef7bba
commit c725d41bd3

@ -1,3 +1,3 @@
from GPT import query
from GPT import toolkit
from GPT import model_param
from GPT import model

@ -7,3 +7,15 @@ class param:
self.present_penalty = present_penalty
self.chunk_count = chunk_count
self.chunk_size = chunk_size
class Model:
    """Groups the GPT model selections for one brain run.

    question_model: name of the model used to answer the user's question.
    other_models: models applied to the remaining (non-question) operations,
        indexed in the same order as Operation.operations_no_question.
    """

    def __init__(self, question_model, other_models):
        self.question_model, self.other_models = question_model, other_models
class Operation:
    """Groups the user-selected operations for one brain run.

    operations: every selected operation, including the question itself.
    operations_no_question: the selected operations with the question removed.
    """

    def __init__(self, operations, operations_no_question):
        self.operations, self.operations_no_question = (operations,
                                                        operations_no_question)

@ -1,4 +1,3 @@
import time
import os
import streamlit as st
@ -7,6 +6,7 @@ import modules.INFO as INFO
import modules as mod
import GPT
import modules.utilities as util
import streamlit_toolkit.tools as st_tool
SESSION_LANG = st.session_state['SESSION_LANGUAGE']
PROMPT_PATH = f'.user/prompt/{SESSION_LANG}'
@ -16,65 +16,6 @@ util.remove_oldest_file(INFO.LOG_PATH, 10)
header = st.container()
body = st.container()
def create_log():
if not os.path.exists(INFO.CURRENT_LOG_FILE):
util.write_file(f'Session {INFO.SESSION_TIME}\n\n', INFO.CURRENT_LOG_FILE)
return INFO.CURRENT_LOG_FILE
def log(content, delimiter=''):
log_file = create_log()
if delimiter != '':
delimiter = f'\n\n=============={delimiter}==============\n'
util.write_file(f'\n{delimiter + content}', log_file, 'a')
def clear_log():
log_file_name = f'log_{INFO.SESSION_TIME}.log'
for root, dirs, files in os.walk(INFO.LOG_PATH):
for file in files:
if not file == log_file_name:
os.remove(os.path.join(root, file))
def save_as():
# download log file
with open(INFO.CURRENT_LOG_FILE, 'rb') as f:
content = f.read()
st.download_button(
label=_("📥download log"),
data=content,
file_name=f'log_{INFO.SESSION_TIME}.txt',
mime='text/plain'
)
def process_response(query, target_model, prompt_file: str, data: GPT.model_param.param):
# check if exclude model is not target model
file_name = util.get_file_name(prompt_file)
with st.spinner(_('Thinking on ') + f"{file_name}..."):
results = GPT.query.run(query, target_model, prompt_file,
data.temp,
data.max_tokens,
data.top_p,
data.frequency_penalty,
data.present_penalty)
# displaying results
st.header(f'📃{file_name}')
st.info(f'{results}')
time.sleep(1)
log(results, delimiter=f'{file_name.upper()}')
def message(msg, condition=None):
if condition is not None:
if condition:
st.warning("⚠️" + msg)
else:
st.warning("⚠️" + msg)
# sidebar
with st.sidebar:
_ = mod.language.set_language()
@ -121,15 +62,21 @@ with st.sidebar:
value=util.read_json_at(INFO.BRAIN_MEMO, 'chunk_size', 4000))
chunk_count = st.slider(_('Answer count'), 1, 5, value=util.read_json_at(INFO.BRAIN_MEMO, 'chunk_count', 1))
param = GPT.model_param.param(temp=temp,
max_tokens=max_tokens,
top_p=top_p,
frequency_penalty=freq_panl,
present_penalty=pres_panl,
chunk_size=chunk_size,
chunk_count=chunk_count)
param = GPT.model.param(temp=temp,
max_tokens=max_tokens,
top_p=top_p,
frequency_penalty=freq_panl,
present_penalty=pres_panl,
chunk_size=chunk_size,
chunk_count=chunk_count)
if st.button(_('Clear Log'), on_click=clear_log):
op = GPT.model.Operation(operations=operations,
operations_no_question=operations_no_question)
models = GPT.model.Model(question_model=question_model,
other_models=other_models)
if st.button(_('Clear Log'), on_click=st_tool.clear_log):
st.success(_('Log Cleared'))
# info
@ -144,56 +91,8 @@ with header:
st.title(_('🧠GPT-Brain'))
st.text(_('This is my personal AI powered brain feeding my own Obsidian notes. Ask anything.'))
message(_("This is a beta version. Please [🪲report bugs](") + util.read_json_at(INFO.MANIFEST, 'bugs') + _(
") if you find any."))
def execute_brain(q):
# log question
log(f'\n\n\n\n[{str(time.ctime())}] - QUESTION: {q}')
if mod.check_update.isUpdated():
st.success(_('Building Brain...'))
# if brain-info is updated
GPT.query.build(chunk_size)
st.success(_('Brain rebuild!'))
time.sleep(2)
# thinking on answer
with st.spinner(_('Thinking on Answer')):
answer = GPT.query.run_answer(q, question_model, temp, max_tokens, top_p, freq_panl, pres_panl,
chunk_count=chunk_count)
if util.contains(operations, _('question')):
# displaying results
st.header(_('💬Answer'))
st.info(f'{answer}')
time.sleep(1)
log(answer, delimiter='ANSWER')
# thinking on other outputs
if len(operations_no_question) > 0:
for i in range(len(operations_no_question)):
prompt_path = prompt_dictionary[operations_no_question[i]]
other_model = other_models[i]
process_response(answer, other_model, prompt_path, param)
# convert param to dictionary
param_dict = vars(param)
# write param to json
for key in param_dict:
value = param_dict[key]
util.update_json(INFO.BRAIN_MEMO, key, value)
# write operation to json
util.update_json(INFO.BRAIN_MEMO, f'operations_{SESSION_LANG}', operations)
# write question model to json
util.update_json(INFO.BRAIN_MEMO, 'question_model', question_model)
# write other models to json
for i in range(len(operations_no_question)):
util.update_json(INFO.BRAIN_MEMO, f'{operations_no_question[i]}_model', other_models[i])
st_tool.message(_("This is a beta version. Please [🪲report bugs](") +
util.read_json_at(INFO.MANIFEST, 'bugs') + _(") if you find any."))
# main
with body:
@ -203,7 +102,7 @@ with body:
send = st.button(_('📩Send'))
with col2:
if os.path.exists(INFO.CURRENT_LOG_FILE):
save_as()
st_tool.save_as()
# execute brain calculation
if not question == '' and send:
execute_brain(question)
st_tool.execute_brain(question, param, op, models, prompt_dictionary, SESSION_LANG)

@ -0,0 +1 @@
from streamlit_toolkit import tools

@ -0,0 +1,124 @@
import os
import time
import streamlit as st
import modules.utilities as util
import modules.INFO as INFO
import modules as mod
import GPT
_ = mod.language.set_language()
def create_log():
    """Create the current session's log file on first use and return its path."""
    log_path = INFO.CURRENT_LOG_FILE
    # Write the session header only once, when the file does not exist yet.
    if not os.path.exists(log_path):
        util.write_file(f'Session {INFO.SESSION_TIME}\n\n', log_path)
    return log_path
def log(content, delimiter=''):
    """Append content to the session log; a non-empty delimiter adds a banner line above it."""
    target = create_log()
    banner = f'\n\n=============={delimiter}==============\n' if delimiter != '' else ''
    util.write_file(f'\n{banner + content}', target, 'a')
def clear_log():
    """Delete every file under INFO.LOG_PATH except the current session's log."""
    keep = f'log_{INFO.SESSION_TIME}.log'
    for root, _dirs, names in os.walk(INFO.LOG_PATH):
        for name in names:
            if name != keep:
                os.remove(os.path.join(root, name))
def save_as():
    """Render a download button serving the current session log as plain text."""
    # Read the log bytes up front so the button serves a fixed snapshot.
    with open(INFO.CURRENT_LOG_FILE, 'rb') as log_fp:
        payload = log_fp.read()
    st.download_button(label=_("📥download log"),
                       data=payload,
                       file_name=f'log_{INFO.SESSION_TIME}.txt',
                       mime='text/plain')
def process_response(query, target_model, prompt_file: str, data: GPT.model.param):
    """Run one prompt file against target_model, display the result, and log it.

    query: text fed to the model (typically the brain's answer).
    target_model: model name used for this prompt.
    prompt_file: path to the prompt template; its file name labels the output.
    data: GPT.model.param holding temperature/token/penalty settings.
    """
    file_name = util.get_file_name(prompt_file)
    with st.spinner(_('Thinking on ') + f"{file_name}..."):
        answer = GPT.query.run(query, target_model, prompt_file,
                               data.temp,
                               data.max_tokens,
                               data.top_p,
                               data.frequency_penalty,
                               data.present_penalty)
        # Show the result under a header named after the prompt file.
        st.header(f'📃{file_name}')
        st.info(f'{answer}')
        time.sleep(1)
        log(answer, delimiter=f'{file_name.upper()}')
def execute_brain(q, params: GPT.model.param,
                  op: GPT.model.Operation,
                  model: GPT.model.Model,
                  prompt_dictionary: dict,
                  session_language):
    """Answer question *q* with the brain, run follow-up operations, and persist settings.

    params: GPT.model.param with sampling/chunk settings.
    op: GPT.model.Operation listing selected operations.
    model: GPT.model.Model with the question model and per-operation models.
    prompt_dictionary: maps operation name -> prompt file path.
    session_language: language key used when persisting the operation list.
    """
    # log question
    log(f'\n\n\n\n[{str(time.ctime())}] - QUESTION: {q}')

    if mod.check_update.isUpdated():
        st.success(_('Building Brain...'))
        # if brain-info is updated
        GPT.query.build(params.chunk_size)
        st.success(_('Brain rebuild!'))
        time.sleep(2)

    # thinking on answer
    with st.spinner(_('Thinking on Answer')):
        answer = GPT.query.run_answer(q, model.question_model,
                                      params.temp,
                                      params.max_tokens,
                                      params.top_p,
                                      params.frequency_penalty,
                                      params.present_penalty,
                                      chunk_count=params.chunk_count)
        if util.contains(op.operations, _('question')):
            # displaying results
            st.header(_('💬Answer'))
            st.info(f'{answer}')
            time.sleep(1)
            log(answer, delimiter='ANSWER')

        # thinking on other outputs (empty list simply skips the loop;
        # enumerate pairs each operation with its model by position)
        for i, operation in enumerate(op.operations_no_question):
            prompt_path = prompt_dictionary[operation]
            other_model = model.other_models[i]
            process_response(answer, other_model, prompt_path, params)

    # write param to json
    for key, value in vars(params).items():
        util.update_json(INFO.BRAIN_MEMO, key, value)
    # write operation to json
    util.update_json(INFO.BRAIN_MEMO, f'operations_{session_language}', op.operations)
    # write question model to json
    util.update_json(INFO.BRAIN_MEMO, 'question_model', model.question_model)
    # write other models to json
    for i, operation in enumerate(op.operations_no_question):
        util.update_json(INFO.BRAIN_MEMO, f'{operation}_model', model.other_models[i])
def message(msg, condition=None):
    """Show a warning banner for *msg*.

    With no condition the warning is always shown; with a condition it is
    shown only when the condition is truthy.
    """
    if condition is None or condition:
        st.warning("⚠️" + msg)
Loading…
Cancel
Save