Source code for mapof.elections.objects.OrdinalElection

import csv
import logging
import os
from collections import Counter

import numpy as np
from mapof.core.distances import swap_distance_between_potes, \
    spearman_distance_between_potes
from matplotlib import pyplot as plt

import mapof.elections.persistence.election_exports as exports
import mapof.elections.persistence.election_imports as imports
from mapof.elections.cultures import generate_ordinal_votes, \
    generate_ordinal_alliance_votes, registered_pseudo_ordinal_cultures
from mapof.elections.cultures.pseudo_cultures import (
    get_frequency_matrix_for_guardian,
    get_pairwise_matrix_for_guardian,
    update_params_ordinal,
    get_pseudo_convex,
    get_pseudo_borda_vector,
)
from mapof.elections.features.simple_ordinal import is_condorcet
from mapof.elections.objects.Election import Election
from mapof.elections.objects.Microscope import Microscope
from mapof.elections.other.glossary import PATHS
from mapof.elections.other.ordinal_rules import voting_rule



[docs] class OrdinalElection(Election): """ Ordinal Election class. """ def __init__(self, experiment_id=None, election_id=None, culture_id=None, votes=None, label=None, num_voters: int = None, num_candidates: int = None, fast_import=False, frequency_matrix=None, params=None, **kwargs): """Create an OrdinalElection instance. Parameters ---------- experiment_id : str or None Identifier of the parent experiment (used for persistence). election_id : str or None Identifier of this election instance. culture_id : str or None Identifier of the generating culture (may be a pseudo-culture). votes : list or None Raw votes as lists/tuples of candidate indices. label : str or None Human-readable label for plotting/export. num_voters : int or None Number of voters in the election. num_candidates : int or None Number of candidates in the election. fast_import : bool If True, skip expensive import steps when constructing the object. frequency_matrix : array-like or None Precomputed frequency matrix (used for pseudo cultures). params : dict or None Culture or generation parameters. **kwargs : dict Additional keyword arguments forwarded to the base class. """ super().__init__(experiment_id=experiment_id, election_id=election_id, culture_id=culture_id, votes=votes, label=label, num_voters=num_voters, num_candidates=num_candidates, fast_import=fast_import, instance_type='ordinal', params=params, **kwargs) self.frequency_matrix = [] self.bordawise_vector = [] self.potes = None self.condorcet = None self.points = {} self.alliances = {} self.quantities = None self.microscope = None if frequency_matrix is not None: self.frequency_matrix = frequency_matrix if self.is_imported and self.experiment_id is not None and not fast_import: self.import_ordinal_election() self.try_updating_params()
[docs] def import_ordinal_election(self): """ Import ordinal election. """ try: self.is_pseudo = imports.check_if_pseudo(self.experiment_id, self.election_id) if self.is_pseudo: ( self.culture_id, self.params, self.num_voters, self.num_candidates, self.frequency_matrix ) = imports.import_pseudo_ordinal_election( self.experiment_id, self.election_id ) else: ( self.votes, self.num_voters, self.num_candidates, self.params, self.culture_id, self.alliances, self.num_distinct_votes, self.quantities, self.distinct_votes ) = imports.import_ordinal_election( experiment_id=self.experiment_id, election_id=self.election_id, is_shifted=self.is_shifted) if not self.fast_import: self._votes_to_frequency_matrix() except Exception: logging.warning(f'Could not import instance {self.election_id}.')
[docs] def try_updating_params(self): """Update `self.params` based on `culture_id` when available. This uses culture-specific logic (e.g. pseudo-cultures) to ensure params are consistent with the number of candidates. """ if self.culture_id is not None: self.params = update_params_ordinal( self.params, self.culture_id, self.num_candidates )
[docs] def get_frequency_matrix(self, is_recomputed=False): """ Get frequency_matrix. """ if self.frequency_matrix is not None \ and len(self.frequency_matrix) > 0 \ and not is_recomputed: return self.frequency_matrix return self._votes_to_frequency_matrix()
[docs] def get_bordawise_vector(self, is_recomputed=False): """Return the Borda-style vector for the election, computing it if needed. Parameters ---------- is_recomputed : bool If True, force recomputation even if a cached value exists. """ if self.bordawise_vector is not None \ and len(self.bordawise_vector) > 0 \ and not is_recomputed: return self.bordawise_vector return self._votes_to_bordawise_vector()
[docs] def get_potes(self, is_recomputed=False): """ Get potes. """ if self.potes is not None \ and not is_recomputed: return self.potes return self.compute_potes()
def _votes_to_frequency_matrix(self): """ Converts votes to a frequency matrix. """ frequency_matrix = np.zeros([self.num_candidates, self.num_candidates]) if self.is_pseudo and self.frequency_matrix is not None: frequency_matrix = self.frequency_matrix if self.culture_id in PATHS: frequency_matrix = get_pseudo_convex( self.culture_id, self.num_candidates, self.params, get_frequency_matrix_for_guardian ) elif self.culture_id in registered_pseudo_ordinal_cultures: frequency_matrix = registered_pseudo_ordinal_cultures[self.culture_id]( self.num_candidates, params=self.params, ) else: for i in range(self.num_voters): pos = 0 for j in range(self.num_candidates): vote = self.votes[i][j] if vote == -1: continue frequency_matrix[vote][pos] += 1 pos += 1 for i in range(self.num_candidates): for j in range(self.num_candidates): frequency_matrix[i][j] /= float(self.num_voters) self.frequency_matrix = frequency_matrix return frequency_matrix
[docs] def votes_to_pairwise_matrix(self) -> np.ndarray: """ Convert votes to pairwise matrix. """ matrix = np.zeros([self.num_candidates, self.num_candidates]) if self.is_pseudo: if self.culture_id in { 'pseudo_identity', 'pseudo_uniformity', 'pseudo_antagonism', 'pseudo_stratification' }: matrix = get_pairwise_matrix_for_guardian(self.culture_id, self.num_candidates) elif self.culture_id in PATHS: matrix = get_pseudo_convex(self.culture_id, self.num_candidates, self.params, get_pairwise_matrix_for_guardian) else: for v in range(self.num_voters): for c1 in range(self.num_candidates): for c2 in range(c1 + 1, self.num_candidates): matrix[int(self.votes[v][c1])][ int(self.votes[v][c2])] += 1 for i in range(self.num_candidates): for j in range(i + 1, self.num_candidates): matrix[i][j] /= float(self.num_voters) matrix[j][i] = 1. - matrix[i][j] return matrix
def _votes_to_bordawise_vector(self) -> np.ndarray: """ Convert ordinal votes to Borda vector. """ borda_vector = np.zeros([self.num_candidates]) if self.is_pseudo: if self.culture_id in { 'pseudo_identity', 'pseudo_uniformity', 'pseudo_antagonism', 'pseudo_stratification' }: borda_vector = get_pseudo_borda_vector(self.culture_id, self.num_candidates, self.num_voters) elif self.culture_id in PATHS: borda_vector = get_pseudo_convex(self.culture_id, self.num_candidates, self.params, get_pseudo_borda_vector) else: c = self.num_candidates v = self.num_voters matrix = self._votes_to_frequency_matrix() borda_vector = [sum([matrix[i][j] * (c - j - 1) for j in range(c)]) * v for i in range(self.num_candidates)] borda_vector = sorted(borda_vector, reverse=True) return np.array(borda_vector)
[docs] def votes_to_voterlikeness_matrix(self, vote_distance='swap') -> np.ndarray: """ convert VOTES to voter-likeness MATRIX """ matrix = np.zeros([self.num_voters, self.num_voters]) self.compute_potes() for v1 in range(self.num_voters): for v2 in range(self.num_voters): if vote_distance == 'swap': matrix[v1][v2] = swap_distance_between_potes(self.potes[v1], self.potes[v2]) elif vote_distance == 'spearman': matrix[v1][v2] = spearman_distance_between_potes(self.potes[v1], self.potes[v2]) for i in range(self.num_voters): for j in range(i + 1, self.num_voters): matrix[j][i] = matrix[i][j] return matrix
[docs] def votes_to_agg_voterlikeness_vector(self): """ Converts ordinal votes to Borda vector. """ vector = np.zeros([self.num_voters]) for v1 in range(self.num_voters): for v2 in range(self.num_voters): swap_distance = 0 for i in range(self.num_candidates): for j in range(i + 1, self.num_candidates): if (self.potes[v1][i] > self.potes[v1][j] and self.potes[v2][i] < self.potes[v2][j]) or \ (self.potes[v1][i] < self.potes[v1][j] and self.potes[v2][i] > self.potes[v2][j]): swap_distance += 1 vector[v1] += swap_distance return vector, len(vector)
[docs] def compute_voting_rule(self, method=None, committee_size=None): """Compute winners using the given voting rule and return them. This sets `self.winners` (existing behavior) and also returns the winners for convenience so callers can do `w = obj.compute_voting_rule(...)`. """ self.winners = voting_rule(election=self, method=method, committee_size=committee_size) return self.winners
def compute_winners(self, **kwargs): # deprecated name / for backward compatibility # old callers used compute_winners(method=..., committee_size=...) # emit a deprecation warning and forward kwargs to compute_voting_rule import warnings warnings.warn( "OrdinalElection.compute_winners is deprecated, use compute_voting_rule instead", DeprecationWarning, stacklevel=2, ) return self.compute_voting_rule(**kwargs)
[docs] def prepare_instance(self, is_exported=None, is_aggregated=True): """ Prepares instance """ if 'num_alliances' in self.params: self.votes, self.alliances = generate_ordinal_alliance_votes( culture_id=self.culture_id, num_candidates=self.num_candidates, num_voters=self.num_voters, params=self.params) else: self.votes = generate_ordinal_votes(culture_id=self.culture_id, num_candidates=self.num_candidates, num_voters=self.num_voters, params=self.params) if not self.is_pseudo: c = Counter(map(tuple, self.votes)) counted_votes = [[count, list(row)] for row, count in c.items()] counted_votes = sorted(counted_votes, reverse=True) self.quantities = [a[0] for a in counted_votes] self.distinct_votes = [a[1] for a in counted_votes] self.num_distinct_votes = len(counted_votes) else: self.quantities = [self.num_voters] self.num_distinct_votes = 1 if is_exported: exports.export_election_within_experiment(self, is_aggregated=is_aggregated)
[docs] def compute_distances(self, distance_id='swap', object_type=None): """ Return: distances between votes """ if object_type is None: object_type = self.object_type # Ensure distances is always defined regardless of branches below. distances = [] self.distinct_potes = convert_votes_to_potes(self.distinct_votes) self.num_dist_votes = len(self.distinct_votes) self.num_distinct_votes = self.num_dist_votes if object_type == 'vote': distances = np.zeros([self.num_dist_votes, self.num_dist_votes]) for v1 in range(self.num_dist_votes): for v2 in range(self.num_dist_votes): if distance_id == 'swap': distances[v1][v2] = swap_distance_between_potes( self.distinct_potes[v1], self.distinct_potes[v2]) elif distance_id == 'spearman': distances[v1][v2] = spearman_distance_between_potes( self.distinct_potes[v1], self.distinct_potes[v2]) elif object_type == 'candidate': self.compute_potes() if distance_id == 'domination': distances = self.votes_to_pairwise_matrix() distances = np.abs(distances - 0.5) * self.num_voters np.fill_diagonal(distances, 0) elif distance_id == 'position': distances = np.zeros([self.num_candidates, self.num_candidates]) for c1 in range(self.num_candidates): for c2 in range(self.num_candidates): dist = 0 for pote in self.potes: dist += abs(pote[c1] - pote[c2]) distances[c1][c2] = dist else: logging.warning('incorrect object_type') distances = [] self.distances[object_type] = distances if self.is_exported: exports.export_distances(self, object_type=object_type)
[docs] def is_condorcet(self): """ Check if election witness Condorcet winner""" if self.condorcet is None: self.condorcet = is_condorcet(self)['value'] return self.condorcet
[docs] def import_ideal_points(self, name): """Import ideal points from a CSV file. The file is expected to have two columns (x, y) without a header. Parameters ---------- name : str Base name of the file to import (without path or extension). Returns ------- list of [float, float] List of ideal points as [x, y] coordinates. """ path = os.path.join(os.getcwd(), "experiments", self.experiment_id, "elections", f'{self.election_id}_{name}.csv') points = [] with open(path, 'r', newline='') as csv_file: reader = csv.DictReader(csv_file, delimiter=';') for row in reader: points.append([float(row['x']), float(row['y'])]) return points
[docs] @staticmethod def texify_label(name): """Convert a label string to a TeX-friendly format. This is used for rendering labels in plots with proper mathematical formatting. Parameters ---------- name : str The label string to convert. Returns ------- str The converted label string, suitable for use in TeX/LaTeX. """ result = name result = result.replace('phi', '$\\ \\phi$') result = result.replace('alpha', '$\\ \\alpha$') result = result.replace('omega', '$\\ \\omega$') result = result.replace('§', '\n', 1) result = result.replace('0.005', '$\\frac{1}{200}$') result = result.replace('0.025', '$\\frac{1}{40}$') result = result.replace('0.75', '$\\frac{3}{4}$') result = result.replace('0.25', '$\\frac{1}{4}$') result = result.replace('0.01', '$\\frac{1}{100}$') result = result.replace('0.05', '$\\frac{1}{20}$') result = result.replace('0.5', '$\\frac{1}{2}$') result = result.replace('0.1', '$\\frac{1}{10}$') result = result.replace('0.2', '$\\frac{1}{5}$') result = result.replace('0.4', '$\\frac{2}{5}$') result = result.replace('0.8', '$\\frac{4}{5}$') return result.replace(' ', '\n', 1)
[docs] def set_microscope( self, radius=None, alpha=0.1, s=30, object_type=None, double_gradient=False, color='blue', marker='o', title_size=20 ): """Set up a Microscope (2D scatter plot) for visualizing election data. This method configures and saves a Microscope object to `self.microscope`, which can be used for detailed inspection of the election's spatial properties. Parameters ---------- radius : float or None If given, set the x and y limits of the plot to be centered around the average coordinates with this radius. alpha : float Transparency level for the points in the scatter plot. s : int Base size for the points in the scatter plot (scaled by voter quantity). object_type : str or None Type of objects to plot ('vote' or 'candidate'). If None, inferred from `self.object_type`. double_gradient : bool If True, use a double gradient based on voter coordinates for coloring. color : str Base color for the points in the scatter plot. marker : str Marker style for the points in the scatter plot. title_size : int Font size for the title of the plot. Returns ------- Microscope The configured Microscope object. """ if object_type is None: object_type = self.object_type fig, ax = plt.subplots(figsize=(6.4, 6.4)) X = [] Y = [] for elem in self.coordinates[object_type]: X.append(elem[0]) Y.append(elem[1]) start = False if start: ax.scatter(X[0], Y[0], color='sienna', s=1000, alpha=1, marker='X') if object_type == 'vote': if double_gradient: for i in range(len(X)): x = float(self.points['voters'][i][0]) y = float(self.points['voters'][i][1]) ax.scatter(X[i], Y[i], color=[0, y, x], s=s, alpha=alpha) else: for i in range(len(X)): ax.scatter(X[i], Y[i], color=color, alpha=alpha, marker=marker, s=self.quantities[i] * s) elif object_type == 'candidate': for i in range(len(X)): ax.scatter(X[i], Y[i], color=color, alpha=alpha, marker=marker, s=s) avg_x = np.mean(X) avg_y = np.mean(Y) if radius: # Compute integer axis bounds explicitly to satisfy static analyzers. xmin = int(avg_x - radius) xmax = int(avg_x + radius) ymin = int(avg_y - radius) ymax = int(avg_y + radius) ax.set(xlim=(xmin, xmax), ylim=(ymin, ymax)) try: ax.set_title(self.texify_label(self.label), size=title_size) except Exception: pass ax.axis('off') plt.close(fig) self.microscope = Microscope(fig, ax, self.experiment_id, self.label, object_type) return self.microscope
[docs] def convert_votes_to_potes(votes) -> np.ndarray: """Convert votes to positional votes (potes). Parameters ---------- votes : sequence Iterable of vote rankings (iterables of candidate indices). Returns ------- np.ndarray Array of potes with shape (len(votes), num_candidates). """ return np.array([[list(vote).index(i) for i, _ in enumerate(vote)] for vote in votes])