Source code for digital_comms.mobile_network.model

"""Cambridge Communications Assessment Model
"""
from collections import defaultdict
from itertools import tee
from pprint import pprint

[docs]class NetworkManager(object): """ Model controller class. Represents postcode sectors nested in local area districts, with all affiliated assets, capacities and clutter types. Parameters ---------- lads: :obj:`list` of :obj:`dict` List of local area districts * id: :obj:`int` Unique ID * name: :obj:`str` Name of the LAD pcd_sectors: :obj:`list` of :obj:`dict` List of postcode sectors (pcd) * id: :obj:`str` Postcode name * lad_id: :obj:`int` Unique ID * population: :obj:`int` Number of inhabitants * area: :obj:`float` Areas size in square kilometers (km^2) * user_throughput: :obj:`int` Per user monthly data demand in gigabytes (GB) assets: :obj:`list` of :obj:`dict` List of assets * pcd_sector: :obj:`str` Code of the postcode sector * site_ngr: :obj:`int` Unique site reference number * technology: :obj:`str` Abbreviation of the asset technology (LTE, 5G etc.) * frequency: :obj:`str` Spectral frequency(s) the asset operates at (800, 2600, ..) * bandwidth: :obj:`str` Downlink bandwith of the asset (10MHz, ..) * build_date: :obj:`int` Build year of the asset capacity_lookup_table: dict Dictionary that represents the clutter/asset type, spectrum frequency and channel bandwidth, and the consequential cellular capacity provided for different asset densities. * key: :obj:`tuple` * 0: :obj:`str` Area type ('urban', 'suburban' or 'rural') or asset type ('small_cells') * 1: :obj:`str` Frequency of the asset configuration (800, 2600, ..) * 2: :obj:`str` Bandwith of the asset configuration (10, 40, ..) * value: :obj:`list` of :obj:`tuple` * 0: :obj:`int` Cellular asset density per square kilometer (sites per km^2) * 1: :obj:`int` Average Radio Access Network capacity in Mbps per square kilometer (Mbps/km^2) clutter_lookup: list of tuples Each element represents the settlement definitions for urban, suburban and rural by population density in square kilometers (persons per km^2) * 0: :obj:`int` Population density in persons per km^2. * 1: :obj:`string` Settlement type (rban, suburban and rural) simulation_parameters: dict Contains all simulation parameters, set in the run script. * market_share: :obj: 'int' Percentage market share of the modelled hypothetical operator. * annual_budget: :obj: 'int' Annual budget to spend. * service_obligation_capacity: :obj: 'int' Required service obligation. * busy_hour_traffic_percentage: :obj: 'int' Percentage of daily traffic taking place in the busy hour. * coverage_threshold: :obj: 'int' The threshold we wish to measure the served population against. * penetration: :obj: 'int' The penetration of users with smartphone and data access. """ def __init__(self, lads, pcd_sectors, assets, capacity_lookup_table, clutter_lookup, simulation_parameters): self.lads = {} self.postcode_sectors = {} for lad_data in lads: lad_id = lad_data["id"] self.lads[lad_id] = LAD(lad_data, simulation_parameters) assets_by_pcd = defaultdict(list) for asset in assets: assets_by_pcd[asset['pcd_sector']].append(asset) for pcd_sector_data in pcd_sectors: try: lad_id = pcd_sector_data["lad_id"] pcd_sector_id = pcd_sector_data["id"] assets = assets_by_pcd[pcd_sector_id] pcd_sector = PostcodeSector(pcd_sector_data, assets, capacity_lookup_table, clutter_lookup, simulation_parameters, 0) self.postcode_sectors[pcd_sector_id] = pcd_sector lad_containing_pcd_sector = self.lads[lad_id] lad_containing_pcd_sector.add_pcd_sector(pcd_sector) except: print('could not create object for {}'.format(pcd_sector_data["id"])) print(pcd_sector_data) pass
[docs]class LAD(object): """ Local area district. Represents an area to be modelled. Contains data for demand characterisation and assets for supply assessment. Arguments --------- data: dict Metadata and info for the LAD * id: :obj:`int` Unique ID * name: :obj:`str` Name of the LAD simulation_parameters: dict Contains all simulation parameters, set in the run script. * market_share: :obj: 'int' Percentage market share of the modelled hypothetical operator. * annual_budget: :obj: 'int' Annual budget to spend. * service_obligation_capacity: :obj: 'int' Required service obligation. * busy_hour_traffic_percentage: :obj: 'int' Percentage of daily traffic taking place in the busy hour. * coverage_threshold: :obj: 'int' The threshold we wish to measure the served population against. * penetration: :obj: 'int' The penetration of users with smartphone and data access. """ def __init__(self, data, simulation_parameters): self.id = data["id"] self.name = data["name"] self._pcd_sectors = {} def __repr__(self): return "<LAD id:{} name:{}>".format(self.id, self.name) @property def population(self): return sum([ pcd_sector.population for pcd_sector in self._pcd_sectors.values()]) @property def area(self): return sum([ pcd_sector.area for pcd_sector in self._pcd_sectors.values()]) @property def population_density(self): total_area = sum([ pcd_sector.area for pcd_sector in self._pcd_sectors.values()]) if total_area == 0: return 0 else: return self.population / total_area
[docs] def add_pcd_sector(self, pcd_sector): self._pcd_sectors[pcd_sector.id] = pcd_sector
[docs] def capacity(self): """Return the mean capacity from all nested postcode sectors """ if not self._pcd_sectors: return 0 summed_capacity = sum([ pcd_sector.capacity for pcd_sector in self._pcd_sectors.values()]) return summed_capacity / len(self._pcd_sectors)
[docs] def demand(self): """Return the mean capacity demand from all nested postcode sectors """ if not self._pcd_sectors: return 0 summed_demand = sum( pcd_sector.demand * pcd_sector.area for pcd_sector in self._pcd_sectors.values() ) summed_area = sum( pcd_sector.area for pcd_sector in self._pcd_sectors.values() ) return summed_demand / summed_area
[docs] def coverage(self, simulation_parameters): """Return proportion of population with capacity coverage over a threshold """ if not self._pcd_sectors: return 0 threshold = simulation_parameters['coverage_threshold'] population_with_coverage = sum([ pcd_sector.population for pcd_sector in self._pcd_sectors.values() if pcd_sector.capacity >= threshold]) total_pop = sum([ pcd_sector.population for pcd_sector in self._pcd_sectors.values()]) return float(population_with_coverage) / total_pop
[docs]class PostcodeSector(object): """Represents a pcd_sector to be modelled """ def __init__(self, data, assets, capacity_lookup_table, clutter_lookup, simulation_parameters, testing): self.id = data["id"] self.lad_id = data["lad_id"] self.population = data["population"] self.area = data["area_km2"] self.user_throughput = data["user_throughput"] self.penetration = simulation_parameters['penetration'] self.busy_hour_traffic = simulation_parameters['busy_hour_traffic_percentage'] self.market_share = simulation_parameters['market_share'] self.user_demand = self._calculate_user_demand( self.user_throughput, simulation_parameters) self.demand_density = self.demand / self.area self._capacity_lookup_table = capacity_lookup_table self._clutter_lookup = clutter_lookup self.clutter_environment = lookup_clutter_geotype( self._clutter_lookup, self.population_density ) self.assets = assets self.site_density_macrocells = self._calculate_site_density_macrocells() self.site_density_small_cells = self._calculate_site_density_small_cells() self.capacity = ( self._macrocell_site_capacity(simulation_parameters, testing) + self.small_cell_capacity(simulation_parameters, testing) ) def __repr__(self): return "<PostcodeSector id:{}>".format(self.id) def _calculate_site_density_macrocells(self): unique_sites = set() for asset in self.assets: if asset['type'] == 'macrocell_site': unique_sites.add(asset['site_ngr']) site_density = float(len(unique_sites)) / self.area return site_density def _calculate_site_density_small_cells(self): small_cells = [] for asset in self.assets: if asset['type'] == 'small_cell': small_cells.append(asset) site_density = float(len(small_cells)) / self.area return site_density def _calculate_user_demand(self, user_throughput, simulation_parameters): """Calculate Mb/second from GB/month supplied as throughput scenario E.g. 2 GB per month * 1024 to find MB * 8 to covert bytes to bits * busy_hour_traffic = daily traffic taking place in the busy hour * 1/30 assuming 30 days per month * 1/3600 converting hours to seconds, = ~0.01 Mbps required per user """ busy_hour_traffic = simulation_parameters['busy_hour_traffic_percentage'] / 100 demand = user_throughput * 1024 * 8 * busy_hour_traffic / 30 / 3600 return demand @property def demand(self): """ Estimate total demand based on population and penetration. E.g. 0.02 Mbps per user during busy hours * 100 population * 0.8 penetration / 10 km^2 area = ~0.16 Mbps/km^2 area capacity demand """ users = self.population * (self.penetration / 100) * self.market_share user_throughput = users * self.user_demand capacity_per_kmsq = user_throughput / self.area return capacity_per_kmsq @property def population_density(self): """ Calculate population density for a specific population and area. """ return self.population / self.area def _macrocell_site_capacity(self, simulation_parameters, testing): """ Find the macrocellular Radio Access Network capacity given the area assets and deployed frequency bands. """ capacity = 0 for frequency in ['700', '800', '1800', '2600', '3500', '26000']: unique_sites = set() for asset in self.assets: for asset_frequency in asset['frequency']: if asset_frequency == frequency: unique_sites.add(asset['site_ngr']) site_density = float(len(unique_sites)) / self.area bandwidth = find_frequency_bandwidth(frequency, simulation_parameters) if frequency == '700' or frequency == '3500' or frequency == '26000': generation = '5G' else: generation = '4G' tech_capacity = lookup_capacity( self._capacity_lookup_table, self.clutter_environment, frequency, bandwidth, generation, site_density, 0) capacity += tech_capacity return capacity
[docs] def small_cell_capacity(self, simulation_parameters, testing): """ Find the small cell Radio Access Network capacity given the area assets and deployed frequency bands. """ num_small_cells = len([ asset for asset in self.assets if asset['type'] == "small_cell" ]) site_density = float(num_small_cells) / self.area capacity = lookup_capacity( self._capacity_lookup_table, "small_cells", "3700", "25", "5G", site_density, testing) return capacity
[docs]def find_frequency_bandwidth(frequency, simulation_parameters): """ Finds the correct bandwidth for a specific frequency from the simulation parameters. """ simulation_parameter = 'channel_bandwidth_{}'.format(frequency) if simulation_parameter not in simulation_parameters.keys(): KeyError('{} not specified in simulation_parameters'.format(frequency)) bandwidth = simulation_parameters[simulation_parameter] return bandwidth
[docs]def pairwise(iterable): """Return iterable of 2-tuples in a sliding window >>> list(pairwise([1,2,3,4])) [(1,2),(2,3),(3,4)] """ a, b = tee(iterable) next(b, None) return zip(a, b)
[docs]def lookup_clutter_geotype(clutter_lookup, population_density): """Return geotype based on population density Params: ====== clutter_lookup : list of (population_density_upper_bound, geotype) tuples sorted by population_density_upper_bound ascending """ highest_popd, highest_geotype = clutter_lookup[2] middle_popd, middle_geotype = clutter_lookup[1] lowest_popd, lowest_geotype = clutter_lookup[0] if population_density < middle_popd: return lowest_geotype elif population_density > highest_popd: return highest_geotype else: return middle_geotype
[docs]def lookup_capacity(lookup_table, clutter_environment, frequency, bandwidth, generation, site_density, testing): """ Use lookup table to find capacity by clutter environment geotype, frequency, bandwidth and site density. """ if (clutter_environment, frequency, bandwidth, generation) not in lookup_table: raise KeyError("Combination %s not found in lookup table", (clutter_environment, frequency, bandwidth, generation)) density_capacities = lookup_table[(clutter_environment, frequency, bandwidth, generation)] lowest_density, lowest_capacity = density_capacities[0] if site_density < lowest_density: return 0 for a, b in pairwise(density_capacities): lower_density, lower_capacity = a upper_density, upper_capacity = b if lower_density <= site_density and site_density < upper_density: return interpolate(lower_density, lower_capacity, upper_density, upper_capacity, site_density) # If not caught between bounds return highest capacity highest_density, highest_capacity = density_capacities[-1] return highest_capacity
[docs]def interpolate(x0, y0, x1, y1, x): """ Linear interpolation between two values. """ y = (y0 * (x1 - x) + y1 * (x - x0)) / (x1 - x0) return y