Merge pull request #448 from domenicomanna/parallel-processor-add-metadata

Add ability to store row-level metadata
pull/450/head
Ted Sanders 1 year ago committed by GitHub
commit 806a01cba4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -31,8 +31,8 @@ python examples/api_request_parallel_processor.py \
Inputs:
- requests_filepath : str
- path to the file containing the requests to be processed
- file should be a jsonl file, where each line is a json object with API parameters
- e.g., {"model": "text-embedding-ada-002", "input": "embed me"}
- file should be a jsonl file, where each line is a json object with API parameters and an optional metadata field
- e.g., {"model": "text-embedding-ada-002", "input": "embed me", "metadata": {"row_id": 1}}
- as with all jsonl files, take care that newlines in the content are properly escaped (json.dumps does this automatically)
- an example file is provided at examples/data/example_requests_to_parallel_process.jsonl
- the code to generate the example file is appended to the bottom of this script
@ -164,6 +164,7 @@ async def process_api_requests_from_file(
request_json=request_json,
token_consumption=num_tokens_consumed_from_request(request_json, api_endpoint, token_encoding_name),
attempts_left=max_attempts,
metadata=request_json.pop("metadata", None)
)
status_tracker.num_tasks_started += 1
status_tracker.num_tasks_in_progress += 1
@ -258,6 +259,7 @@ class APIRequest:
request_json: dict
token_consumption: int
attempts_left: int
metadata: dict
result: list = field(default_factory=list)
async def call_api(
@ -298,11 +300,21 @@ class APIRequest:
retry_queue.put_nowait(self)
else:
logging.error(f"Request {self.request_json} failed after all attempts. Saving errors: {self.result}")
append_to_jsonl([self.request_json, [str(e) for e in self.result]], save_filepath)
data = (
[self.request_json, [str(e) for e in self.result], self.metadata]
if self.metadata
else [self.request_json, [str(e) for e in self.result]]
)
append_to_jsonl(data, save_filepath)
status_tracker.num_tasks_in_progress -= 1
status_tracker.num_tasks_failed += 1
else:
append_to_jsonl([self.request_json, response], save_filepath)
data = (
[self.request_json, response, self.metadata]
if self.metadata
else [self.request_json, response]
)
append_to_jsonl(data, save_filepath)
status_tracker.num_tasks_in_progress -= 1
status_tracker.num_tasks_succeeded += 1
logging.debug(f"Request {self.task_id} saved to {save_filepath}")

@ -1,4 +1,4 @@
{"model": "text-embedding-ada-002", "input": "0\n"}
{"model": "text-embedding-ada-002", "input": "0\n", "metadata": {"row_id": 1}}
{"model": "text-embedding-ada-002", "input": "1\n"}
{"model": "text-embedding-ada-002", "input": "2\n"}
{"model": "text-embedding-ada-002", "input": "3\n"}

Loading…
Cancel
Save