From efae0d4bc107af8ea8f76e541af9115ac668180f Mon Sep 17 00:00:00 2001 From: Domenico Manna Date: Tue, 23 May 2023 17:19:18 -0400 Subject: [PATCH] Add ability to store row-level metadata This metadata field allows non API parameters to be stored which could be useful for when the results need to be associated with an external source. For example, if the user has several rows of text data within a database, and they want to calculate embeddings for each row, they can now use the metadata field to save the row id so that when they parse the results back, they know which embedding corresponds to which database row. --- examples/api_request_parallel_processor.py | 20 +++++++++++++++---- ...example_requests_to_parallel_process.jsonl | 2 +- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/examples/api_request_parallel_processor.py b/examples/api_request_parallel_processor.py index a505394a..7a156154 100644 --- a/examples/api_request_parallel_processor.py +++ b/examples/api_request_parallel_processor.py @@ -31,8 +31,8 @@ python examples/api_request_parallel_processor.py \ Inputs: - requests_filepath : str - path to the file containing the requests to be processed - - file should be a jsonl file, where each line is a json object with API parameters - - e.g., {"model": "text-embedding-ada-002", "input": "embed me"} + - file should be a jsonl file, where each line is a json object with API parameters and an optional metadata field + - e.g., {"model": "text-embedding-ada-002", "input": "embed me", "metadata": {"row_id": 1}} - as with all jsonl files, take care that newlines in the content are properly escaped (json.dumps does this automatically) - an example file is provided at examples/data/example_requests_to_parallel_process.jsonl - the code to generate the example file is appended to the bottom of this script @@ -164,6 +164,7 @@ async def process_api_requests_from_file( request_json=request_json, token_consumption=num_tokens_consumed_from_request(request_json, api_endpoint, token_encoding_name), attempts_left=max_attempts, + metadata=request_json.pop("metadata", None) ) status_tracker.num_tasks_started += 1 status_tracker.num_tasks_in_progress += 1 @@ -258,6 +259,7 @@ class APIRequest: request_json: dict token_consumption: int attempts_left: int + metadata: dict result: list = field(default_factory=list) async def call_api( @@ -298,11 +300,21 @@ class APIRequest: retry_queue.put_nowait(self) else: logging.error(f"Request {self.request_json} failed after all attempts. Saving errors: {self.result}") - append_to_jsonl([self.request_json, [str(e) for e in self.result]], save_filepath) + data = ( + [self.request_json, [str(e) for e in self.result], self.metadata] + if self.metadata + else [self.request_json, [str(e) for e in self.result]] + ) + append_to_jsonl(data, save_filepath) status_tracker.num_tasks_in_progress -= 1 status_tracker.num_tasks_failed += 1 else: - append_to_jsonl([self.request_json, response], save_filepath) + data = ( + [self.request_json, response, self.metadata] + if self.metadata + else [self.request_json, response] + ) + append_to_jsonl(data, save_filepath) status_tracker.num_tasks_in_progress -= 1 status_tracker.num_tasks_succeeded += 1 logging.debug(f"Request {self.task_id} saved to {save_filepath}") diff --git a/examples/data/example_requests_to_parallel_process.jsonl b/examples/data/example_requests_to_parallel_process.jsonl index 43da2c57..29adf93a 100644 --- a/examples/data/example_requests_to_parallel_process.jsonl +++ b/examples/data/example_requests_to_parallel_process.jsonl @@ -1,4 +1,4 @@ -{"model": "text-embedding-ada-002", "input": "0\n"} +{"model": "text-embedding-ada-002", "input": "0\n", "metadata": {"row_id": 1}} {"model": "text-embedding-ada-002", "input": "1\n"} {"model": "text-embedding-ada-002", "input": "2\n"} {"model": "text-embedding-ada-002", "input": "3\n"}