adds nv-update-locator utility and support utils/libs

pull/50/head
Vladislav Yarmak 5 years ago
parent 5adcc24a94
commit 75a69c3491

@ -0,0 +1,107 @@
# Do not stage vim swapfiles to commit
*.swp
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/

@ -0,0 +1,171 @@
nv-driver-locator
=================
nv-driver-locator is a tool for internal usage, which purpose is to notify about new Nvidia driver releases. It's kernel supports and performs:
* Update retrieval from multiple sources (**channels** component).
* Notification through various ways (**notifiers** component).
* Driver info matching and aggregation via configurable set of attributes (**hasher** component).
* Persistence of collected data for keeping track on already seen drivers (**db** component).
## Requirements
* Python 3.4+
## Overview
### Structure
All scripts may be used both as standalone application and importable module. For CLI synopsys invoke program with `--help` option.
* nv-driver-locator.py - main executable, intended to be run as cron job.
* mailer.py - module with email routines and minimalistic email client for test purposes.
* gfe\_get\_driver.py - GeForce Experience client library (and test util).
### Operation
1. Cron job queries all configured channels.
2. Program aggregates responses by hashing their's values covered by `key_components`. `key_components` is a list of JSON paths (represented by list too) specified in config file.
3. Program queries DB if given hash has any match in database.
4. If no match found and we have new instance all notifiers getting fired.
5. New record gets written into DB.
## Configuration example
```json
{
"db": {
"type": "file",
"params": {
"workdir": "/var/lib/nv-driver-locator"
}
},
"key_components": [
[
"DriverAttributes",
"Version"
]
],
"channels": [
{
"type": "gfe_client",
"name": "desktop defaults",
"params": {}
},
{
"type": "gfe_client",
"name": "desktop beta",
"params": {
"beta": true
}
},
{
"type": "gfe_client",
"name": "mobile",
"params": {
"notebook": true
}
},
{
"type": "gfe_client",
"name": "mobile beta",
"params": {
"notebook": true,
"beta": true
}
}
],
"notifiers": [
{
"type": "email",
"name": "my email",
"params": {
"from_addr": "notify-bot@gmail.com",
"to_addrs": [
"recepient1@domain1.tld",
"recepient2@domain2.tld"
],
"host": "smtp.google.com",
"use_starttls": true,
"login": "notify-bot",
"password": "MyGoodPass"
}
},
{
"type": "command",
"name": "sample command",
"params": {
"timeout": 10.0,
"cmdline": [
"cat",
"-"
]
}
}
]
}
```
## Components Reference
### DB
#### FileDB
Stores data in files.
Type: `file`
Params:
* `workdir` - files location
### Channels
#### GFEClientChannel
Queries latest driver for Windows, using GeForce Experience API.
Type: `gfe_client`
Params:
* `notebook` - seek for Mobile driver. Default: `false`
* `x86_64` - seek for 64bit driver. Default: `true`
* `os_version` - OS version. Default: `"10.0"`
* `os_build` - OS build. Default: `"17763"`
* `language` - language. Default: `1033` (English)
* `beta` - request Beta driver. Default: `false`
* `dch` - request DCH driver. Default: `false` (request Standard Driver)
### Notifiers
#### CommandNotifier
Runs external process and pipes JSON with info about new driver into it
Type: `command`
Params:
* `cmdline` - list of command line arguments (where first is executable name)
* `timeout` - allowed execution time in seconds. Default: `10.0`
#### EmailNotifier
Sends email with attached JSON file with driver info. Supports TLS, STARTTLS and authentication, so it can be used to send notification via mailbox provided by public services like gmail.
Type: `email`
Params:
* `from_addr` - originating address
* `to_addrs` - list of destination addresses
* `host` - SMTP host. Default: `localhost`
* `port` - SMTP port. Default: depends on chosen TLS/STARTTLS mode.
* `local_hostname` - hostname used in EHLO/HELO commands. Default: auto
* `use_ssl` - use SSL from beginning of connection. Default: `false`
* `use_starttls` - use STARTTLS. Default: `false`
* `login` - user login name. Default: `null` (do not use authentication)
* `password` - user password. Default: `null`
* `timeout` - allowed delay in seconds for each network operation. Default: `10.0`

@ -0,0 +1,141 @@
#!/usr/bin/env python3
import urllib.request
import urllib.error
import json
import posixpath
import codecs
USER_AGENT = 'NvBackend/34.0.0.0'
TIMEOUT = 10
def serialize_req(obj):
return json.dumps(obj, separators=(',', ':'))
def getDispDrvrByDevid(query_obj):
ENDPOINT = 'https://gfwsl.geforce.com/nvidia_web_services/' \
'controller.gfeclientcontent.NG.php/' \
'com.nvidia.services.GFEClientContent_NG.getDispDrvrByDevid'
url = posixpath.join(ENDPOINT, serialize_req(query_obj))
http_req = urllib.request.Request(
url,
data=None,
headers={
'User-Agent': USER_AGENT
}
)
with urllib.request.urlopen(http_req, None, TIMEOUT) as resp:
coding = resp.headers.get_content_charset()
coding = coding if coding is not None else 'utf-8-sig'
decoder = codecs.getreader(coding)(resp)
res = json.load(decoder)
return res
def get_latest_geforce_driver(*,
notebook=False,
x86_64=True,
os_version="10.0",
os_build="17763",
language=1033,
beta=False,
dch=False):
# GeForce GTX 1080 and GP104 HD Audio
dt_id = ["1B80_10DE_119E_10DE"]
# GeForce GTX 1080 Mobile
nb_id = ["1BE0_10DE"]
dev_id = nb_id if notebook else dt_id
query_obj = {
"dIDa": dev_id, # Device PCI IDs:
# ["DEVID_VENID_DEVID_VENID"]
"osC": os_version, # OS version (Windows 10)
"osB": os_build, # OS build
"is6": "1" if x86_64 else "0", # 0 - 32bit, 1 - 64bit
"lg": str(language), # Language code
"iLp": "1" if notebook else "0", # System Is Laptop
"prvMd": "0", # Private Model?
"gcV": "3.16.0.140", # GeForce Experience client version
"gIsB": "1" if beta else "0", # Beta?
"dch": "1" if dch else "0" # 0 - Standard Driver, 1 - DCH Driver
}
try:
res = getDispDrvrByDevid(query_obj)
except urllib.error.HTTPError as e:
if e.code == 404:
res = None
else:
raise e
return res
def parse_args():
import argparse
def parse_lang(lang):
lang = int(lang)
if not (0x0 <= lang <= 0xFFFF):
raise ValueError("Bad language ID")
return lang
parser = argparse.ArgumentParser(
description="Retrieves info about latest NVIDIA drivers from GeForce "
"Experience",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("-V", "--os-version",
default="10.0",
help="OS version")
parser.add_argument("-B", "--os-build",
default="17763",
help="OS build")
parser.add_argument("-l", "--language",
default=1033,
type=parse_lang,
help="Driver language code")
parser.add_argument("-m", "--notebook",
help="Query for notebook drivers (Mobile series)",
action="store_true")
parser.add_argument("-3", "--32bit",
help="Query for 32bit drivers",
dest="_32bit",
action="store_true")
parser.add_argument("-b", "--beta",
help="Allow beta-versions in search result",
action="store_true")
parser.add_argument("-D", "--dch",
help="Query DCH driver instead of Standard driver",
action="store_true")
parser.add_argument("-R", "--raw",
help="Raw JSON output",
action="store_true")
args = parser.parse_args()
return args
def main():
import sys
args = parse_args()
drv = get_latest_geforce_driver(os_version=args.os_version,
os_build=args.os_build,
language=args.language,
notebook=args.notebook,
x86_64=(not args._32bit),
beta=args.beta,
dch=args.dch)
if drv is None:
print("NOT FOUND")
sys.exit(3)
if not args.raw:
print("Version: %s" % (drv['DriverAttributes']['Version'],))
print("Beta: %s" % (bool(int(drv['DriverAttributes']['IsBeta'])),))
print("WHQL: %s" % (bool(int(drv['DriverAttributes']['IsWHQL'])),))
print("URL: %s" % (drv['DriverAttributes']['DownloadURLAdmin'],))
else:
json.dump(drv, sys.stdout, indent=4)
sys.stdout.flush()
if __name__ == '__main__':
main()

@ -0,0 +1,149 @@
#!/usr/bin/env python3
import smtplib
import ssl
class Mailer:
def __init__(self, *,
from_addr,
host='localhost',
port=None,
local_hostname=None,
use_ssl=False,
use_starttls=False,
login=None,
password=None,
timeout=10):
if use_ssl or use_starttls:
self._ssl_context = ssl.create_default_context()
self._from_addr = from_addr
self._host = host
self._local_hostname = local_hostname
self._use_ssl = use_ssl
self._use_starttls = use_starttls
self._login = login
self._password = password
self._timeout = timeout
if port is None:
if use_ssl:
self._port = 465
elif use_starttls:
self._port = 587
else:
self._port = 25
else:
self._port = port
def send(self, to, msg, mail_options=(), rcpt_options=()):
if not self._use_ssl:
server = smtplib.SMTP(self._host, self._port, self._local_hostname,
self._timeout)
else:
server = smtplib.SMTP_SSL(self._host, self._port,
self._local_hostname,
timeout=self._timeout,
context=self._ssl_context)
with server:
if self._use_starttls and not self._use_ssl:
server.starttls(context=self._ssl_context)
if self._login is not None:
server.login(self._login, self._password)
server.sendmail(self._from_addr, to, msg,
mail_options, rcpt_options)
def parse_args():
import argparse
def check_positive_float(val):
val = float(val)
if val <= 0:
raise ValueError("Value %s is not valid positive float" %
(repr(val),))
return val
def check_port(val):
val = int(val)
if not (0 < val <= 0xFFFF):
raise ValueError("Value %s is not valid port number" %
(repr(val),))
return val
parser = argparse.ArgumentParser(
description="Simple email sender, suitable for modern email services.",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("-f", "--from",
required=True,
dest="from_address",
help="originating address")
parser.add_argument("-H", "--smtp-host",
default='localhost',
help="hostname of local MTA or external SMTP service")
parser.add_argument("-P", "--smtp-port",
type=check_port,
help="SMTP port. "
"Default value depends on SSL/TLS mode")
parser.add_argument("-L", "--local-hostname",
help="hostname to use in EHLO/HELO commands. "
"Defaults to autodiscover of local host name.")
tls_group = parser.add_mutually_exclusive_group()
tls_group.add_argument("-S", "--ssl",
help="use SSL from beginning of connection",
action="store_true")
tls_group.add_argument("-s", "--starttls",
help="use STARTTLS command for secure connection",
action="store_true")
parser.add_argument("-l", "--login",
help="user login name. "
"If omitted, no login performed.")
parser.add_argument("-p", "--password",
help="user password used for login")
parser.add_argument("-T", "--timeout",
type=check_positive_float,
default=10.,
help="timeout for network operations")
parser.add_argument("-j", "--subject",
default="",
help="email subject")
parser.add_argument("-m", "--message",
help="email message body. If not specified, message "
"will be read from stdin")
parser.add_argument("recipient",
nargs="+",
help="email destination address(es)")
args = parser.parse_args()
return args
def main():
import sys
from email.mime.text import MIMEText
args = parse_args()
m = Mailer(from_addr=args.from_address,
host=args.smtp_host,
port=args.smtp_port,
local_hostname=args.local_hostname,
use_ssl=args.ssl,
use_starttls=args.starttls,
login=args.login,
password=args.password,
timeout=args.timeout)
if args.message is None:
print("Reading message from standard input...", file=sys.stderr)
msg = sys.stdin.read()
else:
msg = args.message
msg = MIMEText(msg)
msg['Subject'] = args.subject
msg['From'] = args.from_address
msg['To'] = ', '.join(args.recipient)
m.send(args.recipient, msg.as_string())
if __name__ == '__main__':
main()

@ -0,0 +1,71 @@
{
"db": {
"type": "file",
"params": {
"workdir": "/var/lib/nv-driver-locator"
}
},
"key_components": [
[
"DriverAttributes",
"Version"
]
],
"channels": [
{
"type": "gfe_client",
"name": "desktop defaults",
"params": {}
},
{
"type": "gfe_client",
"name": "desktop beta",
"params": {
"beta": true
}
},
{
"type": "gfe_client",
"name": "mobile",
"params": {
"notebook": true
}
},
{
"type": "gfe_client",
"name": "mobile beta",
"params": {
"notebook": true,
"beta": true
}
}
],
"notifiers": [
{
"type": "email",
"name": "my email",
"params": {
"from_addr": "notify-bot@gmail.com",
"to_addrs": [
"recepient1@domain1.tld",
"recepient2@domain2.tld"
],
"host": "smtp.google.com",
"use_starttls": true,
"login": "notify-bot",
"password": "MyGoodPass"
}
},
{
"type": "command",
"name": "sample command",
"params": {
"timeout": 10.0,
"cmdline": [
"cat",
"-"
]
}
}
]
}

@ -0,0 +1,290 @@
#!/usr/bin/env python3
import sys
import json
import argparse
import hashlib
import importlib
import logging
from abc import ABC, abstractmethod
HASH_DELIM = b'\x00'
HASH = hashlib.sha256
class BaseDB(ABC):
@abstractmethod
def check_key(self, key):
pass
@abstractmethod
def set_key(self, key, value):
pass
class FileDB(BaseDB):
def __init__(self, workdir):
self._ospath = importlib.import_module('os.path')
self._tempfile = importlib.import_module('tempfile')
self._wd = workdir
self._test_writable()
def _test_writable(self):
TEST_STRING = b"test"
with self._tempfile.NamedTemporaryFile('w+b', 0, dir=self._wd) as f:
f.write(TEST_STRING)
f.flush()
with open(f.name, 'rb') as tf:
assert tf.read() == TEST_STRING, "Test write failed"
def _get_key_filename(self, key):
return self._ospath.join(self._wd, key + '.json')
def check_key(self, key):
filename = self._get_key_filename(key)
return self._ospath.isfile(filename)
def set_key(self, key, obj):
filename = self._get_key_filename(key)
with open(filename, 'w') as f:
json.dump(obj, f, indent=4)
f.flush()
class Hasher:
def __init__(self, key_components):
self._key_components = key_components
def _eval_key_component(self, obj, component_path):
res = obj
for path_component in component_path:
res = res[path_component]
return str(res).encode('utf-8')
def hash_object(self, obj):
return HASH(HASH_DELIM.join(
self._eval_key_component(obj, c) for c in self._key_components)
).hexdigest()
class BaseNotifier(ABC):
@abstractmethod
def notify(self, obj):
pass
class EmailNotifier(BaseNotifier):
def __init__(self, name, *,
from_addr,
to_addrs,
host='localhost',
port=None,
local_hostname=None,
use_ssl=False,
use_starttls=False,
login=None,
password=None,
timeout=10):
self.name = name
self._from_addr = from_addr
self._Mailer = importlib.import_module('mailer').Mailer
self._MIMEText = importlib.import_module('email.mime.text').MIMEText
self._MIMEMult = importlib.import_module(
'email.mime.multipart').MIMEMultipart
self._MIMEBase = importlib.import_module('email.mime.base').MIMEBase
self._encoders = importlib.import_module('email.encoders')
self._m = self._Mailer(from_addr=from_addr,
host=host,
port=port,
local_hostname=local_hostname,
use_ssl=use_ssl,
use_starttls=use_starttls,
login=login,
password=password,
timeout=timeout)
self._to_addrs = to_addrs
def notify(self, obj):
msg = self._MIMEMult()
msg['Subject'] = "New Nvidia driver available!"
msg['From'] = self._from_addr
msg['To'] = ', '.join(self._to_addrs)
body = "See attached JSON"
msg.attach(self._MIMEText(body, 'plain'))
p = self._MIMEBase('application', 'octet-stream')
p.set_payload(json.dumps(obj, indent=4).encode('utf-8'))
self._encoders.encode_base64(p)
p.add_header('Content-Disposition', "attachment; filename=obj.json")
msg.attach(p)
self._m.send(self._to_addrs, msg.as_string())
class CommandNotifier(BaseNotifier):
def __init__(self, name, *,
cmdline,
timeout=10):
self.name = name
self._subprocess = importlib.import_module('subprocess')
self._cmdline = cmdline
self._timeout = timeout
def notify(self, obj):
proc = self._subprocess.Popen(self._cmdline,
stdin=self._subprocess.PIPE)
try:
proc.communicate(json.dumps(obj, indent=4).encode('utf-8'),
self._timeout)
except self._subprocess.TimeoutExpired:
proc.kill()
proc.communicate()
class BaseChannel(ABC):
@abstractmethod
def get_latest_driver(self):
pass
class GFEClientChannel(BaseChannel):
def __init__(self, name, **kwargs):
self.name = name
self._kwargs = kwargs
gfe_get_driver = importlib.import_module('gfe_get_driver')
self._get_latest_driver = gfe_get_driver.get_latest_geforce_driver
def get_latest_driver(self):
return self._get_latest_driver(**self._kwargs)
def parse_args():
parser = argparse.ArgumentParser(
description="Watches for GeForce experience driver updates for "
"configured systems",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("-c", "--config",
default="/etc/nv-driver-locator.json",
help="config file location")
args = parser.parse_args()
return args
class DriverLocator:
_ret_code = 0
def __init__(self, conf):
self._logger = logging.getLogger(self.__class__.__name__)
self._channels = self._construct_channels(conf['channels'])
self._db = self._construct_db(conf['db'])
self._hasher = Hasher(conf['key_components'])
self._notifiers = self._construct_notifiers(conf['notifiers'])
def _construct_channels(self, channels_config):
channel_types = {
'gfe_client': GFEClientChannel,
}
channels = []
for ch in channels_config:
try:
ctor = channel_types[ch['type']]
C = ctor(ch['name'], **ch['params'])
except Exception as e:
self._perror("Channel construction failed with exception: %s. "
"Skipping..." % (str(e),))
else:
channels.append(C)
return channels
def _construct_db(self, db_config):
db_types = {
'file': FileDB,
}
ctor = db_types[db_config['type']]
db = ctor(**db_config['params'])
return db
def _construct_notifiers(self, notifiers_config):
notifier_types = {
'email': EmailNotifier,
'command': CommandNotifier,
}
notifiers = []
for nc in notifiers_config:
try:
ctor = notifier_types[nc['type']]
N = ctor(nc['name'], **nc['params'])
except Exception as e:
self._perror("Notifier construction failed with exception: %s."
" Skipping..." % (str(e),))
else:
notifiers.append(N)
return notifiers
def _perror(self, err):
self._ret_code = 3
self._logger.error(err)
def _notify_all(self, obj):
fails = 0
for n in self._notifiers:
try:
n.notify(obj)
except Exception as e:
self._perror("Notify channel %s failed with exception: %s." %
(n.name, str(e)))
fails += 1
return fails < len(self._notifiers)
def run(self):
for ch in self._channels:
try:
drv = ch.get_latest_driver()
except Exception as e:
self._perror("get_latest_driver() invocation failed for "
"channel %s. Exception: %s. Continuing..." %
(repr(ch.name), str(e)))
continue
if drv is None:
self._perror("Driver not found for channel %s" %
(repr(ch.name),))
continue
try:
key = self._hasher.hash_object(drv)
except Exception as e:
self._perror("Key evaluation failed for channel %s. "
"Exception: %s" % (repr(name), str(e)))
continue
if not self._db.check_key(key):
if self._notify_all(drv):
self._db.set_key(key, drv)
return self._ret_code
def setup_logger(name, verbosity):
logger = logging.getLogger(name)
logger.setLevel(verbosity)
handler = logging.StreamHandler()
handler.setLevel(verbosity)
handler.setFormatter(logging.Formatter('%(asctime)s '
'%(levelname)-8s '
'%(name)s: %(message)s',
'%Y-%m-%d %H:%M:%S'))
logger.addHandler(handler)
return logger
def main():
args = parse_args()
setup_logger(DriverLocator.__name__, logging.ERROR)
with open(args.config, 'r') as conf_file:
conf = json.load(conf_file)
ret = DriverLocator(conf).run()
sys.exit(ret)
if __name__ == '__main__':
main()
Loading…
Cancel
Save