# This repository was archived on 2020-03-24. You can view files and clone it, but you cannot push or open issues or pull requests.
# quay/util/ipresolver/__init__.py
#
# 124 lines
# 4 KiB
# Python

import logging
import time
from cachetools import lru_cache
from collections import namedtuple, defaultdict
from netaddr import IPNetwork, IPAddress, IPSet, AddrFormatError
from threading import Thread
import geoip2.database
import geoip2.errors
# Location of the AWS IP range table downloaded by IPResolver._update_aws_ip_range().
_AWS_IP_RANGES_URL = 'https://ip-ranges.amazonaws.com/ip-ranges.json'

# Sleep intervals, in seconds, used by IPResolver.run().
_UPDATE_TIME = 24 * 60 * 60  # after a successful refresh
_RETRY_TIME = 5 * 60 * 60  # after a failed refresh

# Result of classifying a single IP address. `sync_token` records which version of the
# AWS range table was in effect when the result was produced.
ResolvedLocation = namedtuple(
    'ResolvedLocation',
    ('provider', 'region', 'service', 'sync_token'),
)

logger = logging.getLogger(__name__)
class IPResolver(Thread):
    """ Daemon thread that keeps an IP classification function up to date.

    The thread periodically downloads the AWS IP range table and builds a lookup
    function from it. `resolve_ip` uses the most recently built function to classify
    an address as belonging to AWS or to the wider internet (via the GeoIP country
    database).
    """

    def __init__(self, app, client=None, *args, **kwargs):
        """ Creates the resolver.

        `app` is stored on the instance; when `client` is not given, the HTTP client is
        taken from `app.config['HTTPCLIENT']`. Any remaining arguments are forwarded to
        `Thread.__init__`.
        """
        super(IPResolver, self).__init__(*args, **kwargs)
        self.daemon = True
        self.app = app
        self.client = client or app.config['HTTPCLIENT']

        # Both remain None until _update_aws_ip_range() succeeds for the first time.
        self.location_function = None
        self.sync_token = None

        # NOTE: relative path, so it is resolved against the process's working directory.
        self.geoip_db = geoip2.database.Reader('util/ipresolver/GeoLite2-Country.mmdb')

    def resolve_ip(self, ip_address):
        """ Attempts to return resolved information about the specified IP Address. If such an
        attempt fails, returns None.

        None is returned when `ip_address` is empty or when no range table has been
        loaded yet.
        """
        # Read the attribute once so the check and the call use the same function, even
        # if the updater replaces it in between.
        location_function = self.location_function
        if not ip_address or not location_function:
            return None

        return location_function(ip_address)

    @staticmethod
    def _is_success_status(status_code):
        """ Returns True if the given HTTP status code is in the 2xx range. """
        # An explicit range check avoids `status_code / 100`, which is true division on
        # Python 3 and would reject every 2xx code other than 200.
        return 200 <= status_code < 300

    def _update_aws_ip_range(self):
        """ Downloads the AWS IP range table and rebuilds the location function if the
        table's sync token has changed.

        Returns True if the table is now current (refreshed or unchanged) and False if
        the download, decoding or parsing failed.
        """
        logger.debug('Starting download of AWS IP Range table from %s', _AWS_IP_RANGES_URL)
        try:
            response = self.client.get(_AWS_IP_RANGES_URL)
        except Exception:
            logger.exception('Could not download AWS IP range table')
            return False

        if not IPResolver._is_success_status(response.status_code):
            logger.error('Non-200 response (%s) for AWS IP Range table request',
                         response.status_code)
            return False

        # Decode the body once. A malformed payload is reported as a failure rather than
        # raising, which would otherwise terminate the updater thread.
        try:
            ranges = response.json()
            sync_token = ranges['syncToken']
        except Exception:
            logger.exception('Could not decode AWS IP range table')
            return False

        # Check if the sync token is the same. If so, no updates are necessary.
        if self.sync_token and sync_token == self.sync_token:
            logger.debug('No updates necessary')
            return True

        # Otherwise, update the range lookup function.
        try:
            all_amazon, regions, services = IPResolver._parse_amazon_ranges(ranges)
        except Exception:
            logger.exception('Could not parse AWS IP range table')
            return False

        location_function = IPResolver._build_location_function(sync_token, all_amazon, regions,
                                                                services, self.geoip_db)
        self.sync_token = sync_token
        self.location_function = location_function
        logger.debug('Successfully updated AWS IP range table with sync token: %s',
                     self.sync_token)
        return True

    @staticmethod
    def _build_location_function(sync_token, all_amazon, regions, country, country_db):
        """ Builds and returns a memoized function mapping an IP address to a
        ResolvedLocation.

        `all_amazon` is the set of all AWS addresses and `regions` maps region names to
        address sets. `country` receives the per-service address sets from the caller
        and is currently unused; the name is kept for interface compatibility.
        `country_db` is the GeoIP reader used for non-AWS addresses.

        The returned function produces:
          - provider 'invalid_ip' when the address cannot be parsed;
          - provider 'internet' for non-AWS addresses, with the continent code as
            `region` and the country ISO code as `service` (both None when the address
            is not in the GeoIP database);
          - provider 'aws' for AWS addresses, with the matching region name (or None)
            and `service` always None.
        """
        @lru_cache(maxsize=4096)
        def _get_location(ip_address):
            try:
                parsed_ip = IPAddress(ip_address)
            except AddrFormatError:
                return ResolvedLocation('invalid_ip', None, None, sync_token)

            if parsed_ip not in all_amazon:
                # Try geoip classification. The reader is documented to take the address
                # as a string, so hand it the canonical text form rather than the
                # netaddr object.
                try:
                    found = country_db.country(str(parsed_ip))
                except geoip2.errors.AddressNotFoundError:
                    return ResolvedLocation('internet', None, None, sync_token)

                return ResolvedLocation(
                    'internet',
                    found.continent.code,
                    found.country.iso_code,
                    sync_token,
                )

            # First region whose address set contains the IP, or None if none does.
            region = next(
                (region_name for region_name, region_set in regions.items()
                 if parsed_ip in region_set),
                None,
            )
            return ResolvedLocation('aws', region, None, sync_token)

        return _get_location

    @staticmethod
    def _parse_amazon_ranges(ranges):
        """ Converts the decoded AWS range table into address sets.

        Only the 'prefixes' list of `ranges` is consulted; each entry must provide
        'ip_prefix', 'service' and 'region'. Returns a tuple of
        (all addresses, addresses by region name, addresses by service name).
        """
        all_amazon = IPSet()
        regions = defaultdict(IPSet)
        services = defaultdict(IPSet)

        for service_description in ranges['prefixes']:
            cidr = IPNetwork(service_description['ip_prefix'])
            service = service_description['service']
            region = service_description['region']

            all_amazon.add(cidr)
            regions[region].add(cidr)
            services[service].add(cidr)

        return all_amazon, regions, services

    def run(self):
        """ Refreshes the AWS IP range table forever, sleeping _UPDATE_TIME seconds after
        a success and _RETRY_TIME seconds after a failure.
        """
        while True:
            logger.debug('Updating AWS IP database')
            if self._update_aws_ip_range():
                logger.debug('Success; sleeping for %s seconds', _UPDATE_TIME)
                time.sleep(_UPDATE_TIME)
            else:
                logger.debug('Failed; sleeping for %s seconds', _RETRY_TIME)
                time.sleep(_RETRY_TIME)