Source code for mapof.marriages.objects.MarriagesExperiment

import ast
import csv
import os
from abc import ABC

from mapof.core.objects.Experiment import Experiment
from mapof.core.persistence.experiment_imports import get_values_from_csv_file
from mapof.core.utils import make_folder_if_do_not_exist
from tqdm import tqdm

import mapof.marriages.features as features
from mapof.marriages.distances import get_distance
from mapof.marriages.objects.Marriages import Marriages
from mapof.marriages.objects.MarriagesFamily import MarriagesFamily

try:
    from sklearn.manifold import MDS
    from sklearn.manifold import TSNE
    from sklearn.manifold import SpectralEmbedding
    from sklearn.manifold import LocallyLinearEmbedding
    from sklearn.manifold import Isomap
except ImportError as error:
    MDS = None
    TSNE = None
    SpectralEmbedding = None
    LocallyLinearEmbedding = None
    Isomap = None
    print(error)


[docs] class MarriagesExperiment(Experiment, ABC): """Abstract set of elections.""" def __init__(self, **kwargs): super().__init__(**kwargs) self.default_num_agents = 10 self.matchings = {} def add_culture(self, name, function): pass def add_distance(self, name, function): pass def add_feature(self, name, function): pass def add_family(self, culture_id: str = "none", instance_id: str = None, params: dict = None, size: int = 1, label: str = None, color: str = "black", alpha: float = 1., show: bool = True, marker: str = 'o', family_id: str = None, single_instance: bool = False, num_agents: int = None ): if instance_id is not None: family_id = instance_id if num_agents is None: num_agents = self.default_num_agents if self.families is None: self.families = {} if params is None: params = {} if family_id is None: family_id = culture_id + '_' + str(num_agents) if label is None: label = family_id self.families[family_id] = MarriagesFamily( culture_id=culture_id, family_id=family_id, params=params, label=label, color=color, alpha=alpha, single=single_instance, show=show, size=size, marker=marker, num_agents=num_agents, ) self.num_families = len(self.families) self.num_instances = sum([self.families[family_id].size for family_id in self.families]) new_instances = self.families[family_id].prepare_family( is_exported=self.is_exported, experiment_id=self.experiment_id) for instance_id in new_instances: self.instances[instance_id] = new_instances[instance_id] self.families[family_id].instance_ids = list(new_instances.keys()) # if self.is_exported: # self.update_map_csv() # To be implemented return list(new_instances.keys()) def add_instance(self, culture_id="none", params=None, label=None, color="black", alpha=1., show=True, marker='x', starting_from=0, size=1, num_agents=None, instance_id=None): if num_agents is None: num_agents = self.default_num_agents return self.add_family( culture_id=culture_id, instance_id=instance_id, params=params, size=size, label=label, color=color, alpha=alpha, show=show, marker=marker, family_id=instance_id, num_agents=num_agents, single_instance=True, ) def add_instances_to_experiment(self): instances = {} for family_id in self.families: ids = [] if self.families[family_id].single: instance_id = family_id instance = Marriages(self.experiment_id, instance_id) instances[instance_id] = instance ids.append(str(instance_id)) else: for j in range(self.families[family_id].size): instance_id = family_id + '_' + str(j) instance = Marriages(self.experiment_id, instance_id) instances[instance_id] = instance ids.append(str(instance_id)) self.families[family_id].instance_ids = ids return instances
[docs] def get_distance(self, election_1, election_2, distance_id: str = None, **kwargs ) -> float or (float, list): return get_distance(election_1, election_2, distance_id)
[docs] def import_controllers(self): """ Import controllers from a file """ families = {} path = os.path.join(os.getcwd(), 'experiments', self.experiment_id, 'map.csv') file_ = open(path, 'r') header = [h.strip() for h in file_.readline().split(';')] reader = csv.DictReader(file_, fieldnames=header, delimiter=';') starting_from = 0 for row in reader: culture_id = None color = None label = None params = None alpha = None size = None marker = None num_agents = None family_id = None show = True if 'culture_id' in row.keys(): culture_id = str(row['culture_id']).strip() if 'color' in row.keys(): color = str(row['color']).strip() if 'label' in row.keys(): label = str(row['label']) if 'family_id' in row.keys(): family_id = str(row['family_id']) if 'params' in row.keys(): params = ast.literal_eval(str(row['params'])) if 'alpha' in row.keys(): alpha = float(row['alpha']) if 'size' in row.keys(): size = int(row['size']) if 'marker' in row.keys(): marker = str(row['marker']).strip() if 'num_agents' in row.keys(): num_agents = int(row['num_agents']) if 'show' in row.keys(): show = row['show'].strip() == 'process_id' single_instance = size == 1 families[family_id] = MarriagesFamily(culture_id=culture_id, family_id=family_id, params=params, label=label, color=color, alpha=alpha, show=show, size=size, marker=marker, starting_from=starting_from, num_agents=num_agents, single=single_instance) starting_from += size self.num_families = len(families) self.num_instances = sum([families[family_id].size for family_id in families]) self.main_order = [i for i in range(self.num_instances)] file_.close() return families
def add_folders_to_experiment(self) -> None: dirs = ["experiments"] for dir in dirs: if not os.path.isdir(dir): os.mkdir(os.path.join(os.getcwd(), dir)) if not os.path.isdir(os.path.join(os.getcwd(), "experiments", self.experiment_id)): os.mkdir(os.path.join(os.getcwd(), "experiments", self.experiment_id)) list_of_folders = ['distances', 'features', 'coordinates', 'instances'] for folder_name in list_of_folders: if not os.path.isdir(os.path.join(os.getcwd(), "experiments", self.experiment_id, folder_name)): os.mkdir(os.path.join(os.getcwd(), "experiments", self.experiment_id, folder_name)) path = os.path.join(os.getcwd(), "experiments", self.experiment_id, "map.csv") if not os.path.exists(path): with open(path, 'w') as file_csv: file_csv.write( "size;num_agents;culture_id;params;color;alpha;" "family_id;label;marker\n") file_csv.write("3;10;ic;{};black;1;ic;Impartial Culture;o\n") def prepare_instances(self): if self.instances is None: self.instances = {} for family_id in tqdm(self.families, desc="Preparing instances"): new_instances = self.families[family_id].prepare_family( is_exported=self.is_exported, experiment_id=self.experiment_id) for instance_id in new_instances: self.instances[instance_id] = new_instances[instance_id] def compute_feature(self, feature_id: str = None, feature_params=None) -> dict: if feature_params is None: feature_params = {} feature_dict = {'value': {}, 'time': {}, 'std': {}} features_with_time = {} features_with_std = {'avg_num_of_bps_for_rand_matching', 'avg_number_of_bps_for_random_matching'} if feature_id == 'summed_rank_difference': minimal = get_values_from_csv_file(self, feature_id='summed_rank_minimal_matching') maximal = get_values_from_csv_file(self, feature_id='summed_rank_maximal_matching') for instance_id in self.instances: if minimal[instance_id] is None: value = 'None' else: value = abs(maximal[instance_id] - minimal[instance_id]) feature_dict['value'][instance_id] = value feature_dict['time'][instance_id] = 0 else: for instance_id in self.instances: feature = features.get_feature(feature_id) instance = self.instances[instance_id] value = feature(instance.votes) if feature_id in features_with_time: feature_dict['value'][instance_id] = value[0] feature_dict['time'][instance_id] = value[1] elif feature_id in features_with_std: feature_dict['value'][instance_id] = value[0] feature_dict['std'][instance_id] = value[1] else: feature_dict['value'][instance_id] = value if self.is_exported: path_to_folder = os.path.join(os.getcwd(), "experiments", self.experiment_id, "features") make_folder_if_do_not_exist(path_to_folder) path_to_file = os.path.join(path_to_folder, f'{feature_id}.csv') with open(path_to_file, 'w', newline='') as csv_file: writer = csv.writer(csv_file, delimiter=';') if feature_id in features_with_time: writer.writerow(["instance_id", "value", 'time']) for key in feature_dict['value']: writer.writerow( [key, feature_dict['value'][key], round(feature_dict['time'][key], 3)]) elif feature_id in features_with_std: writer.writerow(["instance_id", "value", 'std']) for key in feature_dict['value']: writer.writerow( [key, feature_dict['value'][key], round(feature_dict['std'][key], 3)]) else: writer.writerow(["instance_id", "value"]) for key in feature_dict['value']: writer.writerow([key, feature_dict['value'][key]]) self.features[feature_id] = feature_dict return feature_dict