forked from colonelpanic/dotfiles
		
	Add wifi-auto-switch.
This commit is contained in:
		
							
								
								
									
										42
									
								
								dotfiles/lib/python/cached_property.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										42
									
								
								dotfiles/lib/python/cached_property.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,42 @@ | ||||
| import inspect | ||||
|  | ||||
|  | ||||
| class cached_property(object): | ||||
|     """Descriptor that caches the result of the first call to resolve its | ||||
|     contents. | ||||
|     """ | ||||
|  | ||||
|     def __init__(self, func): | ||||
|         self.__doc__ = getattr(func, '__doc__') | ||||
|         self.func = func | ||||
|  | ||||
|     def __get__(self, obj, cls): | ||||
|         if obj is None: | ||||
|             return self | ||||
|         value = self.func(obj) | ||||
|         setattr(obj, self.func.__name__, value) | ||||
|         return value | ||||
|  | ||||
|     def bust_self(self, obj): | ||||
|         """Remove the value that is being stored on `obj` for this | ||||
|         :class:`.cached_property` | ||||
|         object. | ||||
|  | ||||
|         :param obj: The instance on which to bust the cache. | ||||
|         """ | ||||
|         if self.func.__name__ in obj.__dict__: | ||||
|             delattr(obj, self.func.__name__) | ||||
|  | ||||
|     @classmethod | ||||
|     def bust_caches(cls, obj, excludes=()): | ||||
|         """Bust the cache for all :class:`.cached_property` objects on `obj` | ||||
|  | ||||
|         :param obj: The instance on which to bust the caches. | ||||
|         """ | ||||
|         for name, _ in cls.get_cached_properties(obj): | ||||
|             if name in obj.__dict__ and name not in excludes: | ||||
|                 delattr(obj, name) | ||||
|  | ||||
|     @classmethod | ||||
|     def get_cached_properties(cls, obj): | ||||
|         return inspect.getmembers(type(obj), lambda x: isinstance(x, cls)) | ||||
							
								
								
									
										11
									
								
								dotfiles/lib/python/log_util.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										11
									
								
								dotfiles/lib/python/log_util.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,11 @@ | ||||
| import logging | ||||
|  | ||||
| from coloredlogs import ColoredStreamHandler | ||||
|  | ||||
|  | ||||
| def enable_logger(log_name, level=logging.DEBUG): | ||||
|     log = logging.getLogger(log_name) | ||||
|     handler = ColoredStreamHandler(severity_to_style={'WARNING': dict(color='red')}) | ||||
|     handler.setLevel(level) | ||||
|     log.setLevel(level) | ||||
|     log.addHandler(handler) | ||||
							
								
								
									
										182
									
								
								dotfiles/lib/python/wifi_auto_switch.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										182
									
								
								dotfiles/lib/python/wifi_auto_switch.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,182 @@ | ||||
| import argparse | ||||
| import logging | ||||
| import re | ||||
| import subprocess | ||||
| import time | ||||
|  | ||||
| from lxml import etree | ||||
|  | ||||
| from xpath import dxpb, xpb | ||||
| import log_util | ||||
|  | ||||
|  | ||||
| log = logging.getLogger(__name__) | ||||
|  | ||||
|  | ||||
| def get_stdout_from_command(command): | ||||
|     return subprocess.Popen(command, stdout=subprocess.PIPE).stdout.read() | ||||
|  | ||||
|  | ||||
| def below_threshold_trigger(threshold): | ||||
|     return lambda status_info: int(status_info['RSSI']) < threshold | ||||
|  | ||||
|  | ||||
| class Network(object): | ||||
|  | ||||
|     def __init__(self, ssid, password, | ||||
|                  should_switch=below_threshold_trigger(-68)): | ||||
|         self.ssid = ssid | ||||
|         self.password = password | ||||
|         self.should_switch = should_switch | ||||
|  | ||||
|     @property | ||||
|     def login_command(self): | ||||
|         return ["networksetup", "-setairportnetwork", "en0", | ||||
|                 self.ssid, self.password] | ||||
|  | ||||
|     def login(self): | ||||
|         log.debug("Reponse from connect: {0}".format( | ||||
|             get_stdout_from_command(self.login_command) | ||||
|         )) | ||||
|  | ||||
|  | ||||
| class OSXXMLStatusRetriever(object): | ||||
|  | ||||
|     def _get_status_xml(self): | ||||
|         return get_stdout_from_command(['airport', '-I', '--xml']) | ||||
|  | ||||
|     def _get_status_tree(self): | ||||
|         return etree.fromstring(self._get_status_xml()) | ||||
|  | ||||
|     _signal_strength_key_xpb = xpb.dict.key.text_contains_("RSSI_CTL_LIST") | ||||
|  | ||||
|     def get_status_dict(self): | ||||
|         status_tree = self._get_status_tree() | ||||
|         signal_strength_array = self._signal_strength_key_xpb.one_(status_tree).getnext() | ||||
|         signal_strengths = xpb.integer.text_.apply_(signal_strength_array) | ||||
|         return sum([int(ss) for ss in signal_strengths]) / len(signal_strengths) | ||||
|  | ||||
|     __call__ = get_status_dict | ||||
|  | ||||
|  | ||||
| class OSXStatusRetriever(object): | ||||
|  | ||||
|     KEY_REMAP = { | ||||
|         'agrCtlRSSI': 'RSSI', | ||||
|         'maxRate': 'max_rate', | ||||
|     } | ||||
|  | ||||
|     status_output_line_regex = re.compile("^([^\n]*?): ([^\n]*?)$") | ||||
|  | ||||
|     def _get_status_text(self): | ||||
|         return get_stdout_from_command(['airport', '-I']) | ||||
|  | ||||
|     @classmethod | ||||
|     def _remap_key(cls, key): | ||||
|         return cls.KEY_REMAP.get(key, key) | ||||
|  | ||||
|     def get_status_dict(self): | ||||
|         return {self._remap_key(match.group(1).strip()): match.group(2) | ||||
|                 for match in [self.status_output_line_regex.match(line.strip()) | ||||
|                               for line in self._get_status_text().split('\n')] | ||||
|                 if match is not None} | ||||
|  | ||||
|     __call__ = get_status_dict | ||||
|  | ||||
|  | ||||
| class OSXSSIDToRSSI(object): | ||||
|  | ||||
|     def _get_scan_xml(self): | ||||
|         return get_stdout_from_command(['airport', '--scan', '--xml']) | ||||
|  | ||||
|     def _get_scan_tree(self): | ||||
|         return etree.fromstring(self._get_scan_xml()) | ||||
|  | ||||
|     _network_xpb = dxpb.array.dict | ||||
|     _ssid_xpb = xpb.key.text_contains_("SSID_STR") | ||||
|     _rssi_xpb = xpb.key.text_contains_("RSSI") | ||||
|  | ||||
|     def _network_elements(self): | ||||
|         return self._network_xpb.apply_(self._get_scan_tree()) | ||||
|  | ||||
|     def get(self): | ||||
|         network_elements = self._network_elements() | ||||
|         ssid_to_rssi = {} | ||||
|         for network_element in network_elements: | ||||
|             ssid = self._get_ssid(network_element) | ||||
|             rssi = self._get_rssi(network_element) | ||||
|             if ssid not in ssid_to_rssi or rssi > ssid_to_rssi[ssid]: | ||||
|                 ssid_to_rssi[ssid] = rssi | ||||
|         return ssid_to_rssi | ||||
|  | ||||
|     def _get_ssid(self, network_element): | ||||
|         try: | ||||
|             return self._ssid_xpb.one_(network_element).getnext().text | ||||
|         except: | ||||
|             return None | ||||
|  | ||||
|     def _get_rssi(self, network_element): | ||||
|         try: | ||||
|             return int(self._rssi_xpb.one_(network_element).getnext().text) | ||||
|         except: | ||||
|             return 0 | ||||
|  | ||||
|     __call__ = get | ||||
|  | ||||
|  | ||||
| class WiFiAutoSwitcher(object): | ||||
|  | ||||
|     def __init__(self, networks, status_getter=OSXStatusRetriever(), | ||||
|                  ssid_to_rssi_getter=OSXSSIDToRSSI()): | ||||
|         self._networks = {network.ssid: network for network in networks} | ||||
|         self._get_status = status_getter | ||||
|         self._ssid_to_rssi = ssid_to_rssi_getter | ||||
|  | ||||
|     def switch_if_necessary(self): | ||||
|         status_dict = self._get_status() | ||||
|         log.debug(status_dict) | ||||
|         network = None | ||||
|         if 'SSID' in status_dict: | ||||
|             network = self._networks.get(status_dict['SSID']) | ||||
|             if network is None: | ||||
|                 return | ||||
|         if not network or network.should_switch(status_dict): | ||||
|             log.debug("Attempting to switch networks from {0}, ".format( | ||||
|                 network.ssid if network else "(Not conneted to network)" | ||||
|             )) | ||||
|             new_network = self.select_known_network_with_best_rssi() | ||||
|             if new_network: | ||||
|                 if network and new_network.ssid == network.ssid: | ||||
|                     log.debug("Switch triggered but connected network is still best.") | ||||
|                 else: | ||||
|                     new_network.login() | ||||
|         else: | ||||
|             log.debug("No switch deemed necessary.") | ||||
|  | ||||
|     def select_known_network_with_best_rssi(self): | ||||
|         ssid_to_rssi = self._ssid_to_rssi() | ||||
|         log.debug("Selecting best network using: {0}".format(ssid_to_rssi)) | ||||
|         network = max( | ||||
|             self._networks.values(), | ||||
|             key=lambda network: ssid_to_rssi.get(network.ssid, -1000000) | ||||
|         ) | ||||
|         if network.ssid in ssid_to_rssi: | ||||
|             log.debug("selected: {0}".format(network.ssid)) | ||||
|             return network | ||||
|         else: | ||||
|             log.debug("No matching networks were found.") | ||||
|  | ||||
|  | ||||
| if __name__ == '__main__': | ||||
|     log_util.enable_logger(__name__) | ||||
|     parser = argparse.ArgumentParser() | ||||
|     parser.add_argument('-n', '--network', nargs='+', type=str, action='append', dest='networks') | ||||
|     network_pairs = parser.parse_args().networks | ||||
|     for network_pair in network_pairs: | ||||
|         assert len(network_pair) == 2 | ||||
|     auto_switcher = WiFiAutoSwitcher( | ||||
|         [Network(*ssid_password) for ssid_password in network_pairs] | ||||
|     ) | ||||
|     while True: | ||||
|          time.sleep(4) | ||||
|          auto_switcher.switch_if_necessary() | ||||
							
								
								
									
										172
									
								
								dotfiles/lib/python/xpath.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										172
									
								
								dotfiles/lib/python/xpath.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,172 @@ | ||||
| from cached_property import cached_property | ||||
|  | ||||
|  | ||||
| class XPathBuilder(object): | ||||
|  | ||||
|     def __init__(self, nodes=(), relative=True, direct_child=False): | ||||
|         self.nodes = tuple(nodes) | ||||
|         self.relative = relative | ||||
|         self.direct_child = direct_child | ||||
|  | ||||
|     @cached_property | ||||
|     def xpath(self): | ||||
|         return ('.' if self.relative else '') + ''.join(node.xpath | ||||
|                                                         for node in self.nodes) | ||||
|  | ||||
|     @property | ||||
|     def or_(self): | ||||
|         return self.update_final_node(self.nodes[-1].make_or) | ||||
|  | ||||
|     @property | ||||
|     def text_(self): | ||||
|         return self.update_final_node( | ||||
|             self.nodes[-1](selected_attribute=XPathNode.text) | ||||
|         ) | ||||
|  | ||||
|     def add_node(self, **kwargs): | ||||
|         if 'direct_child' not in kwargs: | ||||
|             kwargs['direct_child'] = self.direct_child | ||||
|         return type(self)(self.nodes + (XPathNode(**kwargs),), | ||||
|                           relative=self.relative) | ||||
|  | ||||
|     def __getattr__(self, attr): | ||||
|         return self.add_node(element=attr) | ||||
|  | ||||
|     def update_final_node(self, updated_final_node): | ||||
|         return type(self)(self.nodes[:-1] + (updated_final_node,), | ||||
|                           relative=self.relative, | ||||
|                           direct_child=self.direct_child) | ||||
|  | ||||
|     def __call__(self, *predicates, **attributes): | ||||
|         direct_child = attributes.pop('direct_child', None) | ||||
|         assert len(self.nodes) | ||||
|         updated_final_node = self.nodes[-1](predicates=predicates, | ||||
|                                             attributes=attributes, | ||||
|                                             direct_child=direct_child) | ||||
|         return self.update_final_node(updated_final_node) | ||||
|  | ||||
|     def attribute_contains(self, attribute, contains_string): | ||||
|         updated_final_node = self.nodes[-1].add_contains_predicates( | ||||
|             ((attribute, contains_string),) | ||||
|         ) | ||||
|         return self.update_final_node(updated_final_node) | ||||
|  | ||||
|     def with_classes(self, *classes): | ||||
|         return self.update_final_node(self.nodes[-1].with_classes(classes)) | ||||
|  | ||||
|     def select_attribute_(self, attribute, elem=None): | ||||
|         update_final_node = self.nodes[-1](selected_attribute=attribute) | ||||
|         builder = self.update_final_node(update_final_node) | ||||
|         if elem is not None: | ||||
|             return builder.apply_(elem) | ||||
|         else: | ||||
|             return builder | ||||
|  | ||||
|     def text_contains_(self, contained_text): | ||||
|         updated_final_node = self.nodes[-1].text_contains(contained_text) | ||||
|         return self.update_final_node(updated_final_node) | ||||
|  | ||||
|     with_class = with_classes | ||||
|  | ||||
|     def apply_(self, tree): | ||||
|         return tree.xpath(self.xpath) | ||||
|  | ||||
|     def one_(self, tree): | ||||
|         return self.apply_(tree)[0] | ||||
|  | ||||
|     def get_text_(self, tree): | ||||
|         return self.apply_(tree)[0].text_content() | ||||
|  | ||||
|     def __repr__(self): | ||||
|         return '{0}("{1}")'.format(type(self).__name__, self.xpath) | ||||
|  | ||||
|  | ||||
| class XPathNode(object): | ||||
|  | ||||
|     text = object() | ||||
|  | ||||
|     @staticmethod | ||||
|     def contains_class(class_attribute, contained_class): | ||||
|         return "contains(concat(' ',normalize-space(@{0}),' '),' {1} ')".\ | ||||
|             format(class_attribute, contained_class) | ||||
|  | ||||
|     @staticmethod | ||||
|     def contains_attribute(attribute, contained_string): | ||||
|         return "contains(@{0}, '{1}')".format(attribute, contained_string) | ||||
|  | ||||
|     @staticmethod | ||||
|     def attribute_equal(attribute, value): | ||||
|         return "@{0} = '{1}'".format(attribute, value) | ||||
|  | ||||
|     def __init__(self, element='*', attributes=None, predicates=None, | ||||
|                  direct_child=False, use_or=False, selected_attribute=None): | ||||
|         self.element = element | ||||
|         self.predicates = tuple(predicates) if predicates else () | ||||
|         if attributes: | ||||
|             self.predicates += tuple([self.attribute_equal(key, value) | ||||
|                                       for key, value in attributes.items()]) | ||||
|         self.direct_child = direct_child | ||||
|         self.use_or = use_or | ||||
|         self.selected_attribute = selected_attribute | ||||
|  | ||||
|     @property | ||||
|     def make_or(self): | ||||
|         return self(use_or=True) | ||||
|  | ||||
|     @property | ||||
|     def separator(self): | ||||
|         return '/' if self.direct_child else '//' | ||||
|  | ||||
|     @property | ||||
|     def xpath(self): | ||||
|         return '{0}{1}{2}{3}'.format(self.separator, self.element, | ||||
|                                      self.predicate_string, | ||||
|                                      self.selected_attribute_string) | ||||
|  | ||||
|     @property | ||||
|     def predicate_joiner(self): | ||||
|         return ' or ' if self.use_or else ' and ' | ||||
|  | ||||
|     @property | ||||
|     def predicate_string(self): | ||||
|         if self.predicates: | ||||
|             predicate = self.predicate_joiner.join(self.predicates) | ||||
|             return '[ {0} ]'.format(predicate) | ||||
|         else: | ||||
|             return '' | ||||
|  | ||||
|     @property | ||||
|     def selected_attribute_string(self): | ||||
|         if self.selected_attribute is self.text: | ||||
|             return '/text()' | ||||
|         return '/@{0}'.format(self.selected_attribute) \ | ||||
|             if self.selected_attribute else '' | ||||
|  | ||||
|     def __call__(self, element=None, predicates=(), attributes=None, | ||||
|                  direct_child=None, use_or=False, selected_attribute=None): | ||||
|         direct_child = (self.direct_child | ||||
|                         if direct_child is None | ||||
|                         else direct_child) | ||||
|         element = self.element if element is None else element | ||||
|         new_predicates = self.predicates + tuple(predicates) | ||||
|         return type(self)(element, attributes, new_predicates, | ||||
|                           direct_child, use_or, selected_attribute) | ||||
|  | ||||
|     def with_classes(self, classes): | ||||
|         predicates = tuple(self.contains_class('class', contained_class) | ||||
|                            for contained_class in classes) | ||||
|  | ||||
|         return self(predicates=predicates) | ||||
|  | ||||
|     def add_contains_predicates(self, kv_pairs): | ||||
|         predicates = tuple(self.contains_attribute(attribute, contains_string) | ||||
|                            for attribute, contains_string in kv_pairs) | ||||
|         return self(predicates=predicates) | ||||
|  | ||||
|     def text_contains(self, contained_text): | ||||
|         return self(predicates=("contains(text(),'{0}')". | ||||
|                                 format(contained_text),)) | ||||
|  | ||||
|  | ||||
| xpb = XPathBuilder() | ||||
| dxpb = XPathBuilder(direct_child=True) | ||||
		Reference in New Issue
	
	Block a user