|
|
|
@ -31,8 +31,8 @@ python examples/api_request_parallel_processor.py \
|
|
|
|
|
Inputs:
|
|
|
|
|
- requests_filepath : str
|
|
|
|
|
- path to the file containing the requests to be processed
|
|
|
|
|
- file should be a jsonl file, where each line is a json object with API parameters
|
|
|
|
|
- e.g., {"model": "text-embedding-ada-002", "input": "embed me"}
|
|
|
|
|
- file should be a jsonl file, where each line is a json object with API parameters and an optional metadata field
|
|
|
|
|
- e.g., {"model": "text-embedding-ada-002", "input": "embed me", "metadata": {"row_id": 1}}
|
|
|
|
|
- as with all jsonl files, take care that newlines in the content are properly escaped (json.dumps does this automatically)
|
|
|
|
|
- an example file is provided at examples/data/example_requests_to_parallel_process.jsonl
|
|
|
|
|
- the code to generate the example file is appended to the bottom of this script
|
|
|
|
@ -164,6 +164,7 @@ async def process_api_requests_from_file(
|
|
|
|
|
request_json=request_json,
|
|
|
|
|
token_consumption=num_tokens_consumed_from_request(request_json, api_endpoint, token_encoding_name),
|
|
|
|
|
attempts_left=max_attempts,
|
|
|
|
|
metadata=request_json.pop("metadata", None)
|
|
|
|
|
)
|
|
|
|
|
status_tracker.num_tasks_started += 1
|
|
|
|
|
status_tracker.num_tasks_in_progress += 1
|
|
|
|
@ -258,6 +259,7 @@ class APIRequest:
|
|
|
|
|
request_json: dict
|
|
|
|
|
token_consumption: int
|
|
|
|
|
attempts_left: int
|
|
|
|
|
metadata: dict
|
|
|
|
|
result: list = field(default_factory=list)
|
|
|
|
|
|
|
|
|
|
async def call_api(
|
|
|
|
@ -298,11 +300,21 @@ class APIRequest:
|
|
|
|
|
retry_queue.put_nowait(self)
|
|
|
|
|
else:
|
|
|
|
|
logging.error(f"Request {self.request_json} failed after all attempts. Saving errors: {self.result}")
|
|
|
|
|
append_to_jsonl([self.request_json, [str(e) for e in self.result]], save_filepath)
|
|
|
|
|
data = (
|
|
|
|
|
[self.request_json, [str(e) for e in self.result], self.metadata]
|
|
|
|
|
if self.metadata
|
|
|
|
|
else [self.request_json, [str(e) for e in self.result]]
|
|
|
|
|
)
|
|
|
|
|
append_to_jsonl(data, save_filepath)
|
|
|
|
|
status_tracker.num_tasks_in_progress -= 1
|
|
|
|
|
status_tracker.num_tasks_failed += 1
|
|
|
|
|
else:
|
|
|
|
|
append_to_jsonl([self.request_json, response], save_filepath)
|
|
|
|
|
data = (
|
|
|
|
|
[self.request_json, response, self.metadata]
|
|
|
|
|
if self.metadata
|
|
|
|
|
else [self.request_json, response]
|
|
|
|
|
)
|
|
|
|
|
append_to_jsonl(data, save_filepath)
|
|
|
|
|
status_tracker.num_tasks_in_progress -= 1
|
|
|
|
|
status_tracker.num_tasks_succeeded += 1
|
|
|
|
|
logging.debug(f"Request {self.task_id} saved to {save_filepath}")
|
|
|
|
|