diff --git a/langchain/document_loaders/blockchain.py b/langchain/document_loaders/blockchain.py index 0ddbceee..11bdea4a 100644 --- a/langchain/document_loaders/blockchain.py +++ b/langchain/document_loaders/blockchain.py @@ -1,7 +1,8 @@ import os import re +import time from enum import Enum -from typing import List +from typing import List, Optional import requests @@ -25,11 +26,18 @@ class BlockchainDocumentLoader(BaseLoader): If no BlockchainType is specified, the default is Ethereum mainnet. The Loader uses the Alchemy API to interact with the blockchain. + ALCHEMY_API_KEY environment variable must be set to use this loader. The API returns 100 NFTs per request and can be paginated using the startToken parameter. - ALCHEMY_API_KEY environment variable must be set to use this loader. + If get_all_tokens is set to True, the loader will get all tokens + on the contract. Note that for contracts with a large number of tokens, + this may take a long time (e.g. 10k tokens is 100 requests). + Default value is false for this reason. + + The max_execution_time (sec) can be set to limit the execution time + of the loader. Future versions of this loader can: - Support additional Alchemy APIs (e.g. getTransactions, etc.) @@ -42,11 +50,15 @@ class BlockchainDocumentLoader(BaseLoader): blockchainType: BlockchainType = BlockchainType.ETH_MAINNET, api_key: str = "docs-demo", startToken: str = "", + get_all_tokens: bool = False, + max_execution_time: Optional[int] = None, ): self.contract_address = contract_address self.blockchainType = blockchainType.value self.api_key = os.environ.get("ALCHEMY_API_KEY") or api_key self.startToken = startToken + self.get_all_tokens = get_all_tokens + self.max_execution_time = max_execution_time if not self.api_key: raise ValueError("Alchemy API key not provided.") @@ -55,34 +67,90 @@ class BlockchainDocumentLoader(BaseLoader): raise ValueError(f"Invalid contract address {self.contract_address}") def load(self) -> List[Document]: - url = ( - f"https://{self.blockchainType}.g.alchemy.com/nft/v2/" - f"{self.api_key}/getNFTsForCollection?withMetadata=" - f"True&contractAddress={self.contract_address}" - f"&startToken={self.startToken}" - ) + result = [] + + current_start_token = self.startToken + + start_time = time.time() + + while True: + url = ( + f"https://{self.blockchainType}.g.alchemy.com/nft/v2/" + f"{self.api_key}/getNFTsForCollection?withMetadata=" + f"True&contractAddress={self.contract_address}" + f"&startToken={current_start_token}" + ) - response = requests.get(url) + response = requests.get(url) - if response.status_code != 200: - raise ValueError(f"Request failed with status code {response.status_code}") + if response.status_code != 200: + raise ValueError( + f"Request failed with status code {response.status_code}" + ) - items = response.json()["nfts"] + items = response.json()["nfts"] - if not (items): + if not items: + break + + for item in items: + content = str(item) + tokenId = item["id"]["tokenId"] + metadata = { + "source": self.contract_address, + "blockchain": self.blockchainType, + "tokenId": tokenId, + } + result.append(Document(page_content=content, metadata=metadata)) + + # exit after the first API call if get_all_tokens is False + if not self.get_all_tokens: + break + + # get the start token for the next API call from the last item in array + current_start_token = self._get_next_tokenId(result[-1].metadata["tokenId"]) + + if ( + self.max_execution_time is not None + and (time.time() - start_time) > self.max_execution_time + ): + raise RuntimeError("Execution time exceeded the allowed time limit.") + + if not result: raise ValueError( f"No NFTs found for contract address {self.contract_address}" ) - result = [] - - for item in items: - content = str(item) - tokenId = item["id"]["tokenId"] - metadata = { - "source": self.contract_address, - "blockchain": self.blockchainType, - "tokenId": tokenId, - } - result.append(Document(page_content=content, metadata=metadata)) return result + + # add one to the tokenId, ensuring the correct tokenId format is used + def _get_next_tokenId(self, tokenId: str) -> str: + value_type = self._detect_value_type(tokenId) + + if value_type == "hex_0x": + value_int = int(tokenId, 16) + elif value_type == "hex_0xbf": + value_int = int(tokenId[2:], 16) + else: + value_int = int(tokenId) + + result = value_int + 1 + + if value_type == "hex_0x": + return "0x" + format(result, "0" + str(len(tokenId) - 2) + "x") + elif value_type == "hex_0xbf": + return "0xbf" + format(result, "0" + str(len(tokenId) - 4) + "x") + else: + return str(result) + + # A smart contract can use different formats for the tokenId + @staticmethod + def _detect_value_type(tokenId: str) -> str: + if isinstance(tokenId, int): + return "int" + elif tokenId.startswith("0x"): + return "hex_0x" + elif tokenId.startswith("0xbf"): + return "hex_0xbf" + else: + return "hex_0xbf" diff --git a/tests/integration_tests/document_loaders/test_blockchain.py b/tests/integration_tests/document_loaders/test_blockchain.py index ea29ec8d..8c452fb2 100644 --- a/tests/integration_tests/document_loaders/test_blockchain.py +++ b/tests/integration_tests/document_loaders/test_blockchain.py @@ -1,4 +1,5 @@ import os +import time import pytest @@ -14,11 +15,18 @@ else: @pytest.mark.skipif(not alchemyKeySet, reason="Alchemy API key not provided.") def test_get_nfts_valid_contract() -> None: + max_alchemy_tokens = 100 contract_address = ( "0x1a92f7381b9f03921564a437210bb9396471050c" # CoolCats contract address ) result = BlockchainDocumentLoader(contract_address).load() - assert len(result) > 0, "No NFTs returned" + + print("Tokens returend for valid contract: ", len(result)) + + assert len(result) == max_alchemy_tokens, ( + f"Wrong number of NFTs returned. " + f"Expected {max_alchemy_tokens}, got {len(result)}" + ) @pytest.mark.skipif(not alchemyKeySet, reason="Alchemy API key not provided.") @@ -26,7 +34,7 @@ def test_get_nfts_with_pagination() -> None: contract_address = ( "0x1a92f7381b9f03921564a437210bb9396471050c" # CoolCats contract address ) - startToken = "20" + startToken = "0x0000000000000000000000000000000000000000000000000000000000000077" result = BlockchainDocumentLoader( contract_address, @@ -35,6 +43,8 @@ def test_get_nfts_with_pagination() -> None: startToken=startToken, ).load() + print("Tokens returend for contract with offset: ", len(result)) + assert len(result) > 0, "No NFTs returned" @@ -46,6 +56,9 @@ def test_get_nfts_polygon() -> None: result = BlockchainDocumentLoader( contract_address, BlockchainType.POLYGON_MAINNET ).load() + + print("Tokens returend for contract on Polygon: ", len(result)) + assert len(result) > 0, "No NFTs returned" @@ -62,3 +75,50 @@ def test_get_nfts_invalid_contract() -> None: str(error_NoNfts.value) == "No NFTs found for contract address " + contract_address ) + + +@pytest.mark.skipif(not alchemyKeySet, reason="Alchemy API key not provided.") +def test_get_all() -> None: + start_time = time.time() + + contract_address = ( + "0x448676ffCd0aDf2D85C1f0565e8dde6924A9A7D9" # Polygon contract address + ) + result = BlockchainDocumentLoader( + contract_address=contract_address, + blockchainType=BlockchainType.POLYGON_MAINNET, + api_key=os.environ["ALCHEMY_API_KEY"], + startToken="100", + get_all_tokens=True, + ).load() + + end_time = time.time() + + print( + f"Tokens returned for {contract_address} " + f"contract: {len(result)} in {end_time - start_time} seconds" + ) + + assert len(result) > 0, "No NFTs returned" + + +@pytest.mark.skipif(not alchemyKeySet, reason="Alchemy API key not provided.") +def test_get_all_10sec_timeout() -> None: + start_time = time.time() + + contract_address = ( + "0x1a92f7381b9f03921564a437210bb9396471050c" # Cool Cats contract address + ) + + with pytest.raises(RuntimeError): + BlockchainDocumentLoader( + contract_address=contract_address, + blockchainType=BlockchainType.ETH_MAINNET, + api_key=os.environ["ALCHEMY_API_KEY"], + get_all_tokens=True, + max_execution_time=10, + ).load() + + end_time = time.time() + + print("Execution took ", end_time - start_time, " seconds")