import ast
import csv
import logging
import os
import time
import warnings
from abc import ABCMeta, abstractmethod
import mapof.core.persistence.experiment_exports as exports
import mapof.core.printing as pr
from mapof.core.features.register import (
registered_experiment_features,
features_embedding_related
)
from mapof.core.objects.Experiment import Experiment
from mapof.core.utils import get_instance_id
from tqdm import tqdm
import mapof.elections.other.approval_rules as rules
from mapof.elections.cultures.register import (
registered_ordinal_election_cultures,
registered_pseudo_ordinal_cultures,
registered_approval_election_cultures
)
from mapof.elections.distances import get_distance
from mapof.elections.features.register import (
features_with_params,
features_rule_related
)
from mapof.elections.objects.ApprovalElection import ApprovalElection
from mapof.elections.objects.ElectionFamily import ElectionFamily
from mapof.elections.objects.OrdinalElection import OrdinalElection
try:
from sklearn.manifold import MDS
from sklearn.manifold import TSNE
from sklearn.manifold import SpectralEmbedding
from sklearn.manifold import LocallyLinearEmbedding
from sklearn.manifold import Isomap
except ImportError as error:
MDS = None
TSNE = None
SpectralEmbedding = None
LocallyLinearEmbedding = None
Isomap = None
print(error)
[docs]
class ElectionExperiment(Experiment):
__metaclass__ = ABCMeta
"""Abstract set of instances."""
@abstractmethod
def add_folders_to_experiment(self):
pass
@abstractmethod
def add_feature(self, name, function):
pass
@abstractmethod
def add_culture(self, name, function):
pass
def __init__(self, is_shifted=False, **kwargs):
self.is_shifted = is_shifted
self.default_num_candidates = 10
self.default_num_voters = 100
self.default_committee_size = 1
self.all_winning_committees = {}
super().__init__(**kwargs)
def __getattr__(self, attr):
if attr == 'elections':
return self.instances
elif attr == 'num_elections':
return self.num_instances
else:
return self.__dict__[attr]
def __setattr__(self, name, value):
if name == "elections":
self.instances = value
elif name == "num_elections":
self.num_instances = value
else:
self.__dict__[name] = value
[docs]
def add_instances_to_experiment(self) -> None:
"""
Add instances to the experiment.
Returns:
None
"""
instances = {}
for family_id in self.families:
single = self.families[family_id].single
ids = []
for j in range(self.families[family_id].size):
instance_id = get_instance_id(single, family_id, j)
if self.instance_type == 'ordinal':
instance = OrdinalElection(self.experiment_id, instance_id,
is_imported=True,
fast_import=self.fast_import,
with_matrix=self.with_matrix,
label=self.families[family_id].label)
elif self.instance_type == 'approval':
instance = ApprovalElection(self.experiment_id, instance_id,
is_imported=True,
fast_import=self.fast_import,
label=self.families[family_id].label)
else:
instance = None
instances[instance_id] = instance
ids.append(str(instance_id))
self.families[family_id].election_ids = ids
return instances
[docs]
def set_default_num_candidates(self, num_candidates: int) -> None:
"""
Sets default number of candidates
Parameters
----------
num_candidates : int
Number of candidates.
Returns
-------
None
"""
self.default_num_candidates = num_candidates
[docs]
def set_default_num_voters(self, num_voters: int) -> None:
"""
Sets default number of voters
Parameters
----------
num_voters : int
Number of voters.
Returns
-------
None
"""
self.default_num_voters = num_voters
[docs]
def set_default_committee_size(self, committee_size: int) -> None:
"""
Sets default size of the committee
Parameters
----------
committee_size : int
Committee size.
Returns
-------
None
"""
self.default_committee_size = committee_size
def add_election_from_matrix(self,
frequency_matrix,
**kwargs):
if len(frequency_matrix) != len(frequency_matrix[0]):
raise ValueError("Matrix is not square")
self.add_election(
num_candidates=len(frequency_matrix),
frequency_matrix=frequency_matrix,
culture_id='frequency_matrix',
**kwargs
)
[docs]
def add_election(self,
culture_id="none",
label=None,
color="black",
alpha: float = 1.,
show: bool = True,
marker='x',
ms=20,
starting_from: int = 0,
size: int = 1,
num_candidates: int = None,
num_voters: int = None,
election_id: str = None, # deprecated
instance_id: str = None,
frequency_matrix=None,
is_temporary: bool = False,
params: dict = None,
):
"""
Adds election to the experiment.
Parameters
----------
culture_id : str
Culture id.
label : str
Label.
color
Color.
alpha : float
Alpha.
show : bool
If true the election is shown.
marker : str
Marker.
ms : int
Marker size.
starting_from : int
Starting index.
size : int
Size.
num_candidates : int
Number of candidates.
num_voters : int
Number of voters.
election_id : str
Election id.
instance_id : str
Instance id.
frequency_matrix : list
Frequency matrix.
is_temporary : bool
If true the election is temporary.
params : dict
Model parameters.
Returns
-------
list
List of IDs of added instances.
"""
if instance_id is None:
instance_id = election_id
if num_candidates is None:
num_candidates = self.default_num_candidates
if num_voters is None:
num_voters = self.default_num_voters
return self.add_family(culture_id=culture_id,
size=size,
label=label,
color=color,
alpha=alpha,
show=show,
marker=marker,
ms=ms,
starting_from=starting_from,
family_id=instance_id,
num_candidates=num_candidates,
num_voters=num_voters,
frequency_matrix=frequency_matrix,
is_temporary=is_temporary,
single=True,
params=params)
[docs]
def add_family(self,
culture_id: str = "none",
size: int = 1,
label: str = None,
color: str = "black",
alpha: float = 1.,
show: bool = True,
marker: str = 'o',
ms: int = 20,
starting_from: int = 0,
num_candidates: int = None,
num_voters: int = None,
frequency_matrix=None,
is_temporary: bool = False,
family_id: str = None,
single: bool = False,
path: dict = None,
params: dict = None,
) -> list:
"""
Adds family of elections to the experiment.
Parameters
----------
culture_id : str
Culture id.
label : str
Label.
color
Color.
alpha : float
Alpha (i.e., transparency).
show : bool
If true the family is shown.
marker : str
Marker.
ms : int
Marker size.
starting_from : int
Starting index.
size : int
Size.
num_candidates : int
Number of candidates.
num_voters : int
Number of voters.
frequency_matrix : list
Frequency matrix.
is_temporary : bool
If true the election is temporary.
family_id : str
Family id.
single : bool
If true only one election is added.
path : dict
Path.
params : dict
Model parameters.
Returns
-------
list
List of IDs of added instances.
"""
if num_candidates is None:
num_candidates = self.default_num_candidates
if num_voters is None:
num_voters = self.default_num_voters
if self.families is None:
self.families = {}
if params is None:
params = {}
if family_id is None:
family_id = culture_id + '_' + str(num_candidates) + '_' + str(num_voters)
if culture_id in {'urn'} and params.get('alpha') is not None:
family_id += '_' + str(float(params['alpha']))
elif culture_id in {'mallows'} and params.get('phi') is not None:
family_id += '_' + str(float(params['phi']))
elif culture_id in {'norm-mallows', 'norm-mallows_matrix'} \
and params.get('normphi') is not None:
family_id += '_' + str(float(params['normphi']))
elif culture_id in {'euclidean'} and params.get('dim') is not None \
and params.get('space') is not None:
family_id += '_' + str(int(params['dim'])) + '_' + str(params['space'])
elif label is None:
label = family_id
self.families[family_id] = ElectionFamily(culture_id=culture_id,
family_id=family_id,
params=params,
label=label,
color=color,
alpha=alpha,
show=show,
size=size,
marker=marker,
ms=ms,
starting_from=starting_from,
num_candidates=num_candidates,
num_voters=num_voters,
path=path,
single=single,
instance_type=self.instance_type,
frequency_matrix=frequency_matrix,
is_temporary=is_temporary,
)
self.num_families = len(self.families)
self.num_elections = sum([self.families[family_id].size for family_id in self.families])
self.main_order = [i for i in range(self.num_elections)]
new_instances = self.families[family_id].prepare_family(
is_exported=self.is_exported,
experiment_id=self.experiment_id,
instance_type=self.instance_type)
for instance_id in new_instances:
self.instances[instance_id] = new_instances[instance_id]
self.families[family_id].instance_ids = list(new_instances.keys())
if self.is_exported and not is_temporary:
self._update_map_csv()
return list(new_instances.keys())
[docs]
def add_existing_family_from_dir(
self,
dir: str = None,
culture_id: str = None,
**kwargs
) -> list:
"""
Adds family of elections to the experiment from the given directory.
Parameters
----------
dir : str
Directory.
culture_id : str
Culture id.
**kwargs : dict
Additional parameters.
Returns
-------
list
List of the families.
"""
if dir is None:
logging.warning('dir not specified')
if culture_id is None:
logging.warning('pseudo_culture_id not specified')
# Copy instances from dir to /elections
directory_in = dir
directory_out = os.path.join(os.getcwd(),
"experiments",
self.experiment_id,
'elections')
# List all files in the given directory
files = [f for f in os.listdir(directory_in) if
os.path.isfile(os.path.join(directory_in, f))]
# Sort the files for consistency
files.sort()
# Rename each file
for idx, file_name in enumerate(files):
# Create the new file name
# new_file_name = f"{pseudo_culture_id}_{idx}{os.path.splitext(file_name)[1]}"
new_file_name = f"{culture_id}_{idx}.soc"
# Form the full old and new paths
old_path = os.path.join(directory_in, file_name)
new_path = os.path.join(directory_out, new_file_name)
# Rename the file
os.rename(old_path, new_path)
print(f"Renaming: {file_name} -> {new_file_name}")
return self.add_family(culture_id=culture_id, **kwargs)
def _update_map_csv(self):
families = {}
path_to_file = os.path.join(os.getcwd(), 'experiments', self.experiment_id, 'map.csv')
with open(path_to_file, 'w', newline='') as csv_file:
writer = csv.writer(csv_file, delimiter=';')
all_fields = ['size',
'num_candidates',
'num_voters',
'culture_id',
'params',
'family_id',
'label',
'color',
'alpha',
'marker',
'ms',
'path'
]
writer.writerow(all_fields)
for family in self.families.values():
if not family.is_temporary:
all_values = [family.size,
family.num_candidates,
family.num_voters,
family.culture_id,
family.params,
family.family_id,
family.label,
family.color,
family.alpha,
family.marker,
family.ms,
family.path]
writer.writerow(all_values)
return families
[docs]
def add_empty_family(
self,
culture_id: str = "none",
label: str = None,
color: str = "black",
alpha: float = 1.,
show: bool = True,
marker: str = 'o',
num_candidates: int = None,
num_voters: int = None,
family_id: str = None
):
"""
Adds an empty family of elections to the experiment.
Parameters
----------
culture_id : str
Culture id.
label : str
Label.
color : str
Color.
alpha : float
Alpha.
show : bool
If true the family is shown.
marker : str
Marker.
num_candidates : int
Number of candidates.
num_voters : int
Number of voters.
family_id : str
Family id.
"""
if label is None:
label = family_id
self.families[family_id] = ElectionFamily(culture_id=culture_id,
family_id=family_id,
label=label,
color=color,
alpha=alpha,
show=show,
size=0,
marker=marker,
num_candidates=num_candidates,
num_voters=num_voters,
instance_type=self.instance_type)
self.families[family_id].prepare_family(
is_exported=self.is_exported,
experiment_id=self.experiment_id)
return self.families[family_id]
[docs]
def prepare_elections(self, export_points=False, is_aggregated=True) -> None:
"""
Prepares elections for a given experiment.
Parameters
----------
export_points : bool
Whether to store points in the instance.
is_aggregated : bool
Whether to aggregate the instances.
Returns
-------
None
"""
self.export_points = export_points
self.is_aggregated = is_aggregated
if self.instances is None:
self.instances = {}
for family_id in tqdm(self.families, desc="Preparing families"):
if self.instance_type == 'ordinal' and \
self.families[family_id].culture_id not in registered_pseudo_ordinal_cultures and \
self.families[family_id].culture_id not in registered_ordinal_election_cultures:
logging.warning(f'Culture {self.families[family_id].culture_id} is skipped, '
f'since no such ORDINAL culture was found.')
continue
if self.instance_type == 'approval' and \
self.families[family_id].culture_id not in registered_approval_election_cultures:
logging.warning(f'Culture {self.families[family_id].culture_id} is skipped, '
f'since no such APPROVAL culture was found.')
continue
new_instances = self.families[family_id].prepare_family(
is_exported=self.is_exported,
experiment_id=self.experiment_id,
export_points=export_points,
is_aggregated=is_aggregated,
instance_type=self.instance_type,
)
for instance_id in new_instances:
self.instances[instance_id] = new_instances[instance_id]
[docs]
def compute_voting_rule(self, method=None, committee_size=1) -> None:
"""
Computes voting rule for all elections in the experiment.
Parameters
----------
method : str
Name of the boting rule to be computed.
committee_size : int
Size of the committee.
Returns
-------
None
"""
for election_id in self.elections:
self.elections[election_id].compute_voting_rule(
method=method, committee_size=committee_size)
def compute_alternative_winners(self, method=None, committee_size=None, num_parties=None):
for election_id in self.elections:
for party_id in range(num_parties):
self.elections[election_id].compute_alternative_winners(
method=method, party_id=party_id, committee_size=committee_size)
def get_distance(
self,
election_1,
election_2,
distance_id: str = None,
**kwargs
) -> float or (float, list):
return get_distance(election_1, election_2, distance_id)
def print_matrix(self, **kwargs):
pr.print_matrix(experiment=self, **kwargs)
[docs]
def import_controllers(self) -> list:
"""
Import controllers from a file
Returns
-------
list
List of families.
"""
families = {}
path = os.path.join(os.getcwd(), 'experiments', self.experiment_id, 'map.csv')
with open(path, 'r') as file_:
header = [h.strip() for h in file_.readline().split(';')]
reader = csv.DictReader(file_, fieldnames=header, delimiter=';')
all_num_candidates = []
all_num_voters = []
starting_from = 0
for row in reader:
culture_id = None
params = None
size = None
num_candidates = None
num_voters = None
family_id = None
print_params = {}
if 'culture_id' in row.keys():
culture_id = str(row['culture_id']).strip()
if 'family_id' in row.keys():
family_id = str(row['family_id'])
if 'params' in row.keys():
params = ast.literal_eval(str(row['params']))
if 'size' in row.keys():
size = int(row['size'])
if 'num_candidates' in row.keys():
num_candidates = int(row['num_candidates'])
if 'num_voters' in row.keys():
num_voters = int(row['num_voters'])
if 'path' in row.keys():
path = ast.literal_eval(str(row['path']))
if 'label' in row.keys():
print_params['label'] = str(row['label'])
if 'alpha' in row.keys():
print_params['alpha'] = float(row['alpha'])
if 'marker' in row.keys():
print_params['marker'] = str(row['marker']).strip()
if 'ms' in row.keys():
print_params['ms'] = int(row['ms'])
if 'color' in row.keys():
print_params['color'] = str(row['color']).strip()
single = size == 1
families[family_id] = ElectionFamily(culture_id=culture_id,
family_id=family_id,
params=params,
size=size,
starting_from=starting_from,
num_candidates=num_candidates,
num_voters=num_voters,
path=path,
single=single,
**print_params
)
starting_from += size
all_num_candidates.append(num_candidates)
all_num_voters.append(num_voters)
_check_if_all_equal(all_num_candidates, 'num_candidates')
_check_if_all_equal(all_num_voters, 'num_voters')
self.num_families = len(families)
self.num_elections = sum([families[family_id].size for family_id in families])
self.main_order = [i for i in range(self.num_elections)]
return families
[docs]
def compute_feature(
self,
feature_id: str = None,
feature_params: dict = None,
overwrite: bool = False,
saveas: str = None,
**kwargs
) -> dict:
"""
Computes a feature for each in instances in the experiment.
Parameters
----------
feature_id : str
The id of the feature to compute.
feature_params : dict
The parameters of the feature to compute.
overwrite : bool
Whether to overwrite the feature if it already exists.
saveas : str
Returns
-------
dict
A dictionary with the computed feature for each instance.
"""
if feature_params is None:
feature_params = {}
if feature_id in features_rule_related:
feature_long_id = f'{feature_id}_{feature_params["rule"]}'
elif feature_id in features_embedding_related:
feature_long_id = f'{feature_id}_{self.embedding_id}'
else:
feature_long_id = feature_id
num_iterations = feature_params.get('num_iterations', 1)
feature_dict = {'value': {}, 'time': {}}
if feature_id in registered_experiment_features:
feature = registered_experiment_features[feature_id]
solution_dict = feature(self, election_ids=list(self.instances), **kwargs)
for instance_id in tqdm(self.instances, desc='Computing experiment feature'):
for key in solution_dict[instance_id]:
if key not in feature_dict:
feature_dict[key] = {}
feature_dict['value'][instance_id] = solution_dict[instance_id]
if solution_dict[instance_id] is None:
feature_dict['time'][instance_id] = None
else:
feature_dict['time'][instance_id] = 0
else:
for instance_id in tqdm(self.instances, desc=f"{feature_long_id}"):
instance = self.elections[instance_id]
start = time.time()
solution = None
for _ in range(num_iterations):
if feature_id in features_with_params:
solution = instance.get_feature(feature_id, feature_long_id,
feature_params=feature_params)
else:
solution = instance.get_feature(feature_id, feature_long_id,
overwrite=overwrite, **kwargs)
value = None
total_time = time.time() - start
total_time /= num_iterations
if solution is not None:
if type(solution) is dict:
if 'value' not in solution:
solution['value'] = None
for key in solution:
if key not in feature_dict:
feature_dict[key] = {}
feature_dict[key][instance_id] = solution[key]
else:
feature_dict['value'][instance_id] = solution
feature_dict['time'][instance_id] = total_time
else:
feature_dict['value'][instance_id] = value
feature_dict['time'][instance_id] = total_time
if saveas is None:
saveas = feature_long_id
if self.is_exported:
exports.export_feature_to_file(self,
feature_id=feature_id,
feature_dict=feature_dict,
saveas=saveas)
for election_id in self.elections:
self.instances[election_id].features[saveas] = feature_dict['value'][election_id]
self.features[saveas] = feature_dict
return feature_dict
[docs]
def compute_rules(
self,
list_of_rules: list,
committee_size: int = 10,
resolute: bool = False
) -> None:
"""
Computes the winning committees for a list of rules.
Parameters
----------
list_of_rules : list
A list of rules to compute the winning committees for.
committee_size : int
The size of the winning committees.
resolute : bool
Whether to compute the resolute committees.
Returns
-------
None
"""
for rule_name in list_of_rules:
print('Computing', rule_name)
rules.compute_abcvoting_rule(
experiment=self,
rule_name=rule_name,
committee_size=committee_size,
resolute=resolute)
[docs]
def import_committees(self, list_of_rules) -> None:
"""
Imports the winning committees for a list of rules.
Parameters
----------
list_of_rules : list
A list of rules to import the winning committees for.
Returns
-------
None
"""
for rule_name in list_of_rules:
self.all_winning_committees[rule_name] = rules.import_committees_from_file(
experiment_id=self.experiment_id, rule_name=rule_name)
[docs]
def add_election_to_family(self, election=None, family_id=None) -> None:
"""
Adds an election to a family.
Parameters
----------
election : Election
The election to add to the family.
family_id : str
The id of the family to add the election to.
Returns
-------
None
"""
election.instance_id = f'{self.families[family_id]}_{self.families[family_id].size}'
self.instances[election.instance_id] = election
self.families[family_id].add_election(election)
def prepare_election_features(self):
for election in self.instances.items():
election[1].election_features.votes = election[1].votes
election[1].election_features.num_candidates = election[1].num_candidates
election[1].election_features.num_voters = election[1].num_voters
election[1].election_features.calculate_all()
# def prepare_compass_dictionary(self):
# for election in self.instances.items():
# election[1].election_features.votes = election[1].votes
# election[1].election_features.num_candidates = election[1].num_candidates
# election[1].election_features.num_voters = election[1].num_voters
# election[1].election_features.compass_points['ST'] = self.instances[
# ST_KEY + str(election[1].num_candidates)]
# election[1].election_features.compass_points['AN'] = self.instances[
# AN_KEY + str(election[1].num_candidates)]
# election[1].election_features.compass_points['ID'] = self.instances[
# ID_KEY + str(election[1].num_candidates)]
# election[1].election_features.compass_points['UN'] = self.instances[
# UN_KEY + str(election[1].num_candidates)]
def calculate_dap(self, id):
dap = list()
dap.append(self.features['Diversity'][id])
dap.append(self.features['Agreement'][id])
dap.append(self.features['Polarization'][id])
return dap
def calculate_features_vector(self, id, features_list: list):
vector = list()
if 'd' in features_list:
vector.append(self.features['Diversity'][id])
if 'a' in features_list:
vector.append(self.features['Agreement'][id])
if 'p' in features_list:
vector.append(self.features['Polarization'][id])
if 'e' in features_list:
vector.append(self.features['Entropy'][id])
if 'e2' in features_list:
vector.append(self.features['Entropy'][id] * self.features['Entropy'][id])
if 'cds' in features_list:
vector.append(self.features['CandidateDistanceStd'][id])
return vector
def prepare_election_sizes(self):
for election in self.instances.items():
self.election_sizes.add(election[1].num_candidates)
def prepare_feature_vectors(self, features: list):
for election in self.instances.items():
election[1].election_features.votes = election[1].votes
election[1].election_features.num_candidates = election[1].num_candidates
election[1].election_features.num_voters = election[1].num_voters
election[1].election_features.features_vector = self.calculate_features_vector(
election[1].election_id,
features)
def prepare_instances(self):
return self.prepare_elections()
def add_instance(self):
return self.add_election()
def __getstate__(self):
return self.__dict__
def __setstate__(self, state):
self.__dict__.update(state)
def _check_if_all_equal(values, subject):
if any(x != values[0] for x in values):
text = f'Not all {subject} values are equal!'
warnings.warn(text)