0
0
.dotfiles/powerline-bin/powerline/lib/config.py

219 lines
6.1 KiB
Python
Raw Permalink Normal View History

2024-09-07 11:49:53 -04:00
# vim:fileencoding=utf-8:noet
from __future__ import (unicode_literals, division, absolute_import, print_function)
import json
import codecs
from copy import deepcopy
from threading import Event, Lock
from collections import defaultdict
from powerline.lib.threaded import MultiRunnedThread
from powerline.lib.watcher import create_file_watcher
def open_file(path):
return codecs.open(path, encoding='utf-8')
def load_json_config(config_file_path, load=json.load, open_file=open_file):
with open_file(config_file_path) as config_file_fp:
return load(config_file_fp)
class DummyWatcher(object):
def __call__(self, *args, **kwargs):
return False
def watch(self, *args, **kwargs):
pass
class DeferredWatcher(object):
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
self.calls = []
def __call__(self, *args, **kwargs):
self.calls.append(('__call__', args, kwargs))
def watch(self, *args, **kwargs):
self.calls.append(('watch', args, kwargs))
def unwatch(self, *args, **kwargs):
self.calls.append(('unwatch', args, kwargs))
def transfer_calls(self, watcher):
for attr, args, kwargs in self.calls:
getattr(watcher, attr)(*args, **kwargs)
class ConfigLoader(MultiRunnedThread):
def __init__(self, shutdown_event=None, watcher=None, watcher_type=None, load=load_json_config, run_once=False):
super(ConfigLoader, self).__init__()
self.shutdown_event = shutdown_event or Event()
if run_once:
self.watcher = DummyWatcher()
self.watcher_type = 'dummy'
else:
self.watcher = watcher or DeferredWatcher()
if watcher:
if not watcher_type:
raise ValueError('When specifying watcher you must also specify watcher type')
self.watcher_type = watcher_type
else:
self.watcher_type = 'deferred'
self._load = load
self.pl = None
self.interval = None
self.lock = Lock()
self.watched = defaultdict(set)
self.missing = defaultdict(set)
self.loaded = {}
def set_watcher(self, watcher_type, force=False):
if watcher_type == self.watcher_type:
return
watcher = create_file_watcher(self.pl, watcher_type)
with self.lock:
if self.watcher_type == 'deferred':
self.watcher.transfer_calls(watcher)
self.watcher = watcher
self.watcher_type = watcher_type
def set_pl(self, pl):
self.pl = pl
def set_interval(self, interval):
self.interval = interval
def register(self, function, path):
'''Register function that will be run when file changes.
:param function function:
Function that will be called when file at the given path changes.
:param str path:
Path that will be watched for.
'''
with self.lock:
self.watched[path].add(function)
self.watcher.watch(path)
def register_missing(self, condition_function, function, key):
'''Register any function that will be called with given key each
interval seconds (interval is defined at __init__). Its result is then
passed to ``function``, but only if the result is true.
:param function condition_function:
Function which will be called each ``interval`` seconds. All
exceptions from it will be logged and ignored. IOError exception
will be ignored without logging.
:param function function:
Function which will be called if condition_function returns
something that is true. Accepts result of condition_function as an
argument.
:param str key:
Any value, it will be passed to condition_function on each call.
Note: registered functions will be automatically removed if
condition_function results in something true.
'''
with self.lock:
self.missing[key].add((condition_function, function))
def unregister_functions(self, removed_functions):
'''Unregister files handled by these functions.
:param set removed_functions:
Set of functions previously passed to ``.register()`` method.
'''
with self.lock:
for path, functions in list(self.watched.items()):
functions -= removed_functions
if not functions:
self.watched.pop(path)
self.loaded.pop(path, None)
def unregister_missing(self, removed_functions):
'''Unregister files handled by these functions.
:param set removed_functions:
Set of pairs (2-tuples) representing ``(condition_function,
function)`` function pairs previously passed as an arguments to
``.register_missing()`` method.
'''
with self.lock:
for key, functions in list(self.missing.items()):
functions -= removed_functions
if not functions:
self.missing.pop(key)
def load(self, path):
try:
# No locks: GIL does what we need
return deepcopy(self.loaded[path])
except KeyError:
r = self._load(path)
self.loaded[path] = deepcopy(r)
return r
def update(self):
toload = []
with self.lock:
for path, functions in self.watched.items():
for function in functions:
try:
modified = self.watcher(path)
except OSError as e:
modified = True
self.exception('Error while running watcher for path {0}: {1}', path, str(e))
else:
if modified:
toload.append(path)
if modified:
function(path)
with self.lock:
for key, functions in list(self.missing.items()):
for condition_function, function in list(functions):
try:
path = condition_function(key)
except IOError:
pass
except Exception as e:
self.exception('Error while running condition function for key {0}: {1}', key, str(e))
else:
if path:
toload.append(path)
function(path)
functions.remove((condition_function, function))
if not functions:
self.missing.pop(key)
for path in toload:
try:
self.loaded[path] = deepcopy(self._load(path))
except Exception as e:
self.exception('Error while loading {0}: {1}', path, str(e))
try:
self.loaded.pop(path)
except KeyError:
pass
try:
self.loaded.pop(path)
except KeyError:
pass
def run(self):
while self.interval is not None and not self.shutdown_event.is_set():
self.update()
self.shutdown_event.wait(self.interval)
def exception(self, msg, *args, **kwargs):
if self.pl:
self.pl.exception(msg, prefix='config_loader', *args, **kwargs)
else:
raise