From 9866dfb204b8bdbb5bf341204371a4e4e458a9c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Benencia?= Date: Wed, 3 Oct 2012 19:46:06 -0300 Subject: Global refactoring --- lib/core.py | 139 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 139 insertions(+) create mode 100644 lib/core.py (limited to 'lib/core.py') diff --git a/lib/core.py b/lib/core.py new file mode 100644 index 0000000..e7ac571 --- /dev/null +++ b/lib/core.py @@ -0,0 +1,139 @@ +import socketserver +import ipaddr +import re + +from lib.config import Config + +class Daemon(): + def __init__(self, data): + self.data = data + self.domain_regexp = re.compile("(?!-)[A-Z\d-]{1,63}(?\n'.format(adm.name, adm.surname, adm.email) + else: + result += '{0}: {1}\n'.format(k, v) + + return result + + def search_ip(self, ip): + result = {} + + # Iterate over all IP block elements + networks = self.data.get_networks() + for key in networks: + for block in networks[key].ip_blocks: + if ipaddr.IPAddress(ip) in ipaddr.IPNetwork(block): + return networks[key].as_dict(self.data) + + result['error'] = "Network not found" + return result + + def search_domain(self, domain): + # Iterate over all network and check its domains + networks = self.data.get_networks() + domains = self.data.get_domains() + for network in networks.values(): + if any(domains[d].name == domain for d in network.domains): + return network.as_dict(self.data) + + return {'error':'Domain not found'} + + # TODO + + def search_person(self, query): + pass + + def is_ip(self, query): + try: + ipaddr.IPAddress(query) + return True + except ValueError: + return False + + def is_domain(self, hostname): + if len(hostname) > 255: + return False + + if hostname[-1:] == ".": + hostname = hostname[:-1] # strip exactly one dot from the right, if present + + return all(self.domain_regexp.match(x) for x in hostname.split(".")) + + # TODO + def get_help(self): + return "This will be the help" + + def get_footer(self): + if not self.footer: + f = open(Config().parser['Printing']['footer']) + self.footer = f.read() + f.close() + + return self.footer + + def get_header(self): + if not self.header: + f = open(Config().parser['Printing']['header']) + self.header = f.read() + f.close() + + return self.header + +class WhoisHandler(socketserver.BaseRequestHandler): + + def setup(self): + self.daemon = self.server.daemon + + def handle(self): + data = str(self.request.recv(100).strip(), 'utf-8') + print('Received: {}'.format(data)) + + response = self.daemon.get_header() + response += self.daemon.query(data) + response += self.daemon.get_footer() + + self.request.sendall(bytes(response, 'utf-8')) + +class ClassicServer(socketserver.ThreadingTCPServer): + def __init__(self, daemon): + host = Config().parser['Servers']['classic_host'] + port = int(Config().parser['Servers']['classic_port']) + self.daemon = daemon + + socketserver.ThreadingTCPServer.__init__(self, (host, port), WhoisHandler) + +class WebServer(socketserver.ThreadingTCPServer): + def __init__(self, daemon): + self.host = Config().parser['Servers']['web_host'] + self.port = int(Config().parser['Servers']['web_port']) + self.daemon = daemon + + socketserver.ThreadingTCPServer.__init__(self, (self.host, self.port), WhoisHandler) -- cgit v1.2.3