"""The main Artificial Bee Colony optimization algorithm.
"""
import sys
sys.path.append('...')
import os
import time
import pandas as pd
import numpy as np
from .employee_bee import EmployeeBee
from .onlooker_bee import OnlookerBee
from .scout_bee import ScoutBee
from config import Params
from utils import Logger
from utils import FileHandler
[docs]class ArtificialBeeColony:
'''Artificial Bee Colony optimizer
Attributes:
colony_size (int, optional): bee population size; defaults to value set in \
:class:`~config.params.Params`
employees (list): list of all :class:`~core.abc.employee_bee.EmployeeBee` used in the \
optimization
eo_colony_ratio (float, optional): employees to onlookers ratio; defaults to value set in \
:class:`~config.params.Params`
obj_interface (:class:`~core.objective_interface.ObjectiveInterface`): the objective \
interface defining the task boundaries
onlookers (list): list of all :class:`~core.abc.onlooker_bee.OnlookerBee` used in the \
optimization
results_df (:class:`pandas.DataFrame`): the main results DataFrame containing all evaluated \
results
scouts (list): list of all :class:`~core.abc.scout_bee.ScoutBee`-sampled positions used in \
the optimization. Scout Bees are *not* instantiated.
scouts_count (int): number of scouts / parallel explorations. Follows the classical ABC \
1-to-1 scout-to-employee ratio
total_evals (int): total number of :func:`~core.abc.artificial_bee.ArtificialBee.evaluate` \
calls (for both Employees and Onlookers)
'''
def __init__(self, obj_interface, \
colony_size=Params['COLONY_SIZE'], \
employee_onlooker_ratio=Params['EMPLOYEE_ONLOOKER_RATIO']):
''' Initializes the ABC algorithm '''
self.obj_interface = obj_interface # Encapsulates the Search Space +
# Evaluation Strategy
self.colony_size = colony_size
self.eo_colony_ratio = employee_onlooker_ratio
self.scouts_count = int(self.colony_size * self.eo_colony_ratio)
def __init_scouts(self):
'''Initial :class:`~core.abc.food_source.FoodSource` sampling by Scout Bees \
*(does not evaluate fitness)*
'''
for _ in range(self.scouts_count):
self.scouts.append(ScoutBee.sample(self.obj_interface))
def __init_employees(self):
''' Instantiate Employee Bees and assign a Scout position to each '''
# Floor of (colony_size * ratio)
employee_count = int(self.colony_size * self.eo_colony_ratio)
for itr in range(employee_count):
# Split scouts evenly among employees
scout = self.scouts[int(itr / (employee_count / \
self.scouts_count))]
self.employees.append(EmployeeBee(scout))
def __init_onlookers(self):
'''Instantiate Onlooker Bees *(assigning Employees occurs after \
evaluation and probability calculation)*
'''
onlooker_count = self.colony_size - int(self.colony_size * self.eo_colony_ratio)
for itr in range(onlooker_count):
self.onlookers.append(OnlookerBee())
def __employee_bee_phase(self, itr):
'''Evaluate Scout-initialized position after :func:`~core.abc.scout_bee.ScoutBee.reset` \
or Search + Evaluate neighbor every subsequent iteration \
until abandonment limit
Args:
itr (int): current main ABC optimization iteration
'''
# Search and evaluate new or existing neighbor
for employee in self.employees:
# Ignored if unevaluated position exists (i.e Scout-sampled)
employee.search(self.obj_interface)
fs = employee.food_source
if fs is None or fs.encode_position() not in self.results_df['candidate']:
# Evaluate employee position
series = employee.evaluate(self.obj_interface, itr)
self.__save_results(series)
else:
# Already evaluated
fs.fitness = self.results_df[self.results_df['candidate'] == fs.encode_position()]['fitness'].values[0]
employee.greedy_select(fs, self.obj_interface.is_minimize)
# resampling the same candidate should count as a trial towards the abandonment limit?
# onlooker.employee.trials += 1 # to avoid being stuck
def __onlooker_bee_phase(self, itr):
'''Assign each Onlooker to an Employee, then Search + Evaluate a random neighbor
Args:
itr (int): current main ABC optimization iteration
'''
# Calculate Employee probability
sum_fitness = sum([employee.calculate_fitness() \
for employee in self.employees])
# Sum(probabilities) ≈ 1.0
probabilities = list(map(lambda employee: \
employee.compute_probability(sum_fitness), \
self.employees))
if not self.obj_interface.is_minimize:
# inverse weights to maximize the objective
# (assign more onlookers to higher fitness scores)
# reciprocals
probabilities = [1.0 / prob for prob in probabilities]
# normalize
sum_probs = sum(probabilities)
probabilities = [prob / sum_probs for prob in probabilities]
# Assign EmployeeBees to OnlookerBees, search, and evaluate neighbor
for onlooker in self.onlookers:
# Assign EmployeeBees to OnlookerBees
emp_idx = np.random.choice(len(self.employees), p=probabilities)
onlooker.assign_employee(self.employees[emp_idx])
# Search for a new random neighbor
onlooker.search(self.obj_interface)
fs = onlooker.food_source
if fs is None or fs.encode_position() not in self.results_df['candidate']:
# Evaluate employee position
series = onlooker.evaluate(self.obj_interface, itr)
self.__save_results(series)
else:
# Already evaluated
fs.fitness = self.results_df[self.results_df['candidate'] == fs.encode_position()]['fitness'].values[0]
onlooker.employee.greedy_select(fs, self.obj_interface.is_minimize)
# resampling the same candidate should count as a trial towards the abandonment limit?
# onlooker.employee.trials += 1 # to avoid being stuck
def __scout_bee_phase(self):
'''Check abandonment limits and rest employees accordingly
'''
ScoutBee.check_abandonment(self.employees, self.obj_interface)
def __save_results(self, series):
'''Save results dataframe to specified disk path
Args:
series (:class:`pandas.Series): Pandas Series (row) to be appended to the \
current :code:`results_df`
'''
if self.total_evals % Params['RESULTS_SAVE_FREQUENCY'] == 0:
self.results_df = pd.concat([self.results_df,
pd.DataFrame([series],
columns=series.index)]
).reset_index(drop=True)
#self.results_df = self.results_df.append(series, ignore_index=True)
filename = f'{Params["CONFIG_VERSION"]}.csv'
if FileHandler.save_df(self.results_df,
Params.get_results_path(),
filename):
Logger.filesave_log(series['candidate'], series['weights_filename'])
def __momentum_phase(self):
'''Momentum Evaluation Augmentation Stochastic operator that adds \
:code:`Params['MOMENTUM_EPOCHS']` epochs to propel the evaluations of \
the most consistently converging candidates
'''
# calculate probabilities
calculated_momentums = self.results_df['momentum'] / (self.results_df['epochs'] + \
self.results_df['momentum_epochs'])
if sum(calculated_momentums) == 0:
# improbable edge case where the sum of momentums is exactly 0
return
probs = calculated_momentums / sum(calculated_momentums)
the_chosen_ones = dict.fromkeys([x for x in range(len(probs))], 0)
for _ in range(Params['MOMENTUM_EPOCHS']):
# probabilistically assign momentum epochs
idx = np.random.choice(len(probs), p=probs)
the_chosen_ones[idx] += 1
# training extension loop
for the_one, m_epochs in the_chosen_ones.items():
candidate_row = self.results_df.iloc[[the_one]]
# extract candidate info for additional training
candidate = candidate_row['candidate'].values[0]
weights_file = candidate_row['weights_filename'].values[0]
momentum = candidate_row['momentum'].values[0]
epochs = candidate_row['epochs'].values[0]
momentum_epochs = candidate_row['momentum_epochs'].values[0] + m_epochs
# train for m_epochs
Logger.momentum_evaluation_log(candidate,
candidate_row['fitness'].values[0],
m_epochs)
res = self.obj_interface.momentum_eval(candidate,
weights_file,
m_epochs)
# save new results
self.results_df.loc[the_one, 'fitness'] = res['fitness']
self.results_df.loc[the_one, 'momentum_epochs'] = momentum_epochs
def __reset_all(self):
''' Resets the ABC algorithm
'''
self.scouts = [] # List of FoodSources initially sampled
self.employees = []
self.onlookers = []
EmployeeBee.id_tracker = 0
OnlookerBee.id_tracker = 0
# init results dataframe
filename = f'_{Params["CONFIG_VERSION"]}.csv'
results_file = os.path.join(Params.get_results_path(), filename)
# resume from previously saved file if it exists
if Params['RESUME_FROM_RESULTS_FILE']:
self.results_df = FileHandler.load_df(results_file) # loads empty df if file not found
else:
cols = ['bee_type'
'bee_id',
'bee_parent',
'itr',
'candidate',
'fitness',
'center_fitness',
'epochs',
'params',
'weights_filename',
'time']
self.results_df = pd.DataFrame(columns=cols)
self.total_evals = len(self.results_df.index)
[docs] def optimize(self):
''' Main optimization loop
'''
Params.export_yaml(Params.get_results_path(),
f'{Params["CONFIG_VERSION"]}.yaml')
Logger.start_log()
self.__reset_all()
self.__init_scouts()
self.__init_employees()
self.__init_onlookers()
start_time = time.time()
fitness_selector = max if not self.obj_interface.is_minimize else min
''' Optimization loop '''
for itr in range(Params['ITERATIONS_COUNT']):
self.__employee_bee_phase(itr)
self.__onlooker_bee_phase(itr)
self.__momentum_phase()
self.__scout_bee_phase()
best_fitness = fitness_selector(self.results_df['fitness'].tolist())
if itr % Params['RESULTS_SAVE_FREQUENCY'] == 0:
Logger.status(itr,
'Best fitness: {}, Total time (s): {}'.format(best_fitness,
time.time() - start_time))
Logger.end_log()