Source code for mapof.elections.objects.ElectionExperiment

import ast
import csv
import logging
import os
import shutil
import time
import warnings
from abc import ABCMeta, abstractmethod

import mapof.core.persistence.experiment_exports as exports
import mapof.core.printing as pr
from mapof.core.features.register import (
    registered_experiment_features,
    features_embedding_related
)
from mapof.core.objects.Experiment import Experiment
from mapof.core.utils import get_instance_id
from tqdm import tqdm

import mapof.elections.other.approval_rules as rules
from mapof.elections.cultures.register import (
    registered_ordinal_election_cultures,
    registered_pseudo_ordinal_cultures,
    registered_approval_election_cultures
)
from mapof.elections.distances import get_distance
from mapof.elections.features.register import (
    features_with_params,
    features_rule_related
)
from mapof.elections.objects.ApprovalElection import ApprovalElection
from mapof.elections.objects.ElectionFamily import ElectionFamily
from mapof.elections.objects.OrdinalElection import OrdinalElection

try:
    from sklearn.manifold import MDS
    from sklearn.manifold import TSNE
    from sklearn.manifold import SpectralEmbedding
    from sklearn.manifold import LocallyLinearEmbedding
    from sklearn.manifold import Isomap
except ImportError as error:
    MDS = None
    TSNE = None
    SpectralEmbedding = None
    LocallyLinearEmbedding = None
    Isomap = None
    print(error)


[docs] class ElectionExperiment(Experiment): __metaclass__ = ABCMeta """Abstract set of instances.""" @abstractmethod def add_folders_to_experiment(self): pass @abstractmethod def add_feature(self, name, function): pass @abstractmethod def add_culture(self, name, function): pass def __init__(self, is_shifted=False, **kwargs): self.is_shifted = is_shifted self.default_num_candidates = 10 self.default_num_voters = 100 self.default_committee_size = 1 self.all_winning_committees = {} super().__init__(**kwargs) def __getattr__(self, attr): if attr == 'elections': return self.instances elif attr == 'num_elections': return self.num_instances else: return self.__dict__[attr] def __setattr__(self, name, value): if name == "elections": self.instances = value elif name == "num_elections": self.num_instances = value else: self.__dict__[name] = value
[docs] def add_instances_to_experiment(self) -> None: """ Add instances to the experiment. Returns: None """ instances = {} for family_id in self.families: single = self.families[family_id].single ids = [] for j in range(self.families[family_id].size): instance_id = get_instance_id(single, family_id, j) if self.instance_type == 'ordinal': instance = OrdinalElection(self.experiment_id, instance_id, is_imported=True, fast_import=self.fast_import, with_matrix=self.with_matrix, label=self.families[family_id].label) elif self.instance_type == 'approval': instance = ApprovalElection(self.experiment_id, instance_id, is_imported=True, fast_import=self.fast_import, label=self.families[family_id].label) else: instance = None instances[instance_id] = instance ids.append(str(instance_id)) self.families[family_id].election_ids = ids return instances
[docs] def set_default_num_candidates(self, num_candidates: int) -> None: """ Sets default number of candidates Parameters ---------- num_candidates : int Number of candidates. Returns ------- None """ self.default_num_candidates = num_candidates
[docs] def set_default_num_voters(self, num_voters: int) -> None: """ Sets default number of voters Parameters ---------- num_voters : int Number of voters. Returns ------- None """ self.default_num_voters = num_voters
[docs] def set_default_committee_size(self, committee_size: int) -> None: """ Sets default size of the committee Parameters ---------- committee_size : int Committee size. Returns ------- None """ self.default_committee_size = committee_size
def add_election_from_matrix(self, frequency_matrix, **kwargs): if len(frequency_matrix) != len(frequency_matrix[0]): raise ValueError("Matrix is not square") self.add_election( num_candidates=len(frequency_matrix), frequency_matrix=frequency_matrix, culture_id='frequency_matrix', **kwargs )
[docs] def add_election(self, culture_id="none", label=None, color="black", alpha: float = 1., show: bool = True, marker='x', ms=20, starting_from: int = 0, size: int = 1, num_candidates: int = None, num_voters: int = None, election_id: str = None, # deprecated instance_id: str = None, frequency_matrix=None, is_temporary: bool = False, params: dict = None, ): """ Adds election to the experiment. Parameters ---------- culture_id : str Culture id. label : str Label. color Color. alpha : float Alpha. show : bool If true the election is shown. marker : str Marker. ms : int Marker size. starting_from : int Starting index. size : int Size. num_candidates : int Number of candidates. num_voters : int Number of voters. election_id : str Election id. instance_id : str Instance id. frequency_matrix : list Frequency matrix. is_temporary : bool If true the election is temporary. params : dict Model parameters. Returns ------- list List of IDs of added instances. """ if instance_id is None: instance_id = election_id if num_candidates is None: num_candidates = self.default_num_candidates if num_voters is None: num_voters = self.default_num_voters return self.add_family(culture_id=culture_id, size=size, label=label, color=color, alpha=alpha, show=show, marker=marker, ms=ms, starting_from=starting_from, family_id=instance_id, num_candidates=num_candidates, num_voters=num_voters, frequency_matrix=frequency_matrix, is_temporary=is_temporary, single=True, params=params)
[docs] def add_family(self, culture_id: str = "none", size: int = 1, label: str = None, color: str = "black", alpha: float = 1., show: bool = True, marker: str = 'o', ms: int = 20, starting_from: int = 0, num_candidates: int = None, num_voters: int = None, frequency_matrix=None, is_temporary: bool = False, family_id: str = None, single: bool = False, path: dict = None, params: dict = None, ) -> list: """ Adds family of elections to the experiment. Parameters ---------- culture_id : str Culture id. label : str Label. color Color. alpha : float Alpha (i.e., transparency). show : bool If true the family is shown. marker : str Marker. ms : int Marker size. starting_from : int Starting index. size : int Size. num_candidates : int Number of candidates. num_voters : int Number of voters. frequency_matrix : list Frequency matrix. is_temporary : bool If true the election is temporary. family_id : str Family id. single : bool If true only one election is added. path : dict Path. params : dict Model parameters. Returns ------- list List of IDs of added instances. """ if num_candidates is None: num_candidates = self.default_num_candidates if num_voters is None: num_voters = self.default_num_voters if self.families is None: self.families = {} if params is None: params = {} if family_id is None: family_id = culture_id + '_' + str(num_candidates) + '_' + str(num_voters) if culture_id in {'urn'} and params.get('alpha') is not None: family_id += '_' + str(float(params['alpha'])) elif culture_id in {'mallows'} and params.get('phi') is not None: family_id += '_' + str(float(params['phi'])) elif culture_id in {'norm-mallows', 'norm-mallows_matrix'} \ and params.get('normphi') is not None: family_id += '_' + str(float(params['normphi'])) elif culture_id in {'euclidean'} and params.get('dim') is not None \ and params.get('space') is not None: family_id += '_' + str(int(params['dim'])) + '_' + str(params['space']) elif label is None: label = family_id self.families[family_id] = ElectionFamily(culture_id=culture_id, family_id=family_id, params=params, label=label, color=color, alpha=alpha, show=show, size=size, marker=marker, ms=ms, starting_from=starting_from, num_candidates=num_candidates, num_voters=num_voters, path=path, single=single, instance_type=self.instance_type, frequency_matrix=frequency_matrix, is_temporary=is_temporary, ) self.num_families = len(self.families) self.num_elections = sum([self.families[family_id].size for family_id in self.families]) self.main_order = [i for i in range(self.num_elections)] new_instances = self.families[family_id].prepare_family( is_exported=self.is_exported, experiment_id=self.experiment_id, instance_type=self.instance_type) for instance_id in new_instances: self.instances[instance_id] = new_instances[instance_id] self.families[family_id].instance_ids = list(new_instances.keys()) if self.is_exported and not is_temporary: self._update_map_csv() return list(new_instances.keys())
[docs] def add_existing_family_from_dir( self, dir: str = None, culture_id: str = None, family_id: str = None, label: str = None, color: str = "black", alpha: float = 1., show: bool = True, marker: str = 'o', ms: int = 20, **kwargs ) -> list: """ Adds family of elections to the experiment from the given directory. This function copies existing election files from the specified directory and imports them into the experiment without regenerating votes. Parameters ---------- dir : str Directory containing the election files to import. culture_id : str Culture id to assign to the imported elections. family_id : str Family id. If None, defaults to culture_id. label : str Label for the family. If None, defaults to family_id. color : str Color for visualization. alpha : float Alpha (transparency) for visualization. show : bool If true the family is shown in visualizations. marker : str Marker style for visualization. ms : int Marker size for visualization. **kwargs : dict Additional parameters passed to ElectionFamily. Returns ------- list List of IDs of added instances. """ if dir is None: logging.warning('dir not specified') return [] if culture_id is None: logging.warning('culture_id not specified') return [] if family_id is None: family_id = culture_id if label is None: label = family_id # Copy instances from dir to /elections directory_in = dir directory_out = os.path.join(os.getcwd(), "experiments", self.experiment_id, 'elections') # Ensure the output directory exists os.makedirs(directory_out, exist_ok=True) # List all files in the given directory (only .soc or .app files) files = [f for f in os.listdir(directory_in) if os.path.isfile(os.path.join(directory_in, f)) and (f.endswith('.soc') or f.endswith('.app'))] # Sort the files for consistency files.sort() size = len(files) if size == 0: logging.warning(f'No election files found in {dir}') return [] # Determine the file extension based on instance type extension = '.soc' if self.instance_type == 'ordinal' else '.app' # Copy and rename each file for idx, file_name in enumerate(files): new_file_name = f"{family_id}_{idx}{extension}" old_path = os.path.join(directory_in, file_name) new_path = os.path.join(directory_out, new_file_name) # Copy the file (preserving metadata) shutil.copy2(old_path, new_path) print(f"Copying: {file_name} -> {new_file_name}") # Determine if it's a single instance single = (size == 1) # Initialize families dict if needed if self.families is None: self.families = {} # Create the family with the imported elections self.families[family_id] = ElectionFamily( culture_id=culture_id, family_id=family_id, params=kwargs.get('params', {}), label=label, color=color, alpha=alpha, show=show, size=size, marker=marker, ms=ms, starting_from=kwargs.get('starting_from', 0), num_candidates=kwargs.get('num_candidates'), num_voters=kwargs.get('num_voters'), path=kwargs.get('path'), single=single, instance_type=self.instance_type, ) self.num_families = len(self.families) self.num_elections = sum([self.families[fid].size for fid in self.families]) self.main_order = [i for i in range(self.num_elections)] # Import the elections from the copied files (instead of regenerating) new_instances = {} ids = [] for j in range(size): instance_id = get_instance_id(single, family_id, j) if self.instance_type == 'ordinal': instance = OrdinalElection( self.experiment_id, instance_id, is_imported=True, fast_import=self.fast_import, with_matrix=self.with_matrix, label=label ) elif self.instance_type == 'approval': instance = ApprovalElection( self.experiment_id, instance_id, is_imported=True, fast_import=self.fast_import, label=label ) else: instance = None new_instances[instance_id] = instance ids.append(str(instance_id)) for instance_id in new_instances: self.instances[instance_id] = new_instances[instance_id] self.families[family_id].instance_ids = ids if self.is_exported: self._update_map_csv() return list(new_instances.keys())
def _update_map_csv(self): families = {} path_to_file = os.path.join(os.getcwd(), 'experiments', self.experiment_id, 'map.csv') with open(path_to_file, 'w', newline='') as csv_file: writer = csv.writer(csv_file, delimiter=';') all_fields = ['size', 'num_candidates', 'num_voters', 'culture_id', 'params', 'family_id', 'label', 'color', 'alpha', 'marker', 'ms', 'path' ] writer.writerow(all_fields) for family in self.families.values(): if not family.is_temporary: all_values = [family.size, family.num_candidates, family.num_voters, family.culture_id, family.params, family.family_id, family.label, family.color, family.alpha, family.marker, family.ms, family.path] writer.writerow(all_values) return families
[docs] def add_empty_family( self, culture_id: str = "none", label: str = None, color: str = "black", alpha: float = 1., show: bool = True, marker: str = 'o', num_candidates: int = None, num_voters: int = None, family_id: str = None ): """ Adds an empty family of elections to the experiment. Parameters ---------- culture_id : str Culture id. label : str Label. color : str Color. alpha : float Alpha. show : bool If true the family is shown. marker : str Marker. num_candidates : int Number of candidates. num_voters : int Number of voters. family_id : str Family id. """ if label is None: label = family_id self.families[family_id] = ElectionFamily(culture_id=culture_id, family_id=family_id, label=label, color=color, alpha=alpha, show=show, size=0, marker=marker, num_candidates=num_candidates, num_voters=num_voters, instance_type=self.instance_type) self.families[family_id].prepare_family( is_exported=self.is_exported, experiment_id=self.experiment_id) return self.families[family_id]
[docs] def prepare_elections(self, export_points=False, is_aggregated=True) -> None: """ Prepares elections for a given experiment. Parameters ---------- export_points : bool Whether to store points in the instance. is_aggregated : bool Whether to aggregate the instances. Returns ------- None """ self.export_points = export_points self.is_aggregated = is_aggregated if self.instances is None: self.instances = {} for family_id in tqdm(self.families, desc="Preparing families"): if self.instance_type == 'ordinal' and \ self.families[family_id].culture_id not in registered_pseudo_ordinal_cultures and \ self.families[family_id].culture_id not in registered_ordinal_election_cultures: logging.warning(f'Culture {self.families[family_id].culture_id} is skipped, ' f'since no such ORDINAL culture was found.') continue if self.instance_type == 'approval' and \ self.families[family_id].culture_id not in registered_approval_election_cultures: logging.warning(f'Culture {self.families[family_id].culture_id} is skipped, ' f'since no such APPROVAL culture was found.') continue new_instances = self.families[family_id].prepare_family( is_exported=self.is_exported, experiment_id=self.experiment_id, export_points=export_points, is_aggregated=is_aggregated, instance_type=self.instance_type, ) for instance_id in new_instances: self.instances[instance_id] = new_instances[instance_id]
[docs] def compute_voting_rule(self, method=None, committee_size=1) -> None: """ Computes voting rule for all elections in the experiment. Parameters ---------- method : str Name of the boting rule to be computed. committee_size : int Size of the committee. Returns ------- None """ for election_id in self.elections: self.elections[election_id].compute_voting_rule( method=method, committee_size=committee_size)
def compute_alternative_winners(self, method=None, committee_size=None, num_parties=None): for election_id in self.elections: for party_id in range(num_parties): self.elections[election_id].compute_alternative_winners( method=method, party_id=party_id, committee_size=committee_size) def get_distance( self, election_1, election_2, distance_id: str = None, **kwargs ) -> float or (float, list): return get_distance(election_1, election_2, distance_id) def print_matrix(self, **kwargs): pr.print_matrix(experiment=self, **kwargs)
[docs] def import_controllers(self) -> list: """ Import controllers from a file Returns ------- list List of families. """ families = {} path = os.path.join(os.getcwd(), 'experiments', self.experiment_id, 'map.csv') with open(path, 'r') as file_: header = [h.strip() for h in file_.readline().split(';')] reader = csv.DictReader(file_, fieldnames=header, delimiter=';') all_num_candidates = [] all_num_voters = [] starting_from = 0 for row in reader: culture_id = None params = None size = None num_candidates = None num_voters = None family_id = None print_params = {} if 'culture_id' in row.keys(): culture_id = str(row['culture_id']).strip() if 'family_id' in row.keys(): family_id = str(row['family_id']) if 'params' in row.keys(): params = ast.literal_eval(str(row['params'])) if 'size' in row.keys(): size = int(row['size']) if 'num_candidates' in row.keys(): num_candidates = int(row['num_candidates']) if 'num_voters' in row.keys(): num_voters = int(row['num_voters']) if 'path' in row.keys(): path = ast.literal_eval(str(row['path'])) if 'label' in row.keys(): print_params['label'] = str(row['label']) if 'alpha' in row.keys(): print_params['alpha'] = float(row['alpha']) if 'marker' in row.keys(): print_params['marker'] = str(row['marker']).strip() if 'ms' in row.keys(): print_params['ms'] = int(row['ms']) if 'color' in row.keys(): print_params['color'] = str(row['color']).strip() single = size == 1 families[family_id] = ElectionFamily(culture_id=culture_id, family_id=family_id, params=params, size=size, starting_from=starting_from, num_candidates=num_candidates, num_voters=num_voters, path=path, single=single, **print_params ) starting_from += size all_num_candidates.append(num_candidates) all_num_voters.append(num_voters) _check_if_all_equal(all_num_candidates, 'num_candidates') _check_if_all_equal(all_num_voters, 'num_voters') self.num_families = len(families) self.num_elections = sum([families[family_id].size for family_id in families]) self.main_order = [i for i in range(self.num_elections)] return families
[docs] def compute_feature( self, feature_id: str = None, feature_params: dict = None, overwrite: bool = False, saveas: str = None, **kwargs ) -> dict: """ Computes a feature for each in instances in the experiment. Parameters ---------- feature_id : str The id of the feature to compute. feature_params : dict The parameters of the feature to compute. overwrite : bool Whether to overwrite the feature if it already exists. saveas : str Returns ------- dict A dictionary with the computed feature for each instance. """ if feature_params is None: feature_params = {} if feature_id in features_rule_related: feature_long_id = f'{feature_id}_{feature_params["rule"]}' elif feature_id in features_embedding_related: feature_long_id = f'{feature_id}_{self.embedding_id}' else: feature_long_id = feature_id num_iterations = feature_params.get('num_iterations', 1) feature_dict = {'value': {}, 'time': {}} if feature_id in registered_experiment_features: feature = registered_experiment_features[feature_id] solution_dict = feature(self, election_ids=list(self.instances), **kwargs) for instance_id in tqdm(self.instances, desc='Computing experiment feature'): for key in solution_dict[instance_id]: if key not in feature_dict: feature_dict[key] = {} feature_dict['value'][instance_id] = solution_dict[instance_id] if solution_dict[instance_id] is None: feature_dict['time'][instance_id] = None else: feature_dict['time'][instance_id] = 0 else: for instance_id in tqdm(self.instances, desc=f"{feature_long_id}"): instance = self.elections[instance_id] start = time.time() solution = None for _ in range(num_iterations): if feature_id in features_with_params: solution = instance.get_feature(feature_id, feature_long_id, feature_params=feature_params) else: solution = instance.get_feature(feature_id, feature_long_id, overwrite=overwrite, **kwargs) value = None total_time = time.time() - start total_time /= num_iterations if solution is not None: if type(solution) is dict: if 'value' not in solution: solution['value'] = None for key in solution: if key not in feature_dict: feature_dict[key] = {} feature_dict[key][instance_id] = solution[key] else: feature_dict['value'][instance_id] = solution feature_dict['time'][instance_id] = total_time else: feature_dict['value'][instance_id] = value feature_dict['time'][instance_id] = total_time if saveas is None: saveas = feature_long_id if self.is_exported: exports.export_feature_to_file(self, feature_id=feature_id, feature_dict=feature_dict, saveas=saveas) for election_id in self.elections: self.instances[election_id].features[saveas] = feature_dict['value'][election_id] self.features[saveas] = feature_dict return feature_dict
[docs] def compute_rules( self, list_of_rules: list, committee_size: int = 10, resolute: bool = False ) -> None: """ Computes the winning committees for a list of rules. Parameters ---------- list_of_rules : list A list of rules to compute the winning committees for. committee_size : int The size of the winning committees. resolute : bool Whether to compute the resolute committees. Returns ------- None """ for rule_name in list_of_rules: print('Computing', rule_name) rules.compute_abcvoting_rule( experiment=self, rule_name=rule_name, committee_size=committee_size, resolute=resolute)
[docs] def import_committees(self, list_of_rules) -> None: """ Imports the winning committees for a list of rules. Parameters ---------- list_of_rules : list A list of rules to import the winning committees for. Returns ------- None """ for rule_name in list_of_rules: self.all_winning_committees[rule_name] = rules.import_committees_from_file( experiment_id=self.experiment_id, rule_name=rule_name)
[docs] def add_election_to_family(self, election=None, family_id=None) -> None: """ Adds an election to a family. Parameters ---------- election : Election The election to add to the family. family_id : str The id of the family to add the election to. Returns ------- None """ election.instance_id = f'{self.families[family_id]}_{self.families[family_id].size}' self.instances[election.instance_id] = election self.families[family_id].add_election(election)
def prepare_election_features(self): for election in self.instances.items(): election[1].election_features.votes = election[1].votes election[1].election_features.num_candidates = election[1].num_candidates election[1].election_features.num_voters = election[1].num_voters election[1].election_features.calculate_all() # def prepare_compass_dictionary(self): # for election in self.instances.items(): # election[1].election_features.votes = election[1].votes # election[1].election_features.num_candidates = election[1].num_candidates # election[1].election_features.num_voters = election[1].num_voters # election[1].election_features.compass_points['ST'] = self.instances[ # ST_KEY + str(election[1].num_candidates)] # election[1].election_features.compass_points['AN'] = self.instances[ # AN_KEY + str(election[1].num_candidates)] # election[1].election_features.compass_points['ID'] = self.instances[ # ID_KEY + str(election[1].num_candidates)] # election[1].election_features.compass_points['UN'] = self.instances[ # UN_KEY + str(election[1].num_candidates)] def calculate_dap(self, id): dap = list() dap.append(self.features['Diversity'][id]) dap.append(self.features['Agreement'][id]) dap.append(self.features['Polarization'][id]) return dap def calculate_features_vector(self, id, features_list: list): vector = list() if 'd' in features_list: vector.append(self.features['Diversity'][id]) if 'a' in features_list: vector.append(self.features['Agreement'][id]) if 'p' in features_list: vector.append(self.features['Polarization'][id]) if 'e' in features_list: vector.append(self.features['Entropy'][id]) if 'e2' in features_list: vector.append(self.features['Entropy'][id] * self.features['Entropy'][id]) if 'cds' in features_list: vector.append(self.features['CandidateDistanceStd'][id]) return vector def prepare_election_sizes(self): for election in self.instances.items(): self.election_sizes.add(election[1].num_candidates) def prepare_feature_vectors(self, features: list): for election in self.instances.items(): election[1].election_features.votes = election[1].votes election[1].election_features.num_candidates = election[1].num_candidates election[1].election_features.num_voters = election[1].num_voters election[1].election_features.features_vector = self.calculate_features_vector( election[1].election_id, features) def prepare_instances(self): return self.prepare_elections() def add_instance(self): return self.add_election() def __getstate__(self): return self.__dict__ def __setstate__(self, state): self.__dict__.update(state)
def _check_if_all_equal(values, subject): if any(x != values[0] for x in values): text = f'Not all {subject} values are equal!' warnings.warn(text)