import streamlit as st from modules import utilities as util from modules import model_data import brain import check_update import time import os # activate session if 'SESSION_TIME' not in st.session_state: st.session_state['SESSION_TIME'] = time.strftime("%Y%m%d-%H%H%S") st.set_page_config( page_title='Seanium Brain' ) util.remove_oldest_file('.user/log', 10) model_options = ['text-davinci-003', 'text-curie-001', 'text-babbage-001', 'text-ada-001'] header = st.container() body = st.container() LOG_PATH = '.user/log' PROMPT_PATH = '.user/prompt' SESSION_TIME = st.session_state['SESSION_TIME'] CURRENT_LOG_FILE = f'{LOG_PATH}/log_{SESSION_TIME}.log' BRAIN_MEMO = '.user/brain-memo.json' MANIFEST = '.core/manifest.json' def create_log(): if not os.path.exists(CURRENT_LOG_FILE): util.write_file(f'Session {SESSION_TIME}\n\n', CURRENT_LOG_FILE) return CURRENT_LOG_FILE def log(content, delimiter=''): log_file = create_log() if delimiter != '': delimiter = f'\n\n=============={delimiter}==============\n' util.write_file(f'\n{delimiter + content}', log_file, 'a') def clear_log(): log_file_name = f'log_{SESSION_TIME}.log' for root, dirs, files in os.walk(LOG_PATH): for file in files: if not file == log_file_name: os.remove(os.path.join(root, file)) def save_as(): # download log file with open(CURRENT_LOG_FILE, 'rb') as f: content = f.read() st.download_button( label="📥download log", data=content, file_name=f'log_{SESSION_TIME}.txt', mime='text/plain' ) def process_response(query, target_model, prompt_file: str, data: model_data.param): # check if exclude model is not target model file_name = util.get_file_name(prompt_file) print(f'Processing {file_name}...') with st.spinner(f'Thinking on {file_name}...'): results = brain.run(query, target_model, prompt_file, data.temp, data.max_tokens, data.top_p, data.frequency_penalty, data.present_penalty) # displaying results st.header(f'📃{file_name}') st.info(f'{results}') time.sleep(1) log(results, delimiter=f'{file_name.upper()}') # sidebar with st.sidebar: st.title('Settings') prompt_files = util.scan_directory(PROMPT_PATH) prompt_file_names = [util.get_file_name(file) for file in prompt_files] prompt_dictionary = dict(zip(prompt_file_names, prompt_files)) # remove 'my-info' from prompt dictionary prompt_dictionary.pop('my-info') operation_options = list(prompt_dictionary.keys()) operations = st.multiselect('Operations', operation_options, default=util.read_json_at(BRAIN_MEMO, 'operations', operation_options[0])) last_question_model = util.read_json_at(BRAIN_MEMO, 'question_model', model_options[0]) # get index of last question model question_model_index = util.get_index(model_options, last_question_model) question_model = st.selectbox('Question Model', model_options, index=question_model_index) operations_no_question = [op for op in operations if op != 'question'] other_models = [] replace_tokens = [] for operation in operations_no_question: last_model = util.read_json_at(BRAIN_MEMO, f'{operation}_model', model_options[0]) # get index of last model model_index = util.get_index(model_options, last_model) model = st.selectbox(f'{operation} Model', model_options, index=model_index) other_models.append(model) temp = st.slider('Temperature', 0.0, 1.0, value=util.read_json_at(BRAIN_MEMO, 'temp', 0.1)) max_tokens = st.slider('Max Tokens', 850, 4500, value=util.read_json_at(BRAIN_MEMO, 'max_tokens', 1000)) with st.expander(label='Advanced Options'): top_p = st.slider('Top_P', 0.0, 1.0, value=util.read_json_at(BRAIN_MEMO, 'top_p', 1.0)) freq_panl = st.slider('Frequency penalty', 0.0, 1.0, value=util.read_json_at(BRAIN_MEMO, 'frequency_penalty', 0.0)) pres_panl = st.slider('Presence penalty', 0.0, 1.0, value=util.read_json_at(BRAIN_MEMO, 'present_penalty', 0.0)) chunk_size = st.slider('Chunk Size', 1500, 4500, value=util.read_json_at(BRAIN_MEMO, 'chunk_size', 4000)) chunk_count = st.slider('Answer Count', 1, 5, value=util.read_json_at(BRAIN_MEMO, 'chunk_count', 1)) param = model_data.param(temp=temp, max_tokens=max_tokens, top_p=top_p, frequency_penalty=freq_panl, present_penalty=pres_panl, chunk_size=chunk_size, chunk_count=chunk_count) if st.button('Clear Log', on_click=clear_log): st.success('Log Cleared') # info st.markdown('---') st.markdown(f"# {util.read_json_at(MANIFEST, 'name')}") st.markdown(f"version: {util.read_json_at(MANIFEST, 'version')}") st.markdown(f"author: {util.read_json_at(MANIFEST, 'author')}") st.markdown(f"[Report bugs]({util.read_json_at(MANIFEST, 'bugs')})") st.markdown(f"[Github Repo]({util.read_json_at(MANIFEST, 'homepage')})") with header: st.title('🧠Seanium Brain') st.text('This is my personal AI powered brain feeding my own Obsidian notes. Ask anything.') def execute_brain(q): # log question log(f'\n\n\n\n[{str(time.ctime())}] - QUESTION: {q}') if check_update.isUpdated(): st.success('Building Brain...') # if brain-info is updated brain.build(chunk_size) st.success('Brain rebuild!') time.sleep(2) # thinking on answer with st.spinner('Thinking on Answer'): answer = brain.run_answer(q, question_model, temp, max_tokens, top_p, freq_panl, pres_panl, chunk_count=chunk_count) if util.contains(operations, 'question'): # displaying results st.header('💬Answer') st.info(f'{answer}') time.sleep(1) log(answer, delimiter='ANSWER') # thinking on other outputs if len(operations_no_question) > 0: for i in range(len(operations_no_question)): prompt_path = prompt_dictionary[operations_no_question[i]] other_model = other_models[i] process_response(answer, other_model, prompt_path, param) # convert param to dictionary param_dict = vars(param) # write param to json for key in param_dict: value = param_dict[key] util.update_json(BRAIN_MEMO, key, value) # write operation to json util.update_json(BRAIN_MEMO, 'operations', operations) # write question model to json util.update_json(BRAIN_MEMO, 'question_model', question_model) # write other models to json for i in range(len(operations_no_question)): util.update_json(BRAIN_MEMO, f'{operations_no_question[i]}_model', other_models[i]) # main with body: question = st.text_area('Ask Brain: ') col1, col2, col3, col4 = st.columns(4) with col1: send = st.button('📩Send') with col2: if os.path.exists(CURRENT_LOG_FILE): save_as() # execute brain calculation if not question == '' and send: execute_brain(question)