Source code for mapof.elections.objects.Election

import copy
import csv
import itertools
import logging
import math
import os
from abc import abstractmethod

import numpy as np
from mapof.core.distances import l2
from mapof.core.objects.Instance import Instance
from sklearn.decomposition import PCA
from sklearn.manifold import MDS

import mapof.elections.persistence.election_exports as exports
import mapof.elections.persistence.election_imports as imports
from mapof.elections.features import get_local_feature
from mapof.elections.other.approval_rules import compute_abcvoting_rule_for_single_election
from mapof.elections.other.glossary import is_pseudo_culture
from mapof.elections.other.ordinal_rules import (
    compute_sntv_voting_rule,
    compute_borda_voting_rule,
    compute_stv_voting_rule
)

OBJECT_TYPES = ['vote', 'candidate']


[docs] class Election(Instance): """ (Abstract) Election class. """ def __init__(self, experiment_id=None, election_id=None, culture_id=None, votes=None, instance_type: str = None, num_voters: int = None, num_candidates: int = None, label=None, fast_import=False, is_shifted=False, is_imported=False, is_exported=True, params=None, **kwargs): super().__init__(experiment_id=experiment_id, instance_id=election_id, culture_id=culture_id, params=params, **kwargs) self.instance_type = instance_type self.format = get_format_from_instance_type(instance_type) self.election_id = election_id self.label = label self.num_voters = num_voters self.num_candidates = num_candidates self.votes = votes self.is_exported = is_exported self.winners = None self.alternative_winners = {} self.is_pseudo = is_pseudo_culture(culture_id) self.potes = None self.features = {} self.object_type = 'vote' self.points = {} self.is_shifted = is_shifted self.is_imported = is_imported self.fast_import = fast_import self.winning_committee = {} self.distances = {} self.import_distances() self.coordinates = {} self.import_coordinates()
[docs] def import_distances(self) -> None: """ Imports distances from a .csv file. Returns ------- None """ if not self.fast_import: for object_type in OBJECT_TYPES: try: self.distances[object_type] = \ imports.import_distances(self.experiment_id, self.election_id, object_type) except Exception: pass
[docs] def import_coordinates(self) -> None: """ Imports coordinates from a .csv file. Returns ------- None """ for object_type in OBJECT_TYPES: try: self.coordinates[object_type] = \ imports.import_coordinates(self.experiment_id, self.election_id, object_type) except Exception: pass
def get_distances(self, object_type): try: return self.distances[object_type] except Exception: self.distances[object_type] = \ imports.import_distances(self.experiment_id, self.election_id, object_type) return self.distances[object_type] def get_coordiantes(self, object_type): try: return self.coordinates[object_type] except Exception: self.coordinates[object_type] = \ imports.import_coordinates(self.experiment_id, self.election_id, object_type) return self.coordinates[object_type] def set_default_object_type(self, object_type): self.object_type = object_type def import_matrix(self) -> np.ndarray: file_name = f'{self.election_id}.csv' path = os.path.join(os.getcwd(), "experiments", self.experiment_id, 'matrices', file_name) matrix = np.zeros([self.num_candidates, self.num_candidates]) with open(path, 'r', newline='') as csv_file: reader = csv.DictReader(csv_file, delimiter=';') for i, row in enumerate(reader): for j, candidate_id in enumerate(row): matrix[i][j] = row[candidate_id] return matrix
[docs] def compute_potes(self, mapping=None): """ Convert votes to positional votes (called potes) """ if not self.is_pseudo: if mapping is None: self.potes = np.array([[list(vote).index(i) for i, _ in enumerate(vote)] for vote in self.votes]) else: self.potes = np.array([[list(vote).index(mapping[i]) for i, _ in enumerate(vote)] for vote in self.votes]) return self.potes
def vector_to_interval(self, vector, precision=None) -> list: # discreet version for now interval = [] w = int(precision / self.num_candidates) for i in range(self.num_candidates): for j in range(w): interval.append(vector[i] / w) return interval def compute_alternative_winners(self, method=None, party_id=None, committee_size=None): election_without_party_id = _remove_candidate_from_election(copy.deepcopy(self), party_id, committee_size) election_without_party_id = map_the_votes(election_without_party_id, party_id, committee_size) if method == 'sntv': winners_without_party_id = compute_sntv_voting_rule( election=election_without_party_id, committee_size=committee_size) elif method == 'borda': winners_without_party_id = compute_borda_voting_rule( election=election_without_party_id, committee_size=committee_size) elif method == 'stv': winners_without_party_id = compute_stv_voting_rule( election=election_without_party_id, committee_size=committee_size) else: winners_without_party_id = [] self.alternative_winners[party_id] = _unmap_the_winners(winners_without_party_id, party_id, committee_size) @abstractmethod def compute_distances(self): pass def embed(self, algorithm='mds', object_type=None): if object_type is None: object_type = self.object_type if algorithm.lower() == 'pca': pca = PCA(n_components=2) self.coordinates[object_type] = pca.fit_transform(self.distances[object_type]) elif algorithm.lower() == 'mds': MDS_object = MDS(n_components=2, dissimilarity='precomputed', normalized_stress='auto') self.coordinates[object_type] = MDS_object.fit_transform(self.distances[object_type]) else: logging.warning('No such algorithm!') if not self.all_dist_zeros(object_type): dist = np.zeros( [len(self.coordinates[object_type]), len(self.coordinates[object_type])]) for pos_1, pos_2 in itertools.combinations( [i for i in range(len(self.coordinates[object_type]))], 2): dist[pos_1][pos_2] = l2(self.coordinates[object_type][pos_1], self.coordinates[object_type][pos_2]) result = np.where(dist == np.amax(dist)) id_1 = result[0][0] id_2 = result[1][0] # rotate a = id_1 b = id_2 try: d_x = self.coordinates[object_type][a][0] - self.coordinates[object_type][b][0] d_y = self.coordinates[object_type][a][1] - self.coordinates[object_type][b][1] alpha = math.atan(d_x / d_y) self.rotate(alpha - math.pi / 2., object_type) self.rotate(math.pi / 4., object_type) except Exception: pass # PUT heavier corner in the left lower part if self.coordinates[object_type][a][0] < self.coordinates[object_type][b][0]: left = a right = b else: left = b right = a try: left_ctr = 0 right_ctr = 0 for v in range(len(self.coordinates[object_type])): d_left = l2(self.coordinates[object_type][left], self.coordinates[object_type][v]) d_right = l2(self.coordinates[object_type][right], self.coordinates[object_type][v]) if d_left < d_right: left_ctr += 1 else: right_ctr += 1 if left_ctr < right_ctr: self.rotate(math.pi, object_type) except Exception: pass if self.is_exported and self.experiment_id is not None: exports.export_coordinates(self, object_type=object_type) def all_dist_zeros(self, object_type): if np.abs(self.distances[object_type]).sum(): return False else: return True
[docs] @staticmethod def rotate_point(cx, cy, angle, px, py) -> (float, float): """ Rotate two-dimensional point by an angle """ s, c = math.sin(angle), math.cos(angle) px -= cx py -= cy x_new, y_new = px * c - py * s, px * s + py * c px, py = x_new + cx, y_new + cy return px, py
[docs] def rotate(self, angle, object_type) -> None: """ Rotate all the points by a given angle """ for instance_id in range(len(self.coordinates[object_type])): self.coordinates[object_type][instance_id][0], \ self.coordinates[object_type][instance_id][1] = \ self.rotate_point(0.5, 0.5, angle, self.coordinates[object_type][instance_id][0], self.coordinates[object_type][instance_id][1])
def compute_feature(self, feature_id, feature_long_id=None, **kwargs): if feature_long_id is None: feature_long_id = feature_id feature = get_local_feature(feature_id) self.features[feature_long_id] = feature(self, **kwargs) def compute_rule(self, rule_id, **kwargs): compute_abcvoting_rule_for_single_election(self, rule_id, **kwargs) def get_feature(self, feature_id, feature_long_id=None, overwrite=False, compute_if_missing=True, **kwargs): if overwrite and not compute_if_missing: raise ValueError('Cannot overwrite without computing the feature.') if feature_long_id is None: feature_long_id = feature_id if not compute_if_missing: return self.features[feature_long_id] if feature_id not in self.features or overwrite: self.compute_feature(feature_id, feature_long_id, **kwargs) return self.features[feature_long_id] def export_to_file(self, path_to_folder, is_aggregated=False): return exports.export_election_without_experiment(self, path_to_folder, is_aggregated)
def map_the_votes(election, party_id, party_size) -> Election: new_votes = [[] for _ in range(election.num_voters)] for i in range(election.num_voters): for j in range(election.num_candidates): if election.votes[i][j] >= party_id * party_size: new_votes[i].append(election.votes[i][j] - party_size) else: new_votes[i].append(election.votes[i][j]) election.votes = new_votes return election def _unmap_the_winners(winners, party_id, party_size): new_winners = [] for j in range(len(winners)): if winners[j] >= party_id * party_size: new_winners.append(winners[j] + party_size) else: new_winners.append(winners[j]) return new_winners def _remove_candidate_from_election(election, party_id, party_size) -> Election: for vote in election.votes: for i in range(party_size): _id = party_id * party_size + i vote.remove(_id) election.num_candidates -= party_size return election def get_format_from_instance_type(instance_type): if instance_type == 'approval': return 'app' elif instance_type == 'ordinal': return 'soc'