@ -31,8 +31,8 @@ python examples/api_request_parallel_processor.py \
Inputs :
- requests_filepath : str
- path to the file containing the requests to be processed
- file should be a jsonl file , where each line is a json object with API parameters
- e . g . , { " model " : " text-embedding-ada-002 " , " input " : " embed me " }
- file should be a jsonl file , where each line is a json object with API parameters and an optional metadata field
- e . g . , { " model " : " text-embedding-ada-002 " , " input " : " embed me " , " metadata " : { " row_id " : 1 } }
- as with all jsonl files , take care that newlines in the content are properly escaped ( json . dumps does this automatically )
- an example file is provided at examples / data / example_requests_to_parallel_process . jsonl
- the code to generate the example file is appended to the bottom of this script
@ -164,6 +164,7 @@ async def process_api_requests_from_file(
request_json = request_json ,
token_consumption = num_tokens_consumed_from_request ( request_json , api_endpoint , token_encoding_name ) ,
attempts_left = max_attempts ,
metadata = request_json . pop ( " metadata " , None )
)
status_tracker . num_tasks_started + = 1
status_tracker . num_tasks_in_progress + = 1
@ -258,6 +259,7 @@ class APIRequest:
request_json : dict
token_consumption : int
attempts_left : int
metadata : dict
result : list = field ( default_factory = list )
async def call_api (
@ -298,11 +300,21 @@ class APIRequest:
retry_queue . put_nowait ( self )
else :
logging . error ( f " Request { self . request_json } failed after all attempts. Saving errors: { self . result } " )
append_to_jsonl ( [ self . request_json , [ str ( e ) for e in self . result ] ] , save_filepath )
data = (
[ self . request_json , [ str ( e ) for e in self . result ] , self . metadata ]
if self . metadata
else [ self . request_json , [ str ( e ) for e in self . result ] ]
)
append_to_jsonl ( data , save_filepath )
status_tracker . num_tasks_in_progress - = 1
status_tracker . num_tasks_failed + = 1
else :
append_to_jsonl ( [ self . request_json , response ] , save_filepath )
data = (
[ self . request_json , response , self . metadata ]
if self . metadata
else [ self . request_json , response ]
)
append_to_jsonl ( data , save_filepath )
status_tracker . num_tasks_in_progress - = 1
status_tracker . num_tasks_succeeded + = 1
logging . debug ( f " Request { self . task_id } saved to { save_filepath } " )