You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
langchain/langchain/utilities/powerbi.py

250 lines
9.5 KiB
Python

"""Wrapper around a Power BI endpoint."""
from __future__ import annotations
import logging
import os
from copy import deepcopy
from typing import Any, Dict, Iterable, List, Optional, Union
import aiohttp
import requests
from aiohttp import ServerTimeoutError
from pydantic import BaseModel, Field, root_validator
from requests.exceptions import Timeout
_LOGGER = logging.getLogger(__name__)
BASE_URL = os.getenv("POWERBI_BASE_URL", "https://api.powerbi.com/v1.0/myorg")
try:
from azure.core.credentials import TokenCredential
except ImportError:
_LOGGER.log(
logging.WARNING,
"Could not import azure.core python package.",
)
class PowerBIDataset(BaseModel):
"""Create PowerBI engine from dataset ID and credential or token.
Use either the credential or a supplied token to authenticate.
If both are supplied the credential is used to generate a token.
The impersonated_user_name is the UPN of a user to be impersonated.
If the model is not RLS enabled, this will be ignored.
"""
dataset_id: str
table_names: List[str]
group_id: Optional[str] = None
credential: Optional[TokenCredential] = None
token: Optional[str] = None
impersonated_user_name: Optional[str] = None
sample_rows_in_table_info: int = Field(default=1, gt=0, le=10)
aiosession: Optional[aiohttp.ClientSession] = None
schemas: Dict[str, str] = Field(default_factory=dict, init=False)
class Config:
"""Configuration for this pydantic object."""
arbitrary_types_allowed = True
@root_validator(pre=True, allow_reuse=True)
def token_or_credential_present(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Validate that at least one of token and credentials is present."""
if "token" in values or "credential" in values:
return values
raise ValueError("Please provide either a credential or a token.")
@property
def request_url(self) -> str:
"""Get the request url."""
if self.group_id:
return f"{BASE_URL}/groups/{self.group_id}/datasets/{self.dataset_id}/executeQueries" # noqa: E501 # pylint: disable=C0301
return f"{BASE_URL}/datasets/{self.dataset_id}/executeQueries" # noqa: E501 # pylint: disable=C0301
@property
def headers(self) -> Dict[str, str]:
"""Get the token."""
if self.token:
return {
"Content-Type": "application/json",
"Authorization": "Bearer " + self.token,
}
from azure.core.exceptions import (
ClientAuthenticationError, # pylint: disable=import-outside-toplevel
)
if self.credential:
try:
token = self.credential.get_token(
"https://analysis.windows.net/powerbi/api/.default"
).token
return {
"Content-Type": "application/json",
"Authorization": "Bearer " + token,
}
except Exception as exc: # pylint: disable=broad-exception-caught
raise ClientAuthenticationError(
"Could not get a token from the supplied credentials."
) from exc
raise ClientAuthenticationError("No credential or token supplied.")
def get_table_names(self) -> Iterable[str]:
"""Get names of tables available."""
return self.table_names
def get_schemas(self) -> str:
"""Get the available schema's."""
if self.schemas:
return ", ".join([f"{key}: {value}" for key, value in self.schemas.items()])
return "No known schema's yet. Use the schema_powerbi tool first."
@property
def table_info(self) -> str:
"""Information about all tables in the database."""
return self.get_table_info()
def _get_tables_to_query(
self, table_names: Optional[Union[List[str], str]] = None
) -> List[str]:
"""Get the tables names that need to be queried."""
if table_names is not None:
if (
isinstance(table_names, list)
and len(table_names) > 0
and table_names[0] != ""
):
return table_names
if isinstance(table_names, str) and table_names != "":
return [table_names]
return self.table_names
def _get_tables_todo(self, tables_todo: List[str]) -> List[str]:
"""Get the tables that still need to be queried."""
todo = deepcopy(tables_todo)
for table in todo:
if table not in self.table_names:
_LOGGER.warning("Table %s not found in dataset.", table)
todo.remove(table)
continue
if table in self.schemas:
todo.remove(table)
return todo
def _get_schema_for_tables(self, table_names: List[str]) -> str:
"""Create a string of the table schemas for the supplied tables."""
schemas = [
schema for table, schema in self.schemas.items() if table in table_names
]
return ", ".join(schemas)
def get_table_info(
self, table_names: Optional[Union[List[str], str]] = None
) -> str:
"""Get information about specified tables."""
tables_requested = self._get_tables_to_query(table_names)
tables_todo = self._get_tables_todo(tables_requested)
for table in tables_todo:
if " " in table and not table.startswith("'") and not table.endswith("'"):
table = f"'{table}'"
try:
result = self.run(
f"EVALUATE TOPN({self.sample_rows_in_table_info}, {table})"
)
except Timeout:
_LOGGER.warning("Timeout while getting table info for %s", table)
self.schemas[table] = "unknown"
continue
except Exception as exc: # pylint: disable=broad-exception-caught
_LOGGER.warning("Error while getting table info for %s: %s", table, exc)
self.schemas[table] = "unknown"
continue
self.schemas[table] = json_to_md(result["results"][0]["tables"][0]["rows"])
return self._get_schema_for_tables(tables_requested)
async def aget_table_info(
self, table_names: Optional[Union[List[str], str]] = None
) -> str:
"""Get information about specified tables."""
tables_requested = self._get_tables_to_query(table_names)
tables_todo = self._get_tables_todo(tables_requested)
for table in tables_todo:
if " " in table and not table.startswith("'") and not table.endswith("'"):
table = f"'{table}'"
try:
result = await self.arun(
f"EVALUATE TOPN({self.sample_rows_in_table_info}, {table})"
)
except ServerTimeoutError:
_LOGGER.warning("Timeout while getting table info for %s", table)
self.schemas[table] = "unknown"
continue
except Exception as exc: # pylint: disable=broad-exception-caught
_LOGGER.warning("Error while getting table info for %s: %s", table, exc)
self.schemas[table] = "unknown"
continue
self.schemas[table] = json_to_md(result["results"][0]["tables"][0]["rows"])
return self._get_schema_for_tables(tables_requested)
def _create_json_content(self, command: str) -> dict[str, Any]:
"""Create the json content for the request."""
return {
"queries": [{"query": rf"{command}"}],
"impersonatedUserName": self.impersonated_user_name,
"serializerSettings": {"includeNulls": True},
}
def run(self, command: str) -> Any:
"""Execute a DAX command and return a json representing the results."""
_LOGGER.debug("Running command: %s", command)
result = requests.post(
self.request_url,
json=self._create_json_content(command),
headers=self.headers,
timeout=10,
)
return result.json()
async def arun(self, command: str) -> Any:
"""Execute a DAX command and return the result asynchronously."""
_LOGGER.debug("Running command: %s", command)
if self.aiosession:
async with self.aiosession.post(
self.request_url,
headers=self.headers,
json=self._create_json_content(command),
timeout=10,
) as response:
response_json = await response.json()
return response_json
async with aiohttp.ClientSession() as session:
async with session.post(
self.request_url,
headers=self.headers,
json=self._create_json_content(command),
timeout=10,
) as response:
response_json = await response.json()
return response_json
def json_to_md(
json_contents: List[Dict[str, Union[str, int, float]]],
table_name: Optional[str] = None,
) -> str:
"""Converts a JSON object to a markdown table."""
output_md = ""
headers = json_contents[0].keys()
for header in headers:
header.replace("[", ".").replace("]", "")
if table_name:
header.replace(f"{table_name}.", "")
output_md += f"| {header} "
output_md += "|\n"
for row in json_contents:
for value in row.values():
output_md += f"| {value} "
output_md += "|\n"
return output_md