import ast
import csv
import logging
import os
import shutil
import time
import warnings
from abc import ABCMeta, abstractmethod
import mapof.core.persistence.experiment_exports as exports
import mapof.core.printing as pr
from mapof.core.features.register import (
registered_experiment_features,
features_embedding_related
)
from mapof.core.objects.Experiment import Experiment
from mapof.core.utils import get_instance_id
from tqdm import tqdm
import mapof.elections.other.approval_rules as rules
from mapof.elections.cultures.register import (
registered_ordinal_election_cultures,
registered_pseudo_ordinal_cultures,
registered_approval_election_cultures
)
from mapof.elections.distances import get_distance
from mapof.elections.features.register import (
features_with_params,
features_rule_related
)
from mapof.elections.objects.ApprovalElection import ApprovalElection
from mapof.elections.objects.ElectionFamily import ElectionFamily
from mapof.elections.objects.OrdinalElection import OrdinalElection
try:
from sklearn.manifold import MDS
from sklearn.manifold import TSNE
from sklearn.manifold import SpectralEmbedding
from sklearn.manifold import LocallyLinearEmbedding
from sklearn.manifold import Isomap
except ImportError as error:
MDS = None
TSNE = None
SpectralEmbedding = None
LocallyLinearEmbedding = None
Isomap = None
print(error)
[docs]
class ElectionExperiment(Experiment):
__metaclass__ = ABCMeta
"""Abstract set of instances."""
@abstractmethod
def add_folders_to_experiment(self):
    """Abstract hook declared for subclasses; this base version does nothing."""
    pass
@abstractmethod
def add_feature(self, name, function):
    """
    Abstract hook taking a feature name and a callable.

    This base version does nothing.
    """
    pass
@abstractmethod
def add_culture(self, name, function):
    """
    Abstract hook taking a culture name and a callable.

    This base version does nothing.
    """
    pass
def __init__(self, is_shifted=False, **kwargs):
    """
    Set election-specific defaults, then delegate to the base constructor.

    Parameters
    ----------
    is_shifted : bool
        Stored verbatim on the instance as ``is_shifted``.
    **kwargs
        Forwarded unchanged to the parent ``__init__``.
    """
    # These attributes are assigned *before* the parent constructor runs,
    # so they already exist if the parent constructor reads them.
    self.is_shifted = is_shifted
    self.default_num_candidates = 10
    self.default_num_voters = 100
    self.default_committee_size = 1
    # Filled by ``import_committees`` (rule name -> imported committees).
    self.all_winning_committees = {}
    super().__init__(**kwargs)
def __getattr__(self, attr):
if attr == 'elections':
return self.instances
elif attr == 'num_elections':
return self.num_instances
else:
return self.__dict__[attr]
def __setattr__(self, name, value):
if name == "elections":
self.instances = value
elif name == "num_elections":
self.num_instances = value
else:
self.__dict__[name] = value
[docs]
def add_instances_to_experiment(self) -> dict:
    """
    Import the instances of every family registered in the experiment.

    For each family, one election object per index in ``range(family.size)``
    is created in import mode, and the family's ``election_ids`` attribute
    is set to the list of (stringified) ids created for it.

    Returns
    -------
    dict
        Mapping from instance id to the imported election object. The value
        is ``None`` when ``self.instance_type`` is neither ``'ordinal'`` nor
        ``'approval'``.
    """
    instances = {}

    for family_id in self.families:
        family = self.families[family_id]
        ids = []

        for j in range(family.size):
            instance_id = get_instance_id(family.single, family_id, j)

            if self.instance_type == 'ordinal':
                instance = OrdinalElection(self.experiment_id, instance_id,
                                           is_imported=True,
                                           fast_import=self.fast_import,
                                           with_matrix=self.with_matrix,
                                           label=family.label)
            elif self.instance_type == 'approval':
                instance = ApprovalElection(self.experiment_id, instance_id,
                                            is_imported=True,
                                            fast_import=self.fast_import,
                                            label=family.label)
            else:
                instance = None

            instances[instance_id] = instance
            ids.append(str(instance_id))

        family.election_ids = ids

    return instances
[docs]
def set_default_num_candidates(self, num_candidates: int) -> None:
    """
    Set the default number of candidates.

    The value is used by ``add_election`` and ``add_family`` whenever
    their ``num_candidates`` argument is ``None``.

    Parameters
    ----------
    num_candidates : int
        Number of candidates.

    Returns
    -------
    None
    """
    self.default_num_candidates = num_candidates
[docs]
def set_default_num_voters(self, num_voters: int) -> None:
    """
    Set the default number of voters.

    The value is used by ``add_election`` and ``add_family`` whenever
    their ``num_voters`` argument is ``None``.

    Parameters
    ----------
    num_voters : int
        Number of voters.

    Returns
    -------
    None
    """
    self.default_num_voters = num_voters
[docs]
def set_default_committee_size(self, committee_size: int) -> None:
    """
    Set the default size of the committee.

    Parameters
    ----------
    committee_size : int
        Committee size.

    Returns
    -------
    None
    """
    self.default_committee_size = committee_size
def add_election_from_matrix(self,
                             frequency_matrix,
                             **kwargs):
    """
    Add a single election defined by a square frequency matrix.

    Parameters
    ----------
    frequency_matrix : sequence of sequences
        Square matrix; its side length is used as the number of candidates.
    **kwargs
        Forwarded unchanged to ``add_election``.

    Returns
    -------
    list
        Whatever ``add_election`` returns (the ids of the added instances).

    Raises
    ------
    ValueError
        If the matrix is empty or any row's length differs from the number
        of rows.
    """
    num_candidates = len(frequency_matrix)
    # Check every row, not only the first one; ``len(...) == 0`` instead of
    # truthiness so array-like inputs are handled as well.
    if num_candidates == 0 or any(len(row) != num_candidates for row in frequency_matrix):
        raise ValueError("Matrix is not square")

    return self.add_election(
        num_candidates=num_candidates,
        frequency_matrix=frequency_matrix,
        culture_id='frequency_matrix',
        **kwargs
    )
[docs]
def add_election(self,
                 culture_id="none",
                 label=None,
                 color="black",
                 alpha: float = 1.,
                 show: bool = True,
                 marker='x',
                 ms=20,
                 starting_from: int = 0,
                 size: int = 1,
                 num_candidates: int = None,
                 num_voters: int = None,
                 election_id: str = None,  # deprecated
                 instance_id: str = None,
                 frequency_matrix=None,
                 is_temporary: bool = False,
                 params: dict = None,
                 ):
    """
    Add an election to the experiment as a family flagged ``single=True``.

    Parameters
    ----------
    culture_id : str
        Culture id.
    label : str
        Label.
    color
        Color.
    alpha : float
        Alpha.
    show : bool
        If true the election is shown.
    marker : str
        Marker.
    ms : int
        Marker size.
    starting_from : int
        Starting index.
    size : int
        Size.
    num_candidates : int
        Number of candidates; ``None`` selects ``default_num_candidates``.
    num_voters : int
        Number of voters; ``None`` selects ``default_num_voters``.
    election_id : str
        Deprecated alias of ``instance_id``; used only when ``instance_id``
        is ``None``.
    instance_id : str
        Instance id; passed to ``add_family`` as ``family_id``.
    frequency_matrix : list
        Frequency matrix.
    is_temporary : bool
        If true the election is temporary.
    params : dict
        Model parameters.

    Returns
    -------
    list
        List of IDs of added instances.
    """
    family_kwargs = dict(
        culture_id=culture_id,
        size=size,
        label=label,
        color=color,
        alpha=alpha,
        show=show,
        marker=marker,
        ms=ms,
        starting_from=starting_from,
        # The deprecated ``election_id`` is only a fallback.
        family_id=election_id if instance_id is None else instance_id,
        num_candidates=(self.default_num_candidates
                        if num_candidates is None else num_candidates),
        num_voters=self.default_num_voters if num_voters is None else num_voters,
        frequency_matrix=frequency_matrix,
        is_temporary=is_temporary,
        single=True,
        params=params,
    )
    return self.add_family(**family_kwargs)
[docs]
def add_family(self,
               culture_id: str = "none",
               size: int = 1,
               label: str = None,
               color: str = "black",
               alpha: float = 1.,
               show: bool = True,
               marker: str = 'o',
               ms: int = 20,
               starting_from: int = 0,
               num_candidates: int = None,
               num_voters: int = None,
               frequency_matrix=None,
               is_temporary: bool = False,
               family_id: str = None,
               single: bool = False,
               path: dict = None,
               params: dict = None,
               ) -> list:
    """
    Adds family of elections to the experiment.

    Parameters
    ----------
    culture_id : str
        Culture id.
    size : int
        Size.
    label : str
        Label. If None, defaults to the (possibly generated) family id,
        as in ``add_existing_family_from_dir`` and ``add_empty_family``.
    color
        Color.
    alpha : float
        Alpha (i.e., transparency).
    show : bool
        If true the family is shown.
    marker : str
        Marker.
    ms : int
        Marker size.
    starting_from : int
        Starting index.
    num_candidates : int
        Number of candidates; ``None`` selects ``default_num_candidates``.
    num_voters : int
        Number of voters; ``None`` selects ``default_num_voters``.
    frequency_matrix : list
        Frequency matrix.
    is_temporary : bool
        If true the election is temporary (it is not written to map.csv).
    family_id : str
        Family id. If None, it is generated from the culture id, the
        numbers of candidates and voters and, for some cultures, a
        distinguishing parameter.
    single : bool
        If true only one election is added.
    path : dict
        Path.
    params : dict
        Model parameters.

    Returns
    -------
    list
        List of IDs of added instances.
    """
    if num_candidates is None:
        num_candidates = self.default_num_candidates

    if num_voters is None:
        num_voters = self.default_num_voters

    if self.families is None:
        self.families = {}

    if params is None:
        params = {}

    if family_id is None:
        family_id = f'{culture_id}_{num_candidates}_{num_voters}'
        # Append the parameter that distinguishes families of one culture.
        if culture_id in {'urn'} and params.get('alpha') is not None:
            family_id += '_' + str(float(params['alpha']))
        elif culture_id in {'mallows'} and params.get('phi') is not None:
            family_id += '_' + str(float(params['phi']))
        elif culture_id in {'norm-mallows', 'norm-mallows_matrix'} \
                and params.get('normphi') is not None:
            family_id += '_' + str(float(params['normphi']))
        elif culture_id in {'euclidean'} and params.get('dim') is not None \
                and params.get('space') is not None:
            family_id += '_' + str(int(params['dim'])) + '_' + str(params['space'])

    # Applied regardless of how ``family_id`` was obtained; previously the
    # label stayed None whenever the id had to be generated.
    if label is None:
        label = family_id

    family = ElectionFamily(culture_id=culture_id,
                            family_id=family_id,
                            params=params,
                            label=label,
                            color=color,
                            alpha=alpha,
                            show=show,
                            size=size,
                            marker=marker,
                            ms=ms,
                            starting_from=starting_from,
                            num_candidates=num_candidates,
                            num_voters=num_voters,
                            path=path,
                            single=single,
                            instance_type=self.instance_type,
                            frequency_matrix=frequency_matrix,
                            is_temporary=is_temporary,
                            )
    self.families[family_id] = family

    self.num_families = len(self.families)
    self.num_elections = sum(fam.size for fam in self.families.values())
    self.main_order = list(range(self.num_elections))

    new_instances = family.prepare_family(
        is_exported=self.is_exported,
        experiment_id=self.experiment_id,
        instance_type=self.instance_type)

    # Same None-guard that ``prepare_elections`` applies.
    if self.instances is None:
        self.instances = {}
    for instance_id in new_instances:
        self.instances[instance_id] = new_instances[instance_id]

    family.instance_ids = list(new_instances.keys())

    if self.is_exported and not is_temporary:
        self._update_map_csv()

    return list(new_instances.keys())
[docs]
def add_existing_family_from_dir(
        self,
        dir: str = None,
        culture_id: str = None,
        family_id: str = None,
        label: str = None,
        color: str = "black",
        alpha: float = 1.,
        show: bool = True,
        marker: str = 'o',
        ms: int = 20,
        **kwargs
) -> list:
    """
    Adds family of elections to the experiment from the given directory.

    Existing election files are copied into
    ``<cwd>/experiments/<experiment_id>/elections`` under the names
    ``<family_id>_<index><extension>`` and are then imported without
    regenerating votes. Only files whose extension matches the experiment's
    instance type are used (``.soc`` for ordinal, ``.app`` otherwise), so a
    file is never renamed to a different extension.

    Parameters
    ----------
    dir : str
        Directory containing the election files to import.
    culture_id : str
        Culture id to assign to the imported elections.
    family_id : str
        Family id. If None, defaults to culture_id.
    label : str
        Label for the family. If None, defaults to family_id.
    color : str
        Color for visualization.
    alpha : float
        Alpha (transparency) for visualization.
    show : bool
        If true the family is shown in visualizations.
    marker : str
        Marker style for visualization.
    ms : int
        Marker size for visualization.
    **kwargs : dict
        Optional ``params``, ``starting_from``, ``num_candidates``,
        ``num_voters`` and ``path`` entries passed to ElectionFamily.

    Returns
    -------
    list
        List of IDs of added instances; empty if ``dir`` or ``culture_id``
        is missing, the directory does not exist, or it contains no
        matching election file.
    """
    if dir is None:
        logging.warning('dir not specified')
        return []

    if culture_id is None:
        logging.warning('culture_id not specified')
        return []

    if not os.path.isdir(dir):
        logging.warning(f'Directory {dir} does not exist')
        return []

    if family_id is None:
        family_id = culture_id

    if label is None:
        label = family_id

    # Extension expected for this experiment's instance type.
    extension = '.soc' if self.instance_type == 'ordinal' else '.app'

    # Sorted for a reproducible file -> index assignment.
    files = sorted(
        f for f in os.listdir(dir)
        if f.endswith(extension) and os.path.isfile(os.path.join(dir, f))
    )
    size = len(files)

    if size == 0:
        logging.warning(f'No {extension} election files found in {dir}')
        return []

    # Create the output directory only once there is something to copy.
    directory_out = os.path.join(os.getcwd(),
                                 "experiments",
                                 self.experiment_id,
                                 'elections')
    os.makedirs(directory_out, exist_ok=True)

    for idx, file_name in enumerate(files):
        new_file_name = f"{family_id}_{idx}{extension}"
        # copy2 also preserves file metadata.
        shutil.copy2(os.path.join(dir, file_name),
                     os.path.join(directory_out, new_file_name))
        print(f"Copying: {file_name} -> {new_file_name}")

    single = (size == 1)

    if self.families is None:
        self.families = {}

    self.families[family_id] = ElectionFamily(
        culture_id=culture_id,
        family_id=family_id,
        params=kwargs.get('params', {}),
        label=label,
        color=color,
        alpha=alpha,
        show=show,
        size=size,
        marker=marker,
        ms=ms,
        starting_from=kwargs.get('starting_from', 0),
        num_candidates=kwargs.get('num_candidates'),
        num_voters=kwargs.get('num_voters'),
        path=kwargs.get('path'),
        single=single,
        instance_type=self.instance_type,
    )

    self.num_families = len(self.families)
    self.num_elections = sum(self.families[fid].size for fid in self.families)
    self.main_order = list(range(self.num_elections))

    # Import the elections from the copied files (instead of regenerating).
    new_instances = {}
    ids = []
    for j in range(size):
        instance_id = get_instance_id(single, family_id, j)

        if self.instance_type == 'ordinal':
            instance = OrdinalElection(
                self.experiment_id,
                instance_id,
                is_imported=True,
                fast_import=self.fast_import,
                with_matrix=self.with_matrix,
                label=label
            )
        elif self.instance_type == 'approval':
            instance = ApprovalElection(
                self.experiment_id,
                instance_id,
                is_imported=True,
                fast_import=self.fast_import,
                label=label
            )
        else:
            instance = None

        new_instances[instance_id] = instance
        ids.append(str(instance_id))

    # Same None-guard that ``prepare_elections`` applies.
    if self.instances is None:
        self.instances = {}
    for instance_id in new_instances:
        self.instances[instance_id] = new_instances[instance_id]

    self.families[family_id].instance_ids = ids

    if self.is_exported:
        self._update_map_csv()

    return list(new_instances.keys())
def _update_map_csv(self):
    """
    Rewrite ``experiments/<experiment_id>/map.csv`` from the current families.

    The file is ``;``-delimited: one header row followed by one row per
    family whose ``is_temporary`` flag is false. Every column name is also
    the name of the family attribute written into that column.

    Returns
    -------
    dict
        Always an empty dictionary.
    """
    columns = (
        'size',
        'num_candidates',
        'num_voters',
        'culture_id',
        'params',
        'family_id',
        'label',
        'color',
        'alpha',
        'marker',
        'ms',
        'path',
    )
    target = os.path.join(os.getcwd(), 'experiments', self.experiment_id, 'map.csv')

    with open(target, 'w', newline='') as handle:
        out = csv.writer(handle, delimiter=';')
        out.writerow(list(columns))
        for family in self.families.values():
            if family.is_temporary:
                continue
            out.writerow([getattr(family, column) for column in columns])

    return {}
[docs]
def add_empty_family(
        self,
        culture_id: str = "none",
        label: str = None,
        color: str = "black",
        alpha: float = 1.,
        show: bool = True,
        marker: str = 'o',
        num_candidates: int = None,
        num_voters: int = None,
        family_id: str = None
):
    """
    Adds an empty family of elections (size 0) to the experiment.

    Parameters
    ----------
    culture_id : str
        Culture id.
    label : str
        Label. If None, defaults to family_id.
    color : str
        Color.
    alpha : float
        Alpha.
    show : bool
        If true the family is shown.
    marker : str
        Marker.
    num_candidates : int
        Number of candidates.
    num_voters : int
        Number of voters.
    family_id : str
        Family id. If None, defaults to culture_id (as in
        ``add_existing_family_from_dir``) instead of registering the
        family under the key ``None``.

    Returns
    -------
    ElectionFamily
        The newly registered family.
    """
    if family_id is None:
        family_id = culture_id

    if label is None:
        label = family_id

    # Same None-guard that ``add_family`` applies.
    if self.families is None:
        self.families = {}

    family = ElectionFamily(culture_id=culture_id,
                            family_id=family_id,
                            label=label,
                            color=color,
                            alpha=alpha,
                            show=show,
                            size=0,
                            marker=marker,
                            num_candidates=num_candidates,
                            num_voters=num_voters,
                            instance_type=self.instance_type)
    self.families[family_id] = family

    family.prepare_family(
        is_exported=self.is_exported,
        experiment_id=self.experiment_id)

    return family
[docs]
def prepare_elections(self, export_points=False, is_aggregated=True) -> None:
    """
    Prepares the elections of every family in the experiment.

    A family is skipped (with a logged warning) when its culture is not
    registered for the experiment's instance type.

    Parameters
    ----------
    export_points : bool
        Stored on the experiment and forwarded to ``prepare_family``.
    is_aggregated : bool
        Stored on the experiment and forwarded to ``prepare_family``.

    Returns
    -------
    None
    """
    self.export_points = export_points
    self.is_aggregated = is_aggregated

    if self.instances is None:
        self.instances = {}

    for family_id in tqdm(self.families, desc="Preparing families"):
        family = self.families[family_id]
        culture_id = family.culture_id

        if self.instance_type == 'ordinal':
            is_known = (culture_id in registered_pseudo_ordinal_cultures
                        or culture_id in registered_ordinal_election_cultures)
            if not is_known:
                logging.warning(f'Culture {culture_id} is skipped, '
                                f'since no such ORDINAL culture was found.')
                continue

        if self.instance_type == 'approval' \
                and culture_id not in registered_approval_election_cultures:
            logging.warning(f'Culture {culture_id} is skipped, '
                            f'since no such APPROVAL culture was found.')
            continue

        prepared = family.prepare_family(
            is_exported=self.is_exported,
            experiment_id=self.experiment_id,
            export_points=export_points,
            is_aggregated=is_aggregated,
            instance_type=self.instance_type,
        )

        for instance_id in prepared:
            self.instances[instance_id] = prepared[instance_id]
[docs]
def compute_voting_rule(self, method=None, committee_size=1) -> None:
    """
    Computes a voting rule for all elections in the experiment.

    Parameters
    ----------
    method : str
        Name of the voting rule to be computed.
    committee_size : int
        Size of the committee.

    Returns
    -------
    None
    """
    for election in self.elections.values():
        election.compute_voting_rule(method=method, committee_size=committee_size)
def compute_alternative_winners(self, method=None, committee_size=None, num_parties=None):
    """
    Computes alternative winners for every election and every party index.

    For each election, ``compute_alternative_winners`` is invoked once per
    ``party_id`` in ``range(num_parties)``.

    Parameters
    ----------
    method : str
        Forwarded to each election.
    committee_size : int
        Forwarded to each election.
    num_parties : int
        Number of party indices to iterate over.
    """
    for election in self.elections.values():
        for party_id in range(num_parties):
            election.compute_alternative_winners(
                method=method, party_id=party_id, committee_size=committee_size)
def get_distance(
        self,
        election_1,
        election_2,
        distance_id: str = None,
        **kwargs
) -> float or (float, list):
    """
    Compute the distance between two elections.

    Delegates to the ``get_distance`` function imported from
    ``mapof.elections.distances``.

    Parameters
    ----------
    election_1, election_2
        The two elections to compare.
    distance_id : str
        Identifier of the distance, passed through positionally.
    **kwargs
        Accepted but not forwarded to the underlying function.
    """
    # NOTE: the annotation ``float or (float, list)`` evaluates to plain
    # ``float`` at definition time; it is kept for interface stability.
    # Inside a method body the bare name below resolves to the imported
    # function, not to this method.
    return get_distance(election_1, election_2, distance_id)
def print_matrix(self, **kwargs):
    """Call ``pr.print_matrix`` with this experiment and the given keyword arguments."""
    pr.print_matrix(experiment=self, **kwargs)
[docs]
def import_controllers(self) -> dict:
    """
    Import the family definitions from the experiment's ``map.csv``.

    The file is read from ``<cwd>/experiments/<experiment_id>/map.csv`` as a
    ``;``-delimited table whose first line is the header. Besides building
    the families, ``num_families``, ``num_elections`` and ``main_order`` are
    updated on the experiment.

    Returns
    -------
    dict
        Mapping from family id to the ElectionFamily built from its row.
    """

    def _literal(cell, default):
        """Parse a Python literal from a CSV cell; blank cells give *default*."""
        if cell is None or not str(cell).strip():
            return default
        return ast.literal_eval(str(cell))

    families = {}

    csv_path = os.path.join(os.getcwd(), 'experiments', self.experiment_id, 'map.csv')
    with open(csv_path, 'r', newline='') as file_:

        header = [h.strip() for h in file_.readline().split(';')]
        reader = csv.DictReader(file_, fieldnames=header, delimiter=';')

        all_num_candidates = []
        all_num_voters = []

        starting_from = 0
        for row in reader:

            culture_id = None
            params = None
            size = None
            num_candidates = None
            num_voters = None
            family_id = None
            # Reset for every row so that neither the map.csv location nor
            # a previous row's value is passed on as this family's path.
            family_path = None
            print_params = {}

            if 'culture_id' in row:
                culture_id = str(row['culture_id']).strip()

            if 'family_id' in row:
                family_id = str(row['family_id'])

            if 'params' in row:
                # csv.writer stores None as an empty cell; accept it on input.
                params = _literal(row['params'], {})

            if 'size' in row:
                size = int(row['size'])

            if 'num_candidates' in row:
                num_candidates = int(row['num_candidates'])

            if 'num_voters' in row:
                num_voters = int(row['num_voters'])

            if 'path' in row:
                family_path = _literal(row['path'], None)

            if 'label' in row:
                print_params['label'] = str(row['label'])

            if 'alpha' in row:
                print_params['alpha'] = float(row['alpha'])

            if 'marker' in row:
                print_params['marker'] = str(row['marker']).strip()

            if 'ms' in row:
                print_params['ms'] = int(row['ms'])

            if 'color' in row:
                print_params['color'] = str(row['color']).strip()

            single = size == 1

            families[family_id] = ElectionFamily(culture_id=culture_id,
                                                 family_id=family_id,
                                                 params=params,
                                                 size=size,
                                                 starting_from=starting_from,
                                                 num_candidates=num_candidates,
                                                 num_voters=num_voters,
                                                 path=family_path,
                                                 single=single,
                                                 **print_params
                                                 )
            starting_from += size

            all_num_candidates.append(num_candidates)
            all_num_voters.append(num_voters)

        # Only warns; families of mixed sizes are still returned.
        _check_if_all_equal(all_num_candidates, 'num_candidates')
        _check_if_all_equal(all_num_voters, 'num_voters')

        self.num_families = len(families)
        self.num_elections = sum(family.size for family in families.values())
        self.main_order = list(range(self.num_elections))

    return families
[docs]
def compute_feature(
        self,
        feature_id: str = None,
        feature_params: dict = None,
        overwrite: bool = False,
        saveas: str = None,
        **kwargs
) -> dict:
    """
    Computes a feature for each instance in the experiment.

    Parameters
    ----------
    feature_id : str
        The id of the feature to compute.
    feature_params : dict
        The parameters of the feature to compute. ``num_iterations``
        (default 1) is read from it; for rule-related features ``rule``
        must be present.
    overwrite : bool
        Whether to overwrite the feature if it already exists. Only
        forwarded for features that are not in ``features_with_params``.
    saveas : str
        Key under which the result is stored; defaults to the long
        feature id built below.
    **kwargs
        Forwarded to experiment-level features and to features that are
        not in ``features_with_params``.

    Returns
    -------
    dict
        A dictionary with the computed feature for each instance. It always
        has the keys ``'value'`` and ``'time'`` (each mapping instance id to
        an entry); further keys appear when a feature returns a dict.
    """

    if feature_params is None:
        feature_params = {}

    # The long id disambiguates results of one feature computed under
    # different rules / embeddings.
    if feature_id in features_rule_related:
        feature_long_id = f'{feature_id}_{feature_params["rule"]}'
    elif feature_id in features_embedding_related:
        feature_long_id = f'{feature_id}_{self.embedding_id}'
    else:
        feature_long_id = feature_id

    # Number of repeated evaluations used to average the timing below.
    # NOTE(review): a value of 0 makes ``total_time /= num_iterations``
    # raise ZeroDivisionError.
    num_iterations = feature_params.get('num_iterations', 1)

    feature_dict = {'value': {}, 'time': {}}

    if feature_id in registered_experiment_features:
        # Experiment-level feature: one call produces the results for all
        # instances at once.
        feature = registered_experiment_features[feature_id]
        solution_dict = feature(self, election_ids=list(self.instances), **kwargs)

        for instance_id in tqdm(self.instances, desc='Computing experiment feature'):
            for key in solution_dict[instance_id]:
                if key not in feature_dict:
                    feature_dict[key] = {}
            feature_dict['value'][instance_id] = solution_dict[instance_id]
            # NOTE(review): a None entry would already have failed in the
            # iteration above, so this check looks unreachable - confirm intent.
            if solution_dict[instance_id] is None:
                feature_dict['time'][instance_id] = None
            else:
                feature_dict['time'][instance_id] = 0

    else:
        # Instance-level feature: evaluated (and timed) per instance.
        for instance_id in tqdm(self.instances, desc=f"{feature_long_id}"):
            instance = self.elections[instance_id]

            start = time.time()
            solution = None
            for _ in range(num_iterations):
                if feature_id in features_with_params:
                    solution = instance.get_feature(feature_id, feature_long_id,
                                                    feature_params=feature_params)
                else:
                    solution = instance.get_feature(feature_id, feature_long_id,
                                                    overwrite=overwrite, **kwargs)
            value = None
            # Average wall-clock time of a single evaluation.
            total_time = time.time() - start
            total_time /= num_iterations

            if solution is not None:
                if type(solution) is dict:
                    # Dict results are stored key by key.
                    # NOTE(review): presumably such dicts carry their own
                    # 'time' entry - confirm against the feature functions.
                    if 'value' not in solution:
                        solution['value'] = None
                    for key in solution:
                        if key not in feature_dict:
                            feature_dict[key] = {}
                        feature_dict[key][instance_id] = solution[key]
                else:
                    feature_dict['value'][instance_id] = solution
                    feature_dict['time'][instance_id] = total_time
            else:
                feature_dict['value'][instance_id] = value
                feature_dict['time'][instance_id] = total_time

    if saveas is None:
        saveas = feature_long_id

    if self.is_exported:
        exports.export_feature_to_file(self,
                                       feature_id=feature_id,
                                       feature_dict=feature_dict,
                                       saveas=saveas)

    # Keep the results available both per instance and on the experiment.
    for election_id in self.elections:
        self.instances[election_id].features[saveas] = feature_dict['value'][election_id]

    self.features[saveas] = feature_dict

    return feature_dict
[docs]
def compute_rules(
        self,
        list_of_rules: list,
        committee_size: int = 10,
        resolute: bool = False
) -> None:
    """
    Computes the winning committees for a list of rules.

    Parameters
    ----------
    list_of_rules : list
        A list of rules to compute the winning committees for.
    committee_size : int
        The size of the winning committees.
    resolute : bool
        Whether to compute the resolute committees.

    Returns
    -------
    None
    """
    # Arguments shared by every rule computation.
    shared = dict(experiment=self, committee_size=committee_size, resolute=resolute)
    for rule in list_of_rules:
        print('Computing', rule)
        rules.compute_abcvoting_rule(rule_name=rule, **shared)
[docs]
def import_committees(self, list_of_rules) -> None:
    """
    Imports the winning committees for a list of rules.

    The committees of each rule are stored in ``all_winning_committees``
    under the rule's name.

    Parameters
    ----------
    list_of_rules : list
        A list of rules to import the winning committees for.

    Returns
    -------
    None
    """
    for rule in list_of_rules:
        committees = rules.import_committees_from_file(
            experiment_id=self.experiment_id, rule_name=rule)
        self.all_winning_committees[rule] = committees
[docs]
def add_election_to_family(self, election=None, family_id=None) -> None:
    """
    Adds an election to a family.

    The election receives the id ``<family_id>_<current family size>``, is
    registered in ``self.instances`` under that id and is then handed to the
    family's ``add_election``.

    Parameters
    ----------
    election : Election
        The election to add to the family.
    family_id : str
        The id of the family to add the election to.

    Returns
    -------
    None
    """
    family = self.families[family_id]
    # Build the id from the family *id*; formatting the family object itself
    # would embed its string representation in the instance id.
    election.instance_id = f'{family_id}_{family.size}'
    self.instances[election.instance_id] = election
    family.add_election(election)
def prepare_election_features(self):
    """
    Feed every election's data into its ``election_features`` and compute them.

    For each instance the votes and the numbers of candidates and voters are
    copied onto ``election_features`` before ``calculate_all`` is called.
    """
    for election in self.instances.values():
        summary = election.election_features
        summary.votes = election.votes
        summary.num_candidates = election.num_candidates
        summary.num_voters = election.num_voters
        summary.calculate_all()
# def prepare_compass_dictionary(self):
# for election in self.instances.items():
# election[1].election_features.votes = election[1].votes
# election[1].election_features.num_candidates = election[1].num_candidates
# election[1].election_features.num_voters = election[1].num_voters
# election[1].election_features.compass_points['ST'] = self.instances[
# ST_KEY + str(election[1].num_candidates)]
# election[1].election_features.compass_points['AN'] = self.instances[
# AN_KEY + str(election[1].num_candidates)]
# election[1].election_features.compass_points['ID'] = self.instances[
# ID_KEY + str(election[1].num_candidates)]
# election[1].election_features.compass_points['UN'] = self.instances[
# UN_KEY + str(election[1].num_candidates)]
def calculate_dap(self, id):
    """
    Return ``[Diversity, Agreement, Polarization]`` for the given instance id.

    The values are read from ``self.features`` under those three names.
    """
    return [self.features[name][id]
            for name in ('Diversity', 'Agreement', 'Polarization')]
def calculate_features_vector(self, id, features_list: list):
    """
    Build a feature vector for one instance from the requested feature codes.

    Supported codes, always emitted in this order: ``'d'`` (Diversity),
    ``'a'`` (Agreement), ``'p'`` (Polarization), ``'e'`` (Entropy),
    ``'e2'`` (Entropy squared) and ``'cds'`` (CandidateDistanceStd).
    Codes that are not listed in ``features_list`` are left out.
    """
    simple_codes = (
        ('d', 'Diversity'),
        ('a', 'Agreement'),
        ('p', 'Polarization'),
        ('e', 'Entropy'),
    )
    vector = [self.features[name][id]
              for code, name in simple_codes
              if code in features_list]
    if 'e2' in features_list:
        vector.append(self.features['Entropy'][id] * self.features['Entropy'][id])
    if 'cds' in features_list:
        vector.append(self.features['CandidateDistanceStd'][id])
    return vector
def prepare_election_sizes(self):
    """Add each instance's ``num_candidates`` to ``self.election_sizes``."""
    for election in self.instances.values():
        self.election_sizes.add(election.num_candidates)
def prepare_feature_vectors(self, features: list):
    """
    Attach a feature vector to every election's ``election_features``.

    For each instance the votes and the numbers of candidates and voters are
    copied onto ``election_features``; ``features_vector`` is then set to
    the result of ``calculate_features_vector`` for the election's
    ``election_id`` and the requested feature codes.
    """
    for election in self.instances.values():
        summary = election.election_features
        summary.votes = election.votes
        summary.num_candidates = election.num_candidates
        summary.num_voters = election.num_voters
        summary.features_vector = self.calculate_features_vector(
            election.election_id, features)
def prepare_instances(self):
    """Instance-named alias: calls ``prepare_elections`` with its default arguments."""
    return self.prepare_elections()
def add_instance(self):
    """Instance-named alias: calls ``add_election`` with its default arguments."""
    return self.add_election()
def __getstate__(self):
    """Return the instance dictionary itself (not a copy) as the pickled state."""
    return self.__dict__
def __setstate__(self, state):
    """Restore pickled state by merging ``state`` into the instance dictionary."""
    # Writing to ``__dict__`` directly does not go through ``__setattr__``.
    self.__dict__.update(state)
def _check_if_all_equal(values, subject):
    """
    Emit a warning when the given values are not all equal.

    Parameters
    ----------
    values : list
        Values to compare against the first element; an empty list never warns.
    subject : str
        Name of the compared quantity, used in the warning message.
    """
    for candidate in values:
        if candidate != values[0]:
            warnings.warn(f'Not all {subject} values are equal!')
            return