diff --git a/examples/api_request_parallel_processor.py b/examples/api_request_parallel_processor.py index a505394a..7a156154 100644 --- a/examples/api_request_parallel_processor.py +++ b/examples/api_request_parallel_processor.py @@ -31,8 +31,8 @@ python examples/api_request_parallel_processor.py \ Inputs: - requests_filepath : str - path to the file containing the requests to be processed - - file should be a jsonl file, where each line is a json object with API parameters - - e.g., {"model": "text-embedding-ada-002", "input": "embed me"} + - file should be a jsonl file, where each line is a json object with API parameters and an optional metadata field + - e.g., {"model": "text-embedding-ada-002", "input": "embed me", "metadata": {"row_id": 1}} - as with all jsonl files, take care that newlines in the content are properly escaped (json.dumps does this automatically) - an example file is provided at examples/data/example_requests_to_parallel_process.jsonl - the code to generate the example file is appended to the bottom of this script @@ -164,6 +164,7 @@ async def process_api_requests_from_file( request_json=request_json, token_consumption=num_tokens_consumed_from_request(request_json, api_endpoint, token_encoding_name), attempts_left=max_attempts, + metadata=request_json.pop("metadata", None) ) status_tracker.num_tasks_started += 1 status_tracker.num_tasks_in_progress += 1 @@ -258,6 +259,7 @@ class APIRequest: request_json: dict token_consumption: int attempts_left: int + metadata: dict result: list = field(default_factory=list) async def call_api( @@ -298,11 +300,21 @@ class APIRequest: retry_queue.put_nowait(self) else: logging.error(f"Request {self.request_json} failed after all attempts. Saving errors: {self.result}") - append_to_jsonl([self.request_json, [str(e) for e in self.result]], save_filepath) + data = ( + [self.request_json, [str(e) for e in self.result], self.metadata] + if self.metadata + else [self.request_json, [str(e) for e in self.result]] + ) + append_to_jsonl(data, save_filepath) status_tracker.num_tasks_in_progress -= 1 status_tracker.num_tasks_failed += 1 else: - append_to_jsonl([self.request_json, response], save_filepath) + data = ( + [self.request_json, response, self.metadata] + if self.metadata + else [self.request_json, response] + ) + append_to_jsonl(data, save_filepath) status_tracker.num_tasks_in_progress -= 1 status_tracker.num_tasks_succeeded += 1 logging.debug(f"Request {self.task_id} saved to {save_filepath}") diff --git a/examples/data/example_requests_to_parallel_process.jsonl b/examples/data/example_requests_to_parallel_process.jsonl index 43da2c57..29adf93a 100644 --- a/examples/data/example_requests_to_parallel_process.jsonl +++ b/examples/data/example_requests_to_parallel_process.jsonl @@ -1,4 +1,4 @@ -{"model": "text-embedding-ada-002", "input": "0\n"} +{"model": "text-embedding-ada-002", "input": "0\n", "metadata": {"row_id": 1}} {"model": "text-embedding-ada-002", "input": "1\n"} {"model": "text-embedding-ada-002", "input": "2\n"} {"model": "text-embedding-ada-002", "input": "3\n"}