Source code for mapof.elections.objects.ApprovalElection

import logging
from abc import ABC
from collections import Counter
from typing import List, Set, Optional

import numpy as np
from mapof.core.distances import hamming
from matplotlib import pyplot as plt

import mapof.elections.persistence.election_exports as exports
import mapof.elections.persistence.election_imports as imports
from mapof.elections.cultures import generate_approval_votes
from mapof.elections.cultures.params import update_params_approval
from mapof.elections.objects.Election import Election
from mapof.elections.objects.Microscope import Microscope


[docs] class ApprovalElection(Election, ABC): """ Approval Election class. """ def __init__(self, experiment_id=None, election_id=None, culture_id=None, num_candidates=None, fast_import=False, params=None, **kwargs): super().__init__(experiment_id=experiment_id, election_id=election_id, culture_id=culture_id, num_candidates=num_candidates, instance_type='approval', fast_import=fast_import, params=params, **kwargs) # cached / derived values (None until computed) self.approvalwise_vector: Optional[np.ndarray] = None self.reverse_approvals: Optional[List[Set[int]]] = None self.candidatelikeness_original_vectors: Optional[np.ndarray] = None if self.is_imported and self.experiment_id is not None and not fast_import: self.import_approval_election() self.try_updating_params()
[docs] def import_approval_election(self) -> None: """ Imports approval elections from a file. """ try: ( self.votes, self.num_voters, self.num_candidates, self.params, self.culture_id, self.num_options, self.quantities, self.distinct_votes, ) = imports.import_approval_election( experiment_id=self.experiment_id, election_id=self.election_id, is_shifted=self.is_shifted ) except Exception: logging.warning(f'Could not import instance {self.election_id}.')
def try_updating_params(self) -> None: if self.culture_id is not None: self.params = update_params_approval( self.params, self.culture_id, )
[docs] def votes_to_approvalwise_vector(self) -> None: """ Converts votes to an approval-wise frequency vector (fraction of voters approving each candidate). The resulting vector is sorted (ascending) to match prior behavior. """ approvalwise_vector = np.zeros(self.num_candidates, dtype=float) for vote in self.votes: for c in vote: approvalwise_vector[int(c)] += 1 approvalwise_vector = approvalwise_vector / float(self.num_voters) self.approvalwise_vector = np.sort(approvalwise_vector)
[docs] def compute_reverse_approvals(self) -> None: """ Computes reverse approvals: for each candidate, the set of voters who approve them. """ self.reverse_approvals = [ {i for i, vote in enumerate(self.votes) if c in vote} for c in range(self.num_candidates) ]
[docs] def get_reverse_approvals(self) -> List[Set[int]]: """ Returns the reverse approvals, computing them if necessary. """ if not self.reverse_approvals: self.compute_reverse_approvals() return self.reverse_approvals
[docs] def prepare_instance(self, is_exported: bool = False, is_aggregated: bool = True) -> None: """ Prepares all the instances within the experiment. """ self.votes = generate_approval_votes(culture_id=self.culture_id, num_candidates=self.num_candidates, num_voters=self.num_voters, params=self.params) if not self.is_pseudo: c = Counter(map(tuple, self.votes)) counted_votes = [[count, list(row)] for row, count in c.items()] counted_votes = sorted(counted_votes, reverse=True) self.quantities = [a[0] for a in counted_votes] self.distinct_votes = [a[1] for a in counted_votes] self.num_options = len(counted_votes) else: self.quantities = [self.num_voters] self.num_options = 1 if is_exported: exports.export_election_within_experiment(self, is_aggregated=is_aggregated)
def _compute_distances_between_votes(self, distance_id: str = 'hamming') -> np.ndarray: """ Computes distances between votes. Uses symmetry to compute only upper triangle and mirrors it. """ n = self.num_voters distances = np.zeros((n, n), dtype=float) if distance_id == 'hamming': for v1 in range(n): a = self.votes[v1] for v2 in range(v1, n): b = self.votes[v2] d = hamming(a, b) distances[v1, v2] = d distances[v2, v1] = d self.distances['vote'] = distances if self.is_exported: exports.export_distances(self, object_type='vote') return distances elif distance_id == 'jaccard': for v1 in range(n): a = self.votes[v1] for v2 in range(v1, n): b = self.votes[v2] union_len = len(a.union(b)) if union_len == 0: d = 1.0 else: d = 1.0 - len(a.intersection(b)) / union_len distances[v1, v2] = d distances[v2, v1] = d self.distances['vote'] = distances if self.is_exported: exports.export_distances(self, object_type='vote') return distances else: raise ValueError(f'Unknown distance_id: {distance_id}') def _compute_distances_between_candidates(self, distance_id: str = 'hamming') -> np.ndarray: """ Computes distances between the candidates (based on reverse approvals). Uses symmetry to halve computations. """ self.compute_reverse_approvals() m = self.num_candidates distances = np.zeros((m, m), dtype=float) if distance_id == 'hamming': for c1 in range(m): a = self.reverse_approvals[c1] for c2 in range(c1, m): b = self.reverse_approvals[c2] d = hamming(a, b) distances[c1, c2] = d distances[c2, c1] = d self.distances['candidate'] = distances if self.is_exported: exports.export_distances(self, object_type='candidate') return distances elif distance_id == 'jaccard': for c1 in range(m): a = self.reverse_approvals[c1] for c2 in range(c1, m): b = self.reverse_approvals[c2] union_len = len(a.union(b)) if union_len == 0: d = 1.0 else: d = 1.0 - len(a.intersection(b)) / union_len distances[c1, c2] = d distances[c2, c1] = d self.distances['candidate'] = distances if self.is_exported: exports.export_distances(self, object_type='candidate') return distances else: raise ValueError(f'Unknown distance_id: {distance_id}') def get_candidatelikeness_original_vectors(self, is_recomputed: bool = False) -> np.ndarray: if self.candidatelikeness_original_vectors is not None and not is_recomputed: return self.candidatelikeness_original_vectors return self._voted_to_candidatelikeness_original_vectors() def _voted_to_candidatelikeness_original_vectors(self) -> np.ndarray: """ Converts votes to candidate-likeness vectors: for each ordered pair (i,j) counts fraction of voters who approve exactly one of the two candidates (i xor j). """ matrix = np.zeros((self.num_candidates, self.num_candidates), dtype=float) for c_1 in range(self.num_candidates): for c_2 in range(self.num_candidates): count = 0 for vote in self.votes: if (c_1 in vote) != (c_2 in vote): count += 1 matrix[c_1, c_2] = count candidatelikeness_original_vectors = matrix / float(self.num_voters) self.candidatelikeness_original_vectors = candidatelikeness_original_vectors return candidatelikeness_original_vectors def _voted_to_candidatelikeness_original_vectors_vectorized(self) -> np.ndarray: """ Vectorized implementation of candidate-likeness computation. Builds a boolean matrix A of shape (num_voters, num_candidates) where A[v, c] == 1 if voter v approves c. Then for candidates i,j: count_xor(i,j) = s[i] + s[j] - 2 * intersection[i,j] where s is column sums and intersection = A.T @ A. Returns the same normalized matrix (divided by num_voters). """ n = int(self.num_voters) m = int(self.num_candidates) if n == 0 or m == 0: res = np.zeros((m, m), dtype=float) self.candidatelikeness_original_vectors = res return res # Build boolean matrix A (n x m) A = np.zeros((n, m), dtype=np.uint8) for v_idx, vote in enumerate(self.votes): for c in vote: A[v_idx, int(c)] = 1 # column sums (number of voters approving each candidate) s = A.sum(axis=0).astype(np.int64) # intersection counts: number of voters approving both candidates i and j inter = A.T @ A # shape (m, m), dtype=int # XOR counts: s_i + s_j - 2*inter_ij # use broadcasting matrix = s.reshape((m, 1)) + s.reshape((1, m)) - 2 * inter res = matrix.astype(float) / float(n) self.candidatelikeness_original_vectors = res return res
[docs] def compute_distances(self, object_type: Optional[str] = None, distance_id: str = 'hamming') -> np.ndarray: """ Computes distances between the votes or candidates. """ if object_type is None: object_type = self.object_type if object_type == 'vote': return self._compute_distances_between_votes(distance_id=distance_id) elif object_type == 'candidate': return self._compute_distances_between_candidates(distance_id=distance_id) else: raise ValueError(f'Unknown object_type: {object_type}')
[docs] def set_microscope( self, radius: Optional[float] = None, alpha: float = 0.1, s=30, object_type: Optional[str] = None, double_gradient: bool = False, color: str = 'blue', marker: str = 'o', annotate: bool = False ) -> Microscope: """Print a map of the election (i.e., microscope) using matplotlib's OO API.""" if object_type is None: object_type = self.object_type fig, ax = plt.subplots(figsize=(6.4, 6.4)) X = [elem[0] for elem in self.coordinates[object_type]] Y = [elem[1] for elem in self.coordinates[object_type]] if double_gradient: for i in range(len(X)): x = float(self.points['voters'][i][0]) y = float(self.points['voters'][i][1]) ax.scatter(X[i], Y[i], color=(0.0, y, x), s=s, alpha=alpha) else: ax.scatter(X, Y, color=color, s=s, alpha=alpha, marker=marker) if annotate: for i in range(len(X)): ax.annotate(str(i), (X[i], Y[i]), color='black') avg_x = float(np.mean(X)) avg_y = float(np.mean(Y)) if radius is not None: ax.set_xlim((avg_x - radius, avg_x + radius)) ax.set_ylim((avg_y - radius, avg_y + radius)) ax.axis('off') plt.close(fig) self.microscope = Microscope(fig, ax, self.experiment_id, self.label, object_type) return self.microscope