You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
GPT-Brain/streamlit_toolkit/tools.py

375 lines
13 KiB
Python

import os
import time
import json
import streamlit as st
import tkinter as tk
from tkinter import filedialog
from langchain.llms import OpenAI
import modules.utilities as util
import modules.INFO as INFO
import modules as mod
import GPT
# OpenAI API key saved by the app's setup step.
API_KEY = util.read_file(r'.user\API-KEYS.txt').strip()

# Record the session start time once per Streamlit session so every rerun
# appends to the same log file.
# BUG FIX: the format string was "%Y%m%d-%H%H%S", which repeats the hour and
# omits the minutes; "%H%M%S" is the intended hour-minute-second timestamp.
if 'SESSION_TIME' not in st.session_state:
    st.session_state['SESSION_TIME'] = time.strftime("%Y%m%d-%H%M%S")

# Install the gettext-style translator as the module-wide `_`.
_ = mod.language.set_language()

SESSION_TIME = st.session_state['SESSION_TIME']
CURRENT_LOG_FILE = f'{INFO.LOG_PATH}/log_{SESSION_TIME}.log'
def predict_token(query: str, prompt_core: GPT.model.prompt_core) -> tuple[int, bool]:
    """Predict how many tokens the question prompt will consume.

    Builds the streaming question prompt from `query` plus the prompt core's
    question and personal-info files, then asks the LangChain OpenAI wrapper
    for its token count.

    Returns:
        tuple[int, bool]: (token count, True when the count is 0 — i.e. the
        built prompt was empty).
    """
    # The LangChain OpenAI wrapper reads the key from the environment.
    os.environ['OPENAI_API_KEY'] = API_KEY
    llm = OpenAI()
    prompt = GPT.query.get_stream_prompt(query, prompt_file=prompt_core.question,
                                         isQuestion=True,
                                         info_file=prompt_core.my_info)
    token = llm.get_num_tokens(prompt)
    return token, token == 0
def create_log():
    """Return the session log path, creating the file with a header if absent."""
    if os.path.exists(CURRENT_LOG_FILE):
        return CURRENT_LOG_FILE
    util.write_file(f'Session {SESSION_TIME}\n\n', CURRENT_LOG_FILE)
    return CURRENT_LOG_FILE
def log(content, delimiter=''):
    """Append `content` to the session log, optionally under a banner line."""
    target = create_log()
    banner = f'\n\n=============={delimiter}==============\n' if delimiter != '' else ''
    util.write_file(f'\n{banner + content}', target, 'a')
def clear_log():
    """Delete every log file except the current session's."""
    keep_name = f'log_{SESSION_TIME}.log'
    for dirpath, _dirs, filenames in os.walk(INFO.LOG_PATH):
        for filename in filenames:
            if filename != keep_name:
                os.remove(os.path.join(dirpath, filename))
def download_as(label):
    """Render a Streamlit button that downloads the session log as plain text."""
    with open(CURRENT_LOG_FILE, 'rb') as log_file:
        payload = log_file.read()
    st.download_button(label=label,
                       data=payload,
                       file_name=f'log_{SESSION_TIME}.txt',
                       mime='text/plain')
def save(content, path, page='', json_value: dict = None):
    """Render a 💾Save button; on click, write `content` to `path`.

    On the '💽Brain Memory' page, each key/value pair of `json_value` is also
    persisted into the brain-memo JSON file. The app reruns after saving.

    Args:
        content: text to write to `path`.
        path: destination file path.
        page: current page name; gates the JSON persistence step.
        json_value: extra settings to persist (Brain Memory page only).
    """
    if json_value is None:
        # BUG FIX: the fallback used to be a list ([]), which would crash on
        # json_value.items() below whenever page == '💽Brain Memory'.
        json_value = {}
    save_but = st.button(_('💾Save'))
    if save_but:
        util.write_file(content, path)
        st.success(_('✅File saved!'))
        # write to json file
        if page == '💽Brain Memory':
            for key, value in json_value.items():
                util.update_json(INFO.BRAIN_MEMO, key, value)
        time.sleep(1)
        # refresh page
        st.experimental_rerun()
def match_logic(operator, filter_val, value):
    """Evaluate a single filter condition against a front-matter value.

    Args:
        operator: one of 'IS', 'IS NOT', 'CONTAINS', 'NOT CONTAINS',
            'MORE THAN', 'LESS THAN', 'MORE THAN OR EQUAL',
            'LESS THAN OR EQUAL'.
        filter_val: the value entered in the filter row.
        value: the value found in the page's front matter.

    Returns:
        bool: True when the condition holds. Numeric operators return False
        (rather than raising) when either operand is not a number; unknown
        operators return False.
    """
    def as_number(text):
        # BUG FIX: the original used str.isnumeric(), which rejects floats
        # ("3.14") and negatives ("-2") despite the "check if value is float"
        # intent, and never validated filter_val, so float(filter_val) could
        # raise ValueError.
        try:
            return float(text)
        except (TypeError, ValueError):
            return None

    if operator == 'IS':
        return filter_val == value
    if operator == 'IS NOT':
        return filter_val != value
    if operator == 'CONTAINS':
        return filter_val in value
    if operator == 'NOT CONTAINS':
        return filter_val not in value

    threshold = as_number(filter_val)
    number = as_number(value)
    if threshold is None or number is None:
        return False
    # "MORE THAN" means: the found value is more than the filter value.
    if operator == 'MORE THAN':
        return threshold < number
    if operator == 'LESS THAN':
        return threshold > number
    if operator == 'MORE THAN OR EQUAL':
        return threshold <= number
    if operator == 'LESS THAN OR EQUAL':
        return threshold >= number
    return False
def select_directory(initial_dir=None):
    """Open a native directory-picker dialog and return the chosen path.

    Args:
        initial_dir: directory the dialog opens at; defaults to the current
            working directory *at call time*. (BUG FIX: the original default
            `os.getcwd()` was evaluated once at import time, so the dialog
            always opened at the import-time cwd.)

    Returns:
        The selected directory path, or an empty string if cancelled.
    """
    if initial_dir is None:
        initial_dir = os.getcwd()
    root = tk.Tk()
    root.withdraw()
    # make sure the dialog is on top of the main window
    root.attributes('-topmost', True)
    try:
        directory = filedialog.askdirectory(initialdir=initial_dir,
                                            title=_('Select Note Directory'),
                                            master=root)
    finally:
        # BUG FIX: the hidden Tk root was never destroyed, leaking a
        # Tk instance per call.
        root.destroy()
    return directory
def match_fields(pages: list, filter_datas: list[dict]):
    """Keep only pages whose front matter satisfies *every* filter.

    Each page's '---'-delimited front matter is parsed into "key: value"
    pairs; a page survives when each filter dict (keys 'key', 'logic',
    'value') matches at least one of its fields. Comparison of keys and
    values is case-insensitive; `match_logic` evaluates the operator.

    Returns:
        str: the surviving pages joined by blank lines.
    """
    def parse_fields(page):
        # Extract "key: value" pairs, skipping blank or malformed lines.
        pairs = []
        for field in util.extract_frontmatter(page, delimiter='---'):
            if field == '':
                continue
            try:
                found_key, found_value = field.split(': ')
            except ValueError:
                continue
            pairs.append({'key': found_key.strip(), 'value': found_value.strip()})
        return pairs

    def filter_matches(data, fields):
        # True when this one filter matches at least one parsed field.
        data_key = data['key'].lower()
        data_val = data['value'].lower()
        return any(
            found['key'].lower() == data_key
            and match_logic(data['logic'], data_val, found['value'].lower())
            for found in fields
        )

    filtered_contents = []
    for page in pages:
        fields = parse_fields(page)
        # BUG FIX: the original counted individual matches and compared the
        # total against len(filter_datas); one filter matching two fields
        # could mask another filter that matched nothing. Require that each
        # filter matches at least one field instead.
        if all(filter_matches(data, fields) for data in filter_datas):
            filtered_contents.append(page)
    return '\n\n\n\n'.join(filtered_contents)
def add_filter(num, val_filter_key, val_filter_logic, val_filter_val):
    """Render one filter row (key / logic / value) and return its inputs."""
    key_col, logic_col, val_col = st.columns(3)
    logic_options = ['CONTAINS',
                     'NOT CONTAINS',
                     'IS',
                     'IS NOT',
                     'MORE THAN',
                     'LESS THAN',
                     'MORE THAN OR EQUAL',
                     'LESS THAN OR EQUAL']
    with key_col:
        filter_key = st.text_input(f'Key{num}', placeholder='Key', value=val_filter_key)
    with logic_col:
        preselect = util.get_index(logic_options, val_filter_logic, 0)
        logic_select = st.selectbox(f'Logic{num}', logic_options, index=preselect)
    with val_col:
        # Zero-pad stored integers so they render as two-digit strings.
        if isinstance(val_filter_val, int):
            val_filter_val = f'{val_filter_val:02}'
        filter_val = st.text_input(f'value{num}', placeholder='Value', value=val_filter_val)
    return filter_key, logic_select, filter_val
def filter_data(pages: list, add_filter_button, del_filter_button):
    """Render the dynamic filter rows and apply them to `pages`.

    The row count lives in st.session_state['FILTER_ROW_COUNT'] and is
    bumped up/down by the add/delete buttons. Previously saved filters
    (from the brain-memo JSON) seed each row's defaults.

    Returns:
        tuple: (filtered page text, list of the filter dicts entered).
    """
    saved_filters = util.read_json_at(INFO.BRAIN_MEMO, 'filter_info')
    filter_datas = []
    if add_filter_button:
        st.session_state['FILTER_ROW_COUNT'] += 1
    if del_filter_button:
        st.session_state['FILTER_ROW_COUNT'] -= 1
    if st.session_state['FILTER_ROW_COUNT'] >= 1:
        for row in range(st.session_state['FILTER_ROW_COUNT'] + 1):
            try:
                saved = saved_filters[row - 1]
                default_key = saved['key']
                default_logic = saved['logic']
                default_val = saved['value']
            except (IndexError, KeyError):
                default_key, default_logic, default_val = '', 'CONTAINS', ''
            if row == 0:
                # Row 0 is skipped so the visible rows are numbered from 1.
                continue
            filter_key, logic_select, filter_val = add_filter(
                row, default_key, default_logic, default_val)
            filter_datas.append({'key': filter_key,
                                 'logic': logic_select,
                                 'value': filter_val})
    filtered_contents = match_fields(pages, filter_datas)
    return filtered_contents, filter_datas
def process_response(query, target_model, prompt_file: str, params: GPT.model.param):
    """Run one prompt file against `target_model`, then display and log it."""
    prompt_name = util.get_file_name(prompt_file)
    with st.spinner(_('Thinking on ') + f"{prompt_name}..."):
        answer = GPT.query.run(query, target_model, prompt_file,
                               isQuestion=False, params=params)
        # Show the result under its own header.
        st.header(f'📃{prompt_name}')
        st.info(f'{answer}')
        time.sleep(1)
    log(answer, delimiter=f'{prompt_name.upper()}')
def process_response_stream(query, target_model, prompt_file: str, params: GPT.model.param):
    """Stream one prompt file's completion, rendering text as chunks arrive.

    Runs `query` through GPT.query.run_stream with the given prompt file and
    updates a single Streamlit panel with the accumulated text; the final
    text is appended to the session log under the prompt's name.
    """
    # check if exclude model is not target model
    file_name = util.get_file_name(prompt_file)
    with st.spinner(_('Thinking on ') + f"{file_name}..."):
        responses = GPT.query.run_stream(query,
                                         target_model,
                                         prompt_file,
                                         isQuestion=False,
                                         params=params)
        # displaying results
        st.header(f'📃{file_name}')
        response_panel = st.empty()
        previous_chars = ''
        for response_json in responses:
            choice = response_json['choices'][0]
            if choice['finish_reason'] == 'stop':
                break
            # 'length' means the completion hit max_tokens before finishing.
            if choice['finish_reason'] == 'length':
                st.warning("⚠️ " + _('Result cut off. max_tokens') + f' ({params.max_tokens}) ' + _('too small. Consider increasing max_tokens.'))
                break
            if 'gpt-3.5-turbo' in target_model:
                # Chat models stream a `delta` dict: the first chunk carries
                # only the role and a final chunk may be empty — both add no
                # text; otherwise the new text is in delta['content'].
                delta = choice['delta']
                if "role" in delta or delta == {}:
                    char = ''
                else:
                    char = delta['content']
            else:
                # Legacy completion models stream plain text in 'text'.
                char = choice['text']
            # Re-render the whole accumulated answer each chunk.
            response = previous_chars + char
            response_panel.info(f'{response}')
            previous_chars += char
    # NOTE(review): indentation of the trailing sleep/log was lost in this
    # copy; placed after the streaming loop — confirm against the repo.
    time.sleep(1)
    log(previous_chars, delimiter=f'{file_name.upper()}')
def rebuild_brain(chunk_size: int):
    """Re-index the note brain with the given chunk size, showing progress."""
    msg = st.warning(_('Updating Brain...'), icon="")
    progress_bar = st.progress(0)
    # GPT.query.build yields (index, total) pairs — presumably one per
    # processed chunk; TODO confirm against GPT.query.build.
    for idx, chunk_num in GPT.query.build(chunk_size):
        progress_bar.progress((idx + 1) / chunk_num)
    msg.success(_('Brain Updated!'), icon="👍")
    # Leave the success banner visible briefly before the caller continues.
    time.sleep(2)
def execute_brain(q, params: GPT.model.param,
                  op: GPT.model.Operation,
                  model: GPT.model.Model,
                  prompt_core: GPT.model.prompt_core,
                  prompt_dictionary: dict,
                  question_prompt: str,
                  stream: bool
                  ):
    """Answer a question against the note brain, then run follow-up prompts.

    Logs the question, rebuilds the brain index when the input or chunk size
    changed, queries the question model (streaming or blocking per `stream`),
    displays the answer when the question operation is selected, logs it, and
    finally feeds the answer into each selected non-question operation via
    process_response_stream / process_response.

    NOTE(review): indentation in this copy was flattened; the nesting below
    is reconstructed from the visible control flow — verify against the repo.
    """
    # log question
    log(f'\n\n\n\n[{str(time.ctime())}] - QUESTION: {q}')
    # Rebuild the index when the note input or the chunk_size param changed.
    if mod.check_update.is_input_updated() or mod.check_update.is_param_updated(params.chunk_size, 'chunk_size'):
        rebuild_brain(params.chunk_size)
    # =================stream=================
    if stream:
        previous_chars = ''
        # Only render the answer panel when the question op is selected;
        # the stream is still consumed either way so it can be logged.
        is_question_selected = util.contains(op.operations, question_prompt)
        with st.spinner(_('Thinking on Answer')):
            responses = GPT.query.run_stream(q, model.question_model,
                                             prompt_file=prompt_core.question,
                                             isQuestion=True,
                                             params=params,
                                             info_file=prompt_core.my_info)
            if is_question_selected:
                # displaying results
                st.header(_('💬Answer'))
                answer_panel = st.empty()
            for response_json in responses:
                choice = response_json['choices'][0]
                if choice['finish_reason'] == 'stop':
                    break
                # 'length' means the completion hit max_tokens early.
                if choice['finish_reason'] == 'length':
                    st.warning("⚠️ " + _('Result cut off. max_tokens') + f' ({params.max_tokens}) ' + _('too small. Consider increasing max_tokens.'))
                    break
                if 'gpt-3.5-turbo' in model.question_model:
                    # Chat models stream a delta dict; role-only or empty
                    # deltas contribute no text.
                    delta = choice['delta']
                    if "role" in delta or delta == {}:
                        char = ''
                    else:
                        char = delta['content']
                else:
                    # Legacy completion models stream plain 'text'.
                    char = choice['text']
                answer = previous_chars + char
                if is_question_selected:
                    answer_panel.info(f'{answer}')
                previous_chars += char
        # NOTE(review): placement of this sleep relative to the loop was
        # lost in this copy; placed after streaming, before logging.
        time.sleep(0.1)
        log(previous_chars, delimiter='ANSWER')
        # Feed the answer into each selected non-question prompt; models are
        # paired with operations by index — assumes other_models lines up
        # with operations_no_question (TODO confirm).
        if len(op.operations_no_question) > 0:
            for i in range(len(op.operations_no_question)):
                prompt_path = prompt_dictionary[op.operations_no_question[i]]
                other_model = model.other_models[i]
                process_response_stream(previous_chars, other_model, prompt_path, params)
    # =================stream=================
    else:
        # thinking on answer
        with st.spinner(_('Thinking on Answer')):
            responses = GPT.query.run(q, model.question_model,
                                      prompt_file=prompt_core.question,
                                      isQuestion=True,
                                      params=params,
                                      info_file=prompt_core.my_info)
            if util.contains(op.operations, question_prompt):
                # displaying results
                st.header(_('💬Answer'))
                st.info(f'{responses}')
            time.sleep(1.5)
            log(responses, delimiter='ANSWER')
        # thinking on other outputs
        if len(op.operations_no_question) > 0:
            for i in range(len(op.operations_no_question)):
                prompt_path = prompt_dictionary[op.operations_no_question[i]]
                other_model = model.other_models[i]
                process_response(responses, other_model, prompt_path, params)
def message(msg, condition=None):
    """Show a warning banner for `msg` when `condition` is provided.

    NOTE(review): in the original, both branches of `if condition:` emitted
    the identical warning, so only `condition is None` suppressed the banner.
    The duplicate branches are collapsed here with behavior preserved; if the
    intent was to warn only when `condition` is truthy, change the guard to
    `if condition is None or condition:` — confirm with the callers.
    """
    if condition is not None:
        st.warning("⚠️" + msg)