# Source code for mapof.roommates.features.basic_features

import statistics
import sys
from random import shuffle

import gurobipy as gp
import networkx as nx
from gurobipy import GRB
from matching.games import StableRoommates

# Raise the interpreter's recursion limit for the whole process (module-level side effect).
# NOTE(review): presumably required by a recursive routine in one of the solvers used
# below -- confirm which one actually needs it.
sys.setrecursionlimit(10000)
# warnings.filterwarnings("error")

def generate_instance(num_agents):
    """
    Draw a random roommates instance with complete preference lists.

    Agents are numbered ``0 .. num_agents - 1``. For every agent the list of
    all *other* agents is shuffled in place with ``random.shuffle``.

    Returns a list whose ``i``-th entry is the shuffled preference list of
    agent ``i``.
    """
    profiles = []
    for agent in range(num_agents):
        # Everyone except the agent itself, in ascending order before shuffling.
        others = list(range(agent)) + list(range(agent + 1, num_agents))
        shuffle(others)
        profiles.append(others)
    return profiles

def number_blocking_pairs(instance,matching):
    """
    Count the blocking pairs of a matching.

    A pair ``{i, j}`` is blocking if ``j`` appears earlier in ``instance[i]``
    than the current partner of ``i`` and ``i`` appears earlier in
    ``instance[j]`` than the current partner of ``j``.

    Parameters:
        instance: indexable collection of preference lists; ``instance[i]``
            orders the other agents, earlier positions being preferred.
        matching: mapping (or sequence) from agent to partner. An agent that
            is missing from the mapping, or mapped to ``None``, is treated as
            unmatched and prefers every agent in its list to its situation.

    Returns:
        The number of unordered blocking pairs.

    Raises:
        KeyError: if a partner or an agent is absent from a preference list.
    """
    num_agents = len(instance)

    # Position of every agent in every preference list, built once so the
    # pair loop below needs only constant-time lookups.
    rank = [
        {other: pos for pos, other in enumerate(instance[agent])}
        for agent in range(num_agents)
    ]

    # Rank each agent assigns to its current partner. This depends only on the
    # agent, so it is computed once instead of once per pair.
    partner_rank = []
    for agent in range(num_agents):
        try:
            partner = matching[agent]
        except (KeyError, IndexError):
            partner = None
        if partner is None:
            # Unmatched: worse than every listed agent.
            partner_rank.append(len(instance[agent]))
        else:
            partner_rank.append(rank[agent][partner])

    bps = 0
    for i in range(num_agents):
        for j in range(i + 1, num_agents):
            if rank[i][j] < partner_rank[i] and rank[j][i] < partner_rank[j]:
                bps += 1
    return bps


def compute_stable_SR(votes):
    """
    Try to compute a stable matching with the ``matching`` library.

    Parameters:
        votes: indexable collection of preference lists; ``votes[i]`` is the
            preference list of agent ``i``.

    Returns:
        A dict mapping each agent name to the name of its partner, or the
        *string* ``'None'`` (not the ``None`` object) if solving the game or
        reading the partners' names raises an exception. Presumably this is
        the case when the instance admits no stable matching -- confirm
        against the solver's behaviour.
    """
    dict_instance = {i: votes[i] for i in range(len(votes))}
    game = StableRoommates.create_from_dictionary(dict_instance)
    try:
        matching = game.solve()
        # Convert the solver's player objects into plain agent names.
        usable_matching = {}
        for m in matching:
            usable_matching[m.name] = matching[m].name
        return usable_matching
    except Exception:
        # Deliberately broad: any solver failure is reported via the sentinel.
        # Unlike a bare ``except``, this lets KeyboardInterrupt and SystemExit
        # propagate.
        return 'None'


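# Only call for instances that admit a stable matching.
# summed: If true, the summed rank that agents assign to their partner in the matching is optimized; if false, the maximum rank that any agent assigns to its partner is minimized.
# best (only relevant if summed is true): If true, the summed rank is maximized; if false, it is minimized. Ranks are 0-based positions in the preference lists.
# NOTE(review): the name `best` reads as "best possible matching", yet a larger summed rank means partners further down the lists -- confirm the intended orientation.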
def rank_matching(instance,best,summed):
    """
    Compute a rank-optimal stable matching with a Gurobi integer program.

    The model selects a perfect matching without blocking pairs and optimizes
    the 0-based positions ("ranks") that agents hold in their partners'
    preference lists.

    Parameters:
        instance: list of preference lists; ``instance[i]`` orders all other
            agents, earlier positions being preferred.
        best: only used when ``summed`` is true. If true the summed rank is
            maximized, otherwise it is minimized.
        summed: if true the objective is the sum over all agents of the rank
            they assign to their partner; if false the maximum rank that any
            agent assigns to its partner is minimized.

    Returns:
        ``(value, matching)`` where ``value`` is the optimal objective as an
        int and ``matching`` maps every agent to its partner.

    Raises:
        RuntimeError: if Gurobi does not finish with status ``OPTIMAL``, e.g.
            because the model is infeasible (presumably when no stable
            matching exists).
    """
    num_agents=len(instance)
    m = gp.Model("mip1")
    m.setParam('OutputFlag', False)
    m.setParam('Threads', 10)
    # x[i, j] == 1 iff agents i and j are matched to each other.
    x = m.addVars(num_agents, num_agents, lb=0, ub=1, vtype=GRB.BINARY)
    opt = m.addVar(vtype=GRB.INTEGER, lb=0, ub=num_agents*num_agents)
    # Nobody is matched to themselves.
    for i in range(num_agents):
        m.addConstr(x[i, i]  == 0)
    # Being matched is symmetric.
    for i in range(num_agents):
        for j in range(num_agents):
            m.addConstr(x[i, j]  == x[j, i])
    # Every agent has exactly one partner.
    for i in range(num_agents):
        m.addConstr(gp.quicksum(x[i, j] for j in range(num_agents)) == 1)
    # Stability: for every pair {i, j}, i is matched to someone it ranks at
    # least as high as j, or j is matched to someone it ranks at least as
    # high as i.
    for i in range(num_agents):
        for j in range(i+1,num_agents):
            better_pairs=[]
            for t in range(0,instance[i].index(j)+1):
                better_pairs.append([i,instance[i][t]])
            for t in range(0,instance[j].index(i)+1):
                better_pairs.append([j,instance[j][t]])
            m.addConstr(gp.quicksum(x[a[0], a[1]] for a in better_pairs) >= 1)
    if summed:
        # opt equals the summed rank of all agents for their partner.
        m.addConstr(gp.quicksum(instance[i].index(j) * x[i, j]
                                for i in range(num_agents)
                                for j in range(num_agents) if j != i) == opt)
        if best:
            m.setObjective(opt, GRB.MAXIMIZE)
        else:
            m.setObjective(opt, GRB.MINIMIZE)
    else:
        # opt bounds, for every agent i, the rank i holds in its partner's
        # list; over all agents this bounds every partner rank.
        for i in range(num_agents):
            m.addConstr(gp.quicksum(instance[j].index(i) * x[i, j]
                                    for j in range(num_agents) if j != i) <= opt)
        m.setObjective(opt, GRB.MINIMIZE)
    m.optimize()
    if m.status != GRB.Status.OPTIMAL:
        raise RuntimeError("Could not optimize model during roommates feature "
                           f"computation. Gurobi status {m.status} returned.")
    matching={}
    for i in range(num_agents):
        for j in range(num_agents):
            # Solver values are floats; accept anything close to 1.
            if abs(x[i, j].X - 1) < 0.05:
                matching[i]=j
                matching[j]=i
    # The objective is reported as a float that may sit slightly below the
    # integer optimum; round to nearest instead of truncating.
    return int(round(m.objVal)), matching

def min_num_bps_matching(instance):
    """
    Compute the minimum number of blocking pairs over all matchings.

    Solves a Gurobi integer program in which agents may stay unmatched and an
    indicator ``y[i, j]`` must be set for every pair ``{i, j}`` that blocks
    the chosen matching; the number of set indicators is minimized.

    Parameters:
        instance: object whose ``votes`` attribute is a list of preference
            lists; ``votes[i]`` orders all other agents.

    Returns:
        The minimum number of blocking pairs as an int.

    Raises:
        RuntimeError: if Gurobi does not finish with status ``OPTIMAL``.
    """
    votes = instance.votes
    num_agents=len(votes)
    m = gp.Model("mip1")
    m.setParam('OutputFlag', False)
    # x[i, j] == 1 iff i and j are matched; y[i, j] == 1 allows {i, j} to block.
    x = m.addVars(num_agents, num_agents, lb=0, ub=1, vtype=GRB.BINARY)
    y = m.addVars(num_agents, num_agents, lb=0, ub=1, vtype=GRB.BINARY)
    opt = m.addVar(vtype=GRB.INTEGER, lb=0, ub=num_agents*num_agents)
    m.addConstr(gp.quicksum(y[i, j] for i in range(num_agents) for j in range(num_agents)) <= opt)
    # Nobody is matched to themselves.
    for i in range(num_agents):
        m.addConstr(x[i, i]  == 0)
    # Being matched is symmetric.
    for i in range(num_agents):
        for j in range(num_agents):
            m.addConstr(x[i, j]  == x[j, i])
    # Every agent has at most one partner.
    for i in range(num_agents):
        m.addConstr(gp.quicksum(x[i, j] for j in range(num_agents)) <= 1)
    # Either {i, j} does not block (one of them has a partner at least as
    # good as the other), or the pair is paid for through y[i, j].
    for i in range(num_agents):
        for j in range(i+1,num_agents):
            better_pairs=[]
            for t in range(0,votes[i].index(j)+1):
                better_pairs.append([i,votes[i][t]])
            for t in range(0,votes[j].index(i)+1):
                better_pairs.append([j,votes[j][t]])
            m.addConstr(gp.quicksum(x[a[0], a[1]] for a in better_pairs) >= 1-y[i,j])

    m.setObjective(opt, GRB.MINIMIZE)
    m.optimize()
    if m.status != GRB.Status.OPTIMAL:
        raise RuntimeError("Could not optimize model during roommates feature "
                           f"computation. Gurobi status {m.status} returned.")
    # The objective is reported as a float that may sit slightly below the
    # integer optimum; round to nearest instead of truncating.
    return int(round(m.objVal))

def summed_rank_maximal_matching(instance):
    """
    Return the maximum summed rank over all stable matchings.

    Calls ``rank_matching`` on ``instance.votes`` with ``best=True`` and
    ``summed=True`` and verifies that the returned matching has no blocking
    pair.

    Raises:
        RuntimeError: if the returned matching has a blocking pair. This
            replaces the former ``print`` + ``exit(0)``, which terminated the
            whole process with a success status.
    """
    val, matching = rank_matching(instance.votes, True, True)
    if number_blocking_pairs(instance.votes, matching) > 0:
        raise RuntimeError('ERROR IN summed_rank_maximal_matching')
    return val


def summed_rank_minimal_matching(instance):
    """
    Return the minimum summed rank over all stable matchings.

    Calls ``rank_matching`` on ``instance.votes`` with ``best=False`` and
    ``summed=True`` and verifies that the returned matching has no blocking
    pair.

    Raises:
        RuntimeError: if the returned matching has a blocking pair. This
            replaces the former ``print`` + ``exit(0)``, which terminated the
            whole process with a success status.
    """
    val, matching = rank_matching(instance.votes, False, True)
    if number_blocking_pairs(instance.votes, matching) > 0:
        raise RuntimeError('ERROR IN summed_rank_minimal_matching')
    return val

def minimal_rank_maximizing_matching(instance):
    """
    Return the smallest achievable worst partner rank over stable matchings.

    Calls ``rank_matching`` on ``instance.votes`` with ``summed=False``, which
    minimizes the largest rank any agent assigns to its partner (the ``best``
    flag passed along is not used in that mode), and verifies that the
    returned matching has no blocking pair.

    Raises:
        RuntimeError: if the returned matching has a blocking pair. This
            replaces the former ``print`` + ``exit(0)``, which terminated the
            whole process with a success status.
    """
    val, matching = rank_matching(instance.votes, True, False)
    if number_blocking_pairs(instance.votes, matching) > 0:
        raise RuntimeError('ERROR IN minimal_rank_maximizing_matching')
    return val

def avg_num_of_bps_for_random_matching(instance,iterations=100):
    """
    Estimate the number of blocking pairs of a random perfect matching.

    In each iteration the agents are shuffled and consecutive agents in the
    shuffled order are paired up; the blocking pairs of that matching are
    counted with ``number_blocking_pairs``.

    Parameters:
        instance: object whose ``votes`` attribute holds the preference lists.
        iterations: number of random matchings to sample.

    Returns:
        ``(mean, sample standard deviation)`` of the blocking-pair counts.

    Raises:
        IndexError: for an odd number of agents, because the last agent in the
            shuffled order has nobody to be paired with.
        statistics.StatisticsError: if ``iterations`` is below 2, because the
            sample standard deviation needs at least two values.
    """
    agent_count = len(instance.votes)
    samples = []
    for _ in range(iterations):
        order = list(range(agent_count))
        shuffle(order)
        # Pair positions (0, 1), (2, 3), ... of the shuffled order.
        partner_of = {}
        for start in range(0, agent_count, 2):
            pair = order[start:start + 2]
            partner_of[pair[0]] = pair[1]
            partner_of[pair[1]] = pair[0]
        samples.append(number_blocking_pairs(instance.votes, partner_of))
    return statistics.mean(samples), statistics.stdev(samples)


def num_of_bps_maximumWeight(instance) -> int:
    """
    Count the blocking pairs of a maximum-weight matching.

    Builds the complete graph on the agents, where the weight of edge
    ``{a, b}`` is ``2 * (n - 1)`` minus the two positions the agents hold in
    each other's preference lists, so mutually well-ranked pairs are heavier.
    A maximum-cardinality matching of maximum weight is computed with
    networkx, and its blocking pairs are counted with
    ``number_blocking_pairs``.

    Parameters:
        instance: object whose ``votes`` attribute holds the preference lists.

    Returns:
        The number of blocking pairs of the computed matching.
    """
    prefs = instance.votes
    agent_count = len(prefs)
    top_weight = 2 * (agent_count - 1)

    graph = nx.Graph()
    for a in range(agent_count):
        for b in range(a + 1, agent_count):
            joint_rank = prefs[a].index(b) + prefs[b].index(a)
            graph.add_edge(a, b, weight=top_weight - joint_rank)

    pairs = nx.max_weight_matching(graph, maxcardinality=True)

    # Turn the set of pairs into a symmetric agent -> partner lookup.
    partner_of = {}
    for u, v in pairs:
        partner_of[u] = v
        partner_of[v] = u
    return number_blocking_pairs(prefs, partner_of)


def mutuality(instance) -> int:
    """
    Computes the mutuality score of an SR instance.

    The score is the sum, over every retrospective vector of the instance and
    every position ``i`` in it, of ``abs(vector[i] - i)``.

    Parameters:
        instance: object providing ``get_retrospective_vectors()``, which
            returns an iterable of integer sequences.

    Returns:
        The summed absolute deviation of the entries from their positions.
    """
    vectors = instance.get_retrospective_vectors()
    score = 0
    for vector in vectors:
        for i, v in enumerate(vector):
            score += abs(v - i)
    return score
def dist_from_id_1(instance) -> int:
    """
    Sum the absolute differences between all pairs of entries of each
    retrospective vector.

    Parameters:
        instance: object providing ``get_retrospective_vectors()``, which
            returns an iterable of integer sequences.

    Returns:
        The sum over all vectors and all positions ``i <= j`` of
        ``abs(vector[j] - vector[i])``.
    """
    vectors = instance.get_retrospective_vectors()
    score = 0
    for vector in vectors:
        for i in range(len(vector)):
            # j == i contributes 0, so the pair loop may start at i.
            for j in range(i, len(vector)):
                score += abs(vector[j] - vector[i])
    return score


def dist_from_id_2(instance) -> int:
    """
    Sum the number of distinct entries of each retrospective vector.

    Parameters:
        instance: object providing ``get_retrospective_vectors()``, which
            returns an iterable of sequences of hashable values.

    Returns:
        The sum of ``len(set(vector))`` over all vectors.
    """
    vectors = instance.get_retrospective_vectors()
    score = 0
    for vector in vectors:
        score += len(set(vector))
    return score