Source code for mapof.roommates.objects.RoommatesExperiment

#!/usr/bin/env python
import ast
import csv
import logging
import time
from abc import ABC

from mapof.core.glossary import *
from mapof.core.objects.Experiment import Experiment
from mapof.core.persistence.experiment_imports import get_values_from_csv_file
from mapof.core.utils import *

import mapof.roommates.features as features
import mapof.roommates.features.basic_features as basic
from mapof.roommates.distances import get_distance
from mapof.roommates.objects.Roommates import Roommates
from mapof.roommates.objects.RoommatesFamily import RoommatesFamily

# scikit-learn is treated as an optional dependency: when the import fails,
# every embedding class name below is bound to None and the ImportError is
# printed rather than raised, so importing this module still succeeds.
try:
    from sklearn.manifold import MDS
    from sklearn.manifold import TSNE
    from sklearn.manifold import SpectralEmbedding
    from sklearn.manifold import LocallyLinearEmbedding
    from sklearn.manifold import Isomap
except ImportError as error:
    MDS = None
    TSNE = None
    SpectralEmbedding = None
    LocallyLinearEmbedding = None
    Isomap = None
    print(error)


class RoommatesExperiment(Experiment, ABC):
    """Experiment over families of Roommates instances.

    In addition to the state managed by ``Experiment``, the object keeps
    ``default_num_agents`` (used whenever a caller does not give a number of
    agents) and ``matchings`` (instance id -> stable matching, filled either
    from ``features/stable_sr.csv`` or by :meth:`compute_stable_sr`).
    """

    def __init__(self, **kwargs):
        """Initialise the experiment and try to load stored matchings.

        All keyword arguments are forwarded to ``Experiment.__init__``.
        """
        super().__init__(**kwargs)
        self.default_num_agents = 10
        self.matchings = {}
        try:
            self.import_matchings()
        except Exception as error:
            # Loading is best-effort: a missing file or a missing experiment
            # id must not prevent the experiment from being created.
            logging.getLogger(__name__).debug(
                "Stable matchings were not imported: %s", error)

    def add_culture(self, name, function):
        """Do nothing; registering custom cultures is not implemented here."""
        pass

    def add_distance(self, name, function):
        """Do nothing; registering custom distances is not implemented here."""
        pass

    def add_feature(self, name, function):
        """Do nothing; registering custom features is not implemented here."""
        pass

    def add_folders_to_experiment(self) -> None:
        """Create the on-disk layout of the experiment if it is missing.

        Creates ``experiments/<experiment_id>`` under the current working
        directory together with the ``distances``, ``features``,
        ``coordinates`` and ``instances`` sub-folders, and writes a default
        ``map.csv`` when none exists.
        """
        experiment_dir = os.path.join(os.getcwd(), "experiments", self.experiment_id)
        os.makedirs(experiment_dir, exist_ok=True)

        for folder_name in ('distances', 'features', 'coordinates', 'instances'):
            os.makedirs(os.path.join(experiment_dir, folder_name), exist_ok=True)

        path = os.path.join(experiment_dir, "map.csv")
        if not os.path.exists(path):
            with open(path, 'w') as file_csv:
                file_csv.write(
                    "size;num_agents;culture_id;params;color;alpha;"
                    "family_id;label;marker\n")
                file_csv.write("3;10;ic;{};black;1;ic;Impartial Culture;o\n")

    def import_matchings(self):
        """Load ``features/stable_sr.csv`` into ``self.matchings``.

        The file is read as ';'-separated with the columns ``instance_id``
        and ``matching``. An empty ``matching`` cell is stored as ``None``;
        any other cell is stored as the raw string.

        Raises:
            OSError: if the file cannot be opened.
            KeyError: if one of the two columns is missing.
        """
        path = os.path.join(os.getcwd(), 'experiments', self.experiment_id,
                            'features', 'stable_sr.csv')
        matchings = {}
        with open(path, 'r', newline='') as csv_file:
            for row in csv.DictReader(csv_file, delimiter=';'):
                value = row['matching']
                matchings[row['instance_id']] = None if value == '' else value

        # Assigned only after a complete read, so a failure keeps the old data.
        self.matchings = matchings

    def add_instances_to_experiment(self):
        """Build a ``Roommates`` object for every instance of every family.

        A single-instance family uses the family id as the instance id; other
        families use ``<family_id>_<j>`` for ``j`` in ``range(size)``. The
        ``instance_ids`` of every family are updated as a side effect.

        Returns:
            dict mapping instance id to the created ``Roommates`` object.
        """
        instances = {}

        for family_id, family in self.families.items():
            if family.single:
                instance_ids = [family_id]
            else:
                instance_ids = [family_id + '_' + str(j) for j in range(family.size)]

            for instance_id in instance_ids:
                instances[instance_id] = Roommates(self.experiment_id, instance_id)

            family.instance_ids = [str(instance_id) for instance_id in instance_ids]

        return instances

    def add_instance(self,
                     culture_id="none",
                     instance_id=None,
                     params=None,
                     label=None,
                     color="black",
                     alpha=1.,
                     show=True,
                     marker='x',
                     size=1,
                     num_agents=None
                     ):
        """Add a single instance by registering it as a one-element family.

        The arguments are passed on to :meth:`add_family` with
        ``single_instance=True`` and ``family_id=instance_id``.

        Returns:
            list of the ids of the created instances.
        """
        if num_agents is None:
            num_agents = self.default_num_agents

        return self.add_family(
            culture_id=culture_id,
            instance_id=instance_id,
            params=params,
            size=size,
            label=label,
            color=color,
            alpha=alpha,
            show=show,
            marker=marker,
            family_id=instance_id,
            num_agents=num_agents,
            single_instance=True
        )

    def add_family(self,
                   culture_id: str = "none",
                   instance_id: str = None,
                   params: dict = None,
                   size: int = 1,
                   label: str = None,
                   color: str = "black",
                   alpha: float = 1.,
                   show: bool = True,
                   marker: str = 'o',
                   family_id: str = None,
                   single_instance: bool = False,
                   num_agents: int = None,
                   path: dict = None,
                   **kwargs,
                   ):
        """Register a family of instances and prepare its instances.

        Defaults applied when an argument is ``None``: ``num_agents`` falls
        back to ``self.default_num_agents``, ``params`` to ``{}``,
        ``family_id`` to ``<culture_id>_<num_agents>`` and ``label`` to the
        family id. A non-``None`` ``instance_id`` overrides ``family_id``.
        Extra keyword arguments are forwarded to ``RoommatesFamily``.

        Returns:
            list of the ids of the instances created for the family.
        """
        if instance_id is not None:
            family_id = instance_id

        if num_agents is None:
            num_agents = self.default_num_agents

        if self.families is None:
            self.families = {}

        # Same guard as in prepare_instances, so this method also works
        # before any instance has been stored.
        if self.instances is None:
            self.instances = {}

        if params is None:
            params = {}

        if family_id is None:
            family_id = culture_id + '_' + str(num_agents)

        if label is None:
            label = family_id

        family = RoommatesFamily(
            culture_id=culture_id,
            family_id=family_id,
            params=params,
            label=label,
            color=color,
            alpha=alpha,
            single=single_instance,
            show=show,
            size=size,
            marker=marker,
            num_agents=num_agents,
            path=path,
            **kwargs
        )
        self.families[family_id] = family

        self.num_families = len(self.families)
        self.num_instances = sum(fam.size for fam in self.families.values())

        new_instances = family.prepare_family(
            is_exported=self.is_exported,
            experiment_id=self.experiment_id
        )
        self.instances.update(new_instances)

        new_ids = list(new_instances.keys())
        family.instance_ids = new_ids

        # TODO: update map.csv when the experiment is exported
        # (update_map_csv is not implemented yet).

        return list(new_ids)

    def set_default_num_agents(self, num_agents: int):
        """Set the number of agents used when a caller does not give one."""
        self.default_num_agents = num_agents

    def get_distance(self,
                     election_1: Roommates,
                     election_2: Roommates,
                     distance_id: str = None,
                     **kwargs
                     ) -> "float | tuple[float, list]":
        """Return the distance between two instances.

        Delegates to the module-level ``get_distance``. Extra keyword
        arguments are accepted but are not forwarded.
        """
        return get_distance(election_1, election_2, distance_id)

    def import_controllers(self):
        """Read ``experiments/<experiment_id>/map.csv`` and build the families.

        The first line of the file is the ';'-separated header; every other
        line describes one family. Columns missing from the header leave the
        corresponding argument at ``None`` (``show`` defaults to ``True``).
        ``num_families``, ``num_instances`` and ``main_order`` are updated.

        Returns:
            dict mapping family id to ``RoommatesFamily``.
        """
        families = {}

        map_path = os.path.join(os.getcwd(), 'experiments', self.experiment_id, 'map.csv')

        # The context manager closes the file even when a row fails to parse.
        with open(map_path, 'r') as file_:
            header = [h.strip() for h in file_.readline().split(';')]
            reader = csv.DictReader(file_, fieldnames=header, delimiter=';')

            starting_from = 0
            for row in reader:
                culture_id = str(row['culture_id']).strip() if 'culture_id' in row else None
                color = str(row['color']).strip() if 'color' in row else None
                label = str(row['label']) if 'label' in row else None
                family_id = str(row['family_id']) if 'family_id' in row else None
                params = ast.literal_eval(str(row['params'])) if 'params' in row else None
                alpha = float(row['alpha']) if 'alpha' in row else None
                size = int(row['size']) if 'size' in row else None
                marker = str(row['marker']).strip() if 'marker' in row else None
                num_agents = int(row['num_agents']) if 'num_agents' in row else None

                # Kept separate from the map.csv location and reset for every
                # row, so a family without a 'path' column gets None.
                family_path = ast.literal_eval(str(row['path'])) if 'path' in row else None

                # 'process_id' is the marker that create_structure writes into
                # the 'show' column of the default map.csv.
                show = row['show'].strip() == 'process_id' if 'show' in row else True

                single_instance = size == 1

                families[family_id] = RoommatesFamily(culture_id=culture_id,
                                                      family_id=family_id,
                                                      params=params,
                                                      label=label,
                                                      color=color,
                                                      alpha=alpha,
                                                      show=show,
                                                      size=size,
                                                      marker=marker,
                                                      starting_from=starting_from,
                                                      num_agents=num_agents,
                                                      path=family_path,
                                                      single=single_instance)
                starting_from += size

        self.num_families = len(families)
        self.num_instances = sum(family.size for family in families.values())
        self.main_order = list(range(self.num_instances))

        return families

    def prepare_instances(self):
        """Prepare the instances of every family and store them in ``self.instances``."""
        if self.instances is None:
            self.instances = {}

        for family in self.families.values():
            new_instances = family.prepare_family(
                is_exported=self.is_exported,
                experiment_id=self.experiment_id)
            self.instances.update(new_instances)

    def compute_stable_sr(self):
        """Compute a stable matching for every instance.

        Results are stored in ``self.matchings``; the instance named
        ``roommates_test`` is skipped and gets the string ``'None'``. When the
        experiment is exported, the matchings are also written to
        ``features/stable_sr.csv``.
        """
        for instance_id, instance in self.instances.items():
            if instance_id in ['roommates_test']:
                self.matchings[instance_id] = 'None'
            else:
                self.matchings[instance_id] = basic.compute_stable_SR(instance.votes)

        if self.is_exported:
            path_to_folder = os.path.join(os.getcwd(), "experiments", self.experiment_id,
                                          "features")
            make_folder_if_do_not_exist(path_to_folder)
            path_to_file = os.path.join(path_to_folder, 'stable_sr.csv')

            with open(path_to_file, 'w', newline='') as csv_file:
                writer = csv.writer(csv_file, delimiter=';')
                writer.writerow(["instance_id", "matching"])
                for instance_id in self.instances:
                    writer.writerow([instance_id, self.matchings[instance_id]])

    def compute_feature(self, feature_id: str = None, feature_params=None) -> dict:
        """Compute one feature for every instance of the experiment.

        Args:
            feature_id: name of the feature.
            feature_params: optional dict; ``num_iterations`` (default 1) sets
                how many times a local feature is evaluated when it is timed.

        Returns:
            dict with the keys ``'value'``, ``'time'`` and ``'std'``, each
            mapping instance id to the respective number. ``'std'`` is filled
            only for features that report a standard deviation.

        Raises:
            ValueError: if a local feature is requested with
                ``num_iterations`` smaller than 1.
        """
        if feature_params is None:
            feature_params = {}

        feature_dict = {'value': {}, 'time': {}, 'std': {}}
        has_std = feature_id in {'avg_num_of_bps_for_rand_matching'}
        num_iterations = feature_params.get('num_iterations', 1)

        if feature_id in MAIN_GLOBAL_FEATUERS:
            feature = features.get_global_feature(feature_id)
            values = feature(self, election_ids=list(self.instances))
            for instance_id in self.instances:
                feature_dict['value'][instance_id] = values[instance_id]
                feature_dict['time'][instance_id] = 0

        elif feature_id in ['distortion_from_all', 'max_distortion_from_all']:
            feature = features.get_global_feature(feature_id)
            for instance_id in self.instances:
                feature_dict['value'][instance_id] = feature(self, self.instances[instance_id])
                feature_dict['time'][instance_id] = 0

        elif feature_id == 'summed_rank_difference':
            # Derived from two features that were computed and exported before.
            minimal = get_values_from_csv_file(self, feature_id='summed_rank_minimal_matching')
            maximal = get_values_from_csv_file(self, feature_id='summed_rank_maximal_matching')
            for instance_id in self.instances:
                if minimal[instance_id] is None:
                    value = 'None'
                else:
                    value = abs(maximal[instance_id] - minimal[instance_id])
                feature_dict['value'][instance_id] = value
                feature_dict['time'][instance_id] = 0

        else:
            self._compute_local_feature(feature_id, num_iterations, has_std, feature_dict)

        if self.is_exported:
            self._export_feature(feature_id, feature_dict, has_std)

        self.features[feature_id] = feature_dict
        return feature_dict

    def _compute_local_feature(self, feature_id, num_iterations, has_std, feature_dict):
        """Evaluate a per-instance feature and fill ``feature_dict`` in place."""
        if num_iterations < 1:
            raise ValueError("num_iterations must be at least 1")

        # These features are only defined for instances with a stable matching.
        needs_matching = feature_id in ['summed_rank_minimal_matching',
                                        'summed_rank_maximal_matching',
                                        'minimal_rank_maximizing_matching']
        feature = features.get_local_feature(feature_id)

        for instance_id, instance in self.instances.items():
            no_matching = needs_matching and (self.matchings[instance_id] is None
                                              or self.matchings[instance_id] == 'None')

            start = time.time()
            for _ in range(num_iterations):
                value = 'None' if no_matching else feature(instance)
            total_time = (time.time() - start) / num_iterations

            if has_std:
                feature_dict['value'][instance_id] = value[0]
                feature_dict['std'][instance_id] = value[1]
            else:
                feature_dict['value'][instance_id] = value
            feature_dict['time'][instance_id] = total_time

    def _export_feature(self, feature_id, feature_dict, has_std):
        """Write ``feature_dict`` to ``features/<feature_id>.csv``."""
        path_to_folder = os.path.join(os.getcwd(), "experiments", self.experiment_id,
                                      "features")
        make_folder_if_do_not_exist(path_to_folder)
        path_to_file = os.path.join(path_to_folder, f'{feature_id}.csv')

        with open(path_to_file, 'w', newline='') as csv_file:
            writer = csv.writer(csv_file, delimiter=';')
            if has_std:
                writer.writerow(["election_id", "value", 'time', 'std'])
                for key in feature_dict['value']:
                    writer.writerow([key,
                                     feature_dict['value'][key],
                                     round(feature_dict['time'][key], 3),
                                     round(feature_dict['std'][key], 3)])
            else:
                writer.writerow(["election_id", "value", 'time'])
                for key in feature_dict['value']:
                    writer.writerow([key,
                                     feature_dict['value'][key],
                                     round(feature_dict['time'][key], 3)])

    def create_structure(self) -> None:
        """Create the working folders and a default ``map.csv``.

        Creates ``experiments``, ``images`` and ``trash`` in the current
        working directory, then ``experiments/<experiment_id>`` with its
        sub-folders. An existing ``map.csv`` is left untouched. Failures while
        creating the experiment folder are logged and not raised.
        """
        root = os.getcwd()
        for name in ("experiments", "images", "trash"):
            os.makedirs(os.path.join(root, name), exist_ok=True)

        try:
            # Same location that the rest of the class reads from and writes to.
            experiment_dir = os.path.join(root, "experiments", self.experiment_id)
            for folder_name in ("distances", "features", "coordinates",
                                "instances", "matrices"):
                os.makedirs(os.path.join(experiment_dir, folder_name), exist_ok=True)

            # PREPARE MAP.CSV FILE
            path = os.path.join(experiment_dir, "map.csv")
            if not os.path.exists(path):
                with open(path, 'w') as file_csv:
                    file_csv.write(
                        "size;num_agents;culture_id;params;color;alpha;family_id;label;marker;show\n")
                    file_csv.write("10;20;roommates_ic;{};black;1;IC;IC;o;process_id\n")
        except (OSError, TypeError) as error:
            # Still best-effort, but the reason is no longer hidden.
            logging.getLogger(__name__).warning(
                "Could not create the structure of the experiment: %s", error)