langchain/libs/cli/langchain_cli/utils/git.py
Erick Friis f39246bd7e
cli should pull instead of delete+clone (#12607)
Co-authored-by: Harrison Chase <hw.chase.17@gmail.com>
2023-10-30 16:44:09 -07:00

198 lines
6.1 KiB
Python

import hashlib
import re
import shutil
from pathlib import Path
from typing import Dict, List, Optional, Sequence, TypedDict
from git import Repo
from langchain_cli.constants import (
DEFAULT_GIT_REF,
DEFAULT_GIT_REPO,
DEFAULT_GIT_SUBDIRECTORY,
)
class DependencySource(TypedDict):
git: str
ref: Optional[str]
subdirectory: Optional[str]
api_path: Optional[str]
event_metadata: Dict
# use poetry dependency string format
def parse_dependency_string(
dep: Optional[str],
repo: Optional[str],
branch: Optional[str],
api_path: Optional[str],
) -> DependencySource:
if dep is not None and dep.startswith("git+"):
if repo is not None or branch is not None:
raise ValueError(
"If a dependency starts with git+, you cannot manually specify "
"a repo or branch."
)
# remove git+
gitstring = dep[4:]
subdirectory = None
ref = None
# first check for #subdirectory= on the end
if "#subdirectory=" in gitstring:
gitstring, subdirectory = gitstring.split("#subdirectory=")
if "#" in subdirectory or "@" in subdirectory:
raise ValueError(
"#subdirectory must be the last part of the dependency string"
)
# find first slash after ://
# find @ or # after that slash
# remainder is ref
# if no @ or #, then ref is None
# find first slash after ://
if "://" not in gitstring:
raise ValueError(
"git+ dependencies must start with git+https:// or git+ssh://"
)
_, find_slash = gitstring.split("://", 1)
if "/" not in find_slash:
post_slash = find_slash
ref = None
else:
_, post_slash = find_slash.split("/", 1)
if "@" in post_slash or "#" in post_slash:
_, ref = re.split(r"[@#]", post_slash, 1)
# gitstring is everything before that
gitstring = gitstring[: -len(ref) - 1] if ref is not None else gitstring
return DependencySource(
git=gitstring,
ref=ref,
subdirectory=subdirectory,
api_path=api_path,
event_metadata={"dependency_string": dep},
)
elif dep is not None and dep.startswith("https://"):
raise ValueError("Only git dependencies are supported")
else:
# if repo is none, use default, including subdirectory
base_subdir = Path(DEFAULT_GIT_SUBDIRECTORY) if repo is None else Path()
subdir = str(base_subdir / dep) if dep is not None else None
gitstring = (
DEFAULT_GIT_REPO
if repo is None
else f"https://github.com/{repo.strip('/')}.git"
)
ref = DEFAULT_GIT_REF if branch is None else branch
# it's a default git repo dependency
return DependencySource(
git=gitstring,
ref=ref,
subdirectory=subdir,
api_path=api_path,
event_metadata={
"dependency_string": dep,
"used_repo_flag": repo is not None,
"used_branch_flag": branch is not None,
},
)
def _list_arg_to_length(arg: Optional[List[str]], num: int) -> Sequence[Optional[str]]:
if not arg:
return [None] * num
elif len(arg) == 1:
return arg * num
elif len(arg) == num:
return arg
else:
raise ValueError(f"Argument must be of length 1 or {num}")
def parse_dependencies(
dependencies: Optional[List[str]],
repo: List[str],
branch: List[str],
api_path: List[str],
) -> List[DependencySource]:
num_deps = max(
len(dependencies) if dependencies is not None else 0, len(repo), len(branch)
)
if (
(dependencies and len(dependencies) != num_deps)
or (api_path and len(api_path) != num_deps)
or (repo and len(repo) not in [1, num_deps])
or (branch and len(branch) not in [1, num_deps])
):
raise ValueError(
"Number of defined repos/branches/api_paths did not match the "
"number of templates."
)
inner_deps = _list_arg_to_length(dependencies, num_deps)
inner_api_paths = _list_arg_to_length(api_path, num_deps)
inner_repos = _list_arg_to_length(repo, num_deps)
inner_branches = _list_arg_to_length(branch, num_deps)
return [
parse_dependency_string(iter_dep, iter_repo, iter_branch, iter_api_path)
for iter_dep, iter_repo, iter_branch, iter_api_path in zip(
inner_deps, inner_repos, inner_branches, inner_api_paths
)
]
def _get_repo_path(gitstring: str, ref: Optional[str], repo_dir: Path) -> Path:
# only based on git for now
ref_str = ref if ref is not None else ""
hashed = hashlib.sha256((f"{gitstring}:{ref_str}").encode("utf-8")).hexdigest()[:8]
removed_protocol = gitstring.split("://")[-1]
removed_basename = re.split(r"[/:]", removed_protocol, 1)[-1]
removed_extras = removed_basename.split("#")[0]
foldername = re.sub(r"[^a-zA-Z0-9_]", "_", removed_extras)
directory_name = f"{foldername}_{hashed}"
return repo_dir / directory_name
def update_repo(gitstring: str, ref: Optional[str], repo_dir: Path) -> Path:
# see if path already saved
repo_path = _get_repo_path(gitstring, ref, repo_dir)
if repo_path.exists():
# try pulling
try:
repo = Repo(repo_path)
if repo.active_branch.name != ref:
raise ValueError()
repo.remotes.origin.pull()
except Exception:
# if it fails, delete and clone again
shutil.rmtree(repo_path)
Repo.clone_from(gitstring, repo_path, branch=ref, depth=1)
else:
Repo.clone_from(gitstring, repo_path, branch=ref, depth=1)
return repo_path
def copy_repo(
source: Path,
destination: Path,
) -> None:
"""
Copies a repo, ignoring git folders.
Raises FileNotFound error if it can't find source
"""
def ignore_func(_, files):
return [f for f in files if f == ".git"]
shutil.copytree(source, destination, ignore=ignore_func)