ATTENTION: We are currently upgrading GitLab's storage backend for repositories. This may cause unavailability and, in unlikely cases, data loss. Therefore, we recommend keeping local copies of all repos. Issues, milestones, and other GitLab features are not affected.

Commit a6fa975c authored by Daniel Seybold's avatar Daniel Seybold

added automated plotting for availability phases

parent e5fa3db9
Pipeline #64982 passed with stage
in 14 minutes and 12 seconds
#! python3
import os
import sys
import numpy as np
import json
import pprint
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import argparse
import pandas as pd
import seaborn as sns
import json
import itertools
import plotting_config
# STYLE SETTINGS
# Hex colour codes used to build the seaborn palettes below, in this order.
colors = [
    '#1f77b4',
    '#2ca02c',
    '#d62728',
    '#ff7f0e',
    '#9467bd',
    '#8c564b',
    '#e377c2',
    '#7f7f7f',
    '#bcbd22',
    '#17becf',
]

# Two independent, endlessly repeating iterators over the same colour list.
palette = itertools.cycle(sns.color_palette(colors))
palette_end = itertools.cycle(sns.color_palette(colors))

# Global font and theme configuration applied to every figure of this script.
plt.rcParams["font.family"] = "Times New Roman"
sns.set(style="darkgrid", font='serif', font_scale=0.8)
class TimeseriesData:
    """Container for the values extracted from one YCSB result file.

    The lists are created per instance. They used to be class attributes,
    which made every instance share (and append to) the very same lists.

    Attributes:
        timestamps: runtime values of the extracted ``[TIMESERIES]`` lines.
        operations: throughput values, index-aligned with ``timestamps``.
        latencies: reserved for latency values; not filled by the code in
            this file.
        scaleOutAverage: optional average, ``None`` until a caller sets it.
    """

    def __init__(self):
        self.timestamps = []
        self.operations = []
        self.latencies = []
        self.scaleOutAverage = None
def plot_timeseries():
    """Command line entry point: parse the arguments and create the plots.

    Reads ``--runFolder`` and ``--workloadPhase`` from the command line,
    builds the phases dataframe for that run folder, prints it and hands
    everything over to ``plot_availability_timeseries``.
    """
    cli = argparse.ArgumentParser(
        description='Plot timeseries and averages of different phases based on YCSB result files.'
    )
    cli.add_argument(
        '--runFolder', '-rf',
        type=str,
        nargs=1,
        required=True,
        help='path to the evalation_run_XXX folder',
    )
    cli.add_argument(
        '--workloadPhase', '-wp',
        type=str,
        nargs=1,
        required=True,
        choices=['load', 'transaction'],
        help='type of the workload phase: load/transaction',
    )
    parsed = cli.parse_args()

    # nargs=1 wraps each value in a one-element list, so unwrap it here.
    runFolder = parsed.runFolder[0]
    phase = parsed.workloadPhase[0]

    print("creating availability phase plots...")
    phasesDf = getPhasesDataframe(runFolder)
    print(phasesDf)
    plot_availability_timeseries(runFolder, phasesDf, phase)
    print("finished evaluation metrics plotting!")
# Statistics columns per phase, in the order in which calculate_phase_average
# returns its values (average, min, min occurrence, max, max occurrence).
_PHASE_STATS_COLUMNS = {
    'initial': ['initialAvg', 'initialMin', 'initialMinOccurance',
                'initialMax', 'initialMaxOccurance'],
    'unhealthy': ['unhealthyAvg', 'unhealthyPhaseMin', 'unhealthyPhaseMinOccurance',
                  'unhealthyPhaseMax', 'unhealthyPhaseMaxOccurance'],
    'recovery': ['recoveryAvg', 'recoveryPhaseMin', 'recoveryPhaseMinOccurance',
                 'recoveryPhaseMax', 'recoveryPhaseMaxOccurance'],
    'recovered': ['recoveredAvg', 'recoveredPhaseMin', 'recoveredPhaseMinOccurance',
                  'recoveredPhaseMax', 'recoveredPhaseMaxOccurance'],
}


def _record_phase_stats(phasesStatsDf, index, phase, extractedTimeseriesData, start, end):
    """Compute the statistics of one phase and store them in row ``index``."""
    stats = calculate_phase_average(extractedTimeseriesData, start, end)
    for column, value in zip(_PHASE_STATS_COLUMNS[phase], stats):
        phasesStatsDf.loc[index, column] = value


def plot_availability_timeseries(inputPath, df, workloadPhase):
    """Plot the throughput timeseries with highlighted availability phases.

    Reads the ``[TIMESERIES]`` lines of the load or transaction result file,
    plots the throughput over the runtime, marks the healthy / unhealthy /
    recovering / recovered phases given by ``df`` and exports the plot, the
    phases dataframe and the per-phase statistics.

    Args:
        inputPath: path of the evaluation run folder.
        df: phases dataframe as returned by ``getPhasesDataframe``; one row
            per failure injection, indexed 0..n-1.
        workloadPhase: ``"load"`` or ``"transaction"``; any other value
            terminates the process with exit code 1.
    """
    if workloadPhase == "load":
        dataPath = os.path.join(inputPath, plotting_config.DATA_FOLDER, plotting_config.DATA_LOAD_FILE)
    elif workloadPhase == "transaction":
        dataPath = os.path.join(inputPath, plotting_config.DATA_FOLDER, plotting_config.DATA_TRANSACTION_FILE)
    else:
        print("Exiting due to unsupported workloadPhase: " + workloadPhase)
        sys.exit(1)

    extractedTimeseriesData = TimeseriesData()
    # The context manager closes the result file even if parsing fails.
    with open(dataPath, 'r', encoding='utf8') as file:
        try:
            for line in file:
                line = line.rstrip()
                if "[TIMESERIES]" in line:
                    splittedLine = line.split(";")
                    # field 2: current ops per second, field 1: timestamp;
                    # parse both first so the two lists stay aligned
                    operations = float(splittedLine[2].strip())
                    seconds = float(splittedLine[1].strip())
                    extractedTimeseriesData.operations.append(operations)
                    extractedTimeseriesData.timestamps.append(seconds)
        except UnicodeDecodeError:
            print("Skipping line! Unable to decode line due to UnicodeDecodeError!")

    # create a dataframe for seaborn plotting
    plottingDf = pd.DataFrame({'timestamps': extractedTimeseriesData.timestamps,
                               'operations': extractedTimeseriesData.operations})
    plottingDf = plottingDf[['timestamps', 'operations']]

    # create a dataframe with statistics about the phases
    phasesStatsColNames = [column for columns in _PHASE_STATS_COLUMNS.values() for column in columns]
    phasesStatsDf = pd.DataFrame(columns=phasesStatsColNames)

    # initial (healthy) phase: from the start up to the end of the first failure injection
    _record_phase_stats(phasesStatsDf, 0, 'initial', extractedTimeseriesData,
                        0, df.loc[0, 'InjectFailureEnd'])

    evaluationEnd = max(extractedTimeseriesData.timestamps)

    print("plotting throughput timeseries...")
    # Plot linecharts
    ax = sns.lineplot(x="timestamps",
                      y="operations",
                      palette=palette,
                      data=plottingDf)

    for index in df.index:
        isLastFailure = (index + 1) == len(df.index)
        injectStart = df.loc[index, 'InjectFailureStart']
        injectEnd = df.loc[index, 'InjectFailureEnd']
        recoveryStart = df.loc[index, 'RecoveryVMStart']
        recoveryEnd = df.loc[index, 'RecoveryDBMSEnd']

        # failure injection
        plt.axvline(injectStart, color='orangered', linestyle=':', label='failure injection')
        # highlight healthy area
        plt.axvspan(0, injectStart, facecolor='limegreen', alpha=0.35, label='healthy')

        # failure with no recovery following
        if pd.isna(recoveryStart) and isLastFailure:
            _record_phase_stats(phasesStatsDf, index, 'unhealthy', extractedTimeseriesData,
                                injectEnd, evaluationEnd)
            # There is no recovery start (NaN) in this case, so the unhealthy
            # area lasts until the end of the evaluation.
            plt.axvspan(injectStart, evaluationEnd, facecolor='lightcoral', alpha=0.35, label='unhealthy')
            continue

        # failure with additional recovery
        _record_phase_stats(phasesStatsDf, index, 'unhealthy', extractedTimeseriesData,
                            injectEnd, recoveryStart)
        # highlight unhealthy area
        plt.axvspan(injectStart, recoveryStart, facecolor='lightcoral', alpha=0.35, label='unhealthy')
        plt.axvline(recoveryStart, color='darkorange', linestyle=':', label='recovery start')

        # recovery phase
        _record_phase_stats(phasesStatsDf, index, 'recovery', extractedTimeseriesData,
                            recoveryStart, recoveryEnd)
        # highlight recovery area
        plt.axvspan(recoveryStart, recoveryEnd, facecolor='orange', alpha=0.35, label='recovering')
        plt.axvline(recoveryEnd, color='limegreen', linestyle=':', label='recovery finished')

        # recovered phase: lasts until the end of the evaluation for the last
        # failure, otherwise until the end of the next failure injection
        if isLastFailure:
            recoveredEnd = evaluationEnd
        else:
            recoveredEnd = df.loc[(index + 1), 'InjectFailureEnd']
        _record_phase_stats(phasesStatsDf, index, 'recovered', extractedTimeseriesData,
                            recoveryEnd, recoveredEnd)
        # highlight recovered area (only if the recovery finished before the evaluation ended)
        if evaluationEnd > recoveryEnd:
            plt.axvspan(recoveryEnd, recoveredEnd, facecolor='lightgreen', alpha=0.35, label='recovered')

    yMax = max(extractedTimeseriesData.operations) + 1000
    ax.set_ylim([-100, yMax])
    ax.set_ylabel('average throughput in ops/s')
    ax.set_xlabel('runtime in s')
    # no title as title will be set via latex
    ax.legend()

    outputFile = inputPath + plotting_config.TIMESERiES_FOLDER + "availability-throughput.pdf"
    plt.savefig(outputFile, format='pdf')

    # save phase dataframe in different formats
    print("exporting phases and stats for further processing...")
    outputDataframePath = inputPath + plotting_config.TIMESERiES_FOLDER + "phases-dataframe"
    df.to_pickle(outputDataframePath)
    outputPhasesExcelPath = inputPath + plotting_config.TIMESERiES_FOLDER + "phases.xlsx"
    df.to_excel(outputPhasesExcelPath)

    # save phase stats as dataframe and excel; the pickle must contain the
    # statistics dataframe, not a second copy of the phases dataframe
    outputStatsDataframePath = inputPath + plotting_config.TIMESERiES_FOLDER + "phases-stats-dataframe"
    phasesStatsDf.to_pickle(outputStatsDataframePath)
    outputPhaseStatsPath = inputPath + plotting_config.TIMESERiES_FOLDER + "phases-stats.xlsx"
    phasesStatsDf.to_excel(outputPhaseStatsPath)
def calculate_phase_average(extractedTimeseriesData, start,end):
    """Calculate throughput statistics for the phase between ``start`` and ``end``.

    Args:
        extractedTimeseriesData: object with index-aligned ``timestamps`` and
            ``operations`` sequences.
        start: first timestamp of the phase (inclusive).
        end: last timestamp of the phase (inclusive).

    Returns:
        Tuple ``(average, phaseMin, phaseMinOccurance, phaseMax,
        phaseMaxOccurance)``. The occurrence values count the samples of the
        phase that lie within +/- 10 ops/s of the minimum / maximum. If no
        sample falls into the phase, average, minimum and maximum are NaN
        and both counts are 0.
    """
    # defines the boundaries to search for the occurance of min/max values
    OCCURANCE_SEARCH_BOUNDARIES=10

    df = pd.DataFrame({'timestamps': extractedTimeseriesData.timestamps,
                       'operations': extractedTimeseriesData.operations})

    if(start > end):
        print("Start of current phase is after end of phase! Did the adapation finish after the transaction phase?")

    # Both bounds are inclusive by default. The former ``inclusive=True``
    # is rejected by current pandas versions, which expect a string there.
    phaseDataFrame = df[df['timestamps'].between(start, end)]
    phaseOperations = phaseDataFrame['operations']

    average = phaseOperations.mean()

    phaseMin = phaseOperations.min()
    phaseMinOccurance = phaseOperations.between(phaseMin - OCCURANCE_SEARCH_BOUNDARIES,
                                                phaseMin + OCCURANCE_SEARCH_BOUNDARIES).sum()

    phaseMax = phaseOperations.max()
    phaseMaxOccurance = phaseOperations.between(phaseMax - OCCURANCE_SEARCH_BOUNDARIES,
                                                phaseMax + OCCURANCE_SEARCH_BOUNDARIES).sum()

    return average, phaseMin, phaseMinOccurance, phaseMax, phaseMaxOccurance
def getPhaseDurations(inputPath):
    """Return the VM deployment and the scale-out duration of a run.

    Scans the task log for the ``SPAWN_VM`` and ``DEPLOY_DB`` entries and
    subtracts the ``STARTED`` from the ``FINISHED`` timestamp of each task.
    Matching is done by substring, and later matching lines overwrite
    earlier ones, so the last occurrence of every entry wins.

    Args:
        inputPath: path of the evaluation run folder.

    Returns:
        Tuple ``(deployVmDuration, scaleOutDuration)``.

    Raises:
        ValueError: if one of the four required log entries is missing
            (previously this surfaced as a TypeError on ``None - None``).
    """
    #TODO: currently only supports one adaptation phase by using the second occurrence of the respective tasks!
    inputPath = inputPath + plotting_config.TASK_FOLDER + plotting_config.TASK_LOG_FILE

    # (task marker, state marker) -> timestamp of the last matching line
    timestamps = {
        ("SPAWN_VM", "STARTED"): None,
        ("SPAWN_VM", "FINISHED"): None,
        ("DEPLOY_DB", "STARTED"): None,
        ("DEPLOY_DB", "FINISHED"): None,
    }

    # The context manager closes the log file even if parsing fails.
    with open(inputPath, 'r', encoding='utf8') as file:
        try:
            for line in file:
                line = line.rstrip()
                for task, state in timestamps:
                    if task in line and state in line:
                        # second field holds the timestamp with an 's' suffix
                        timestamps[(task, state)] = int(line.split(";")[1].replace('s',''))
        except UnicodeDecodeError:
            print("Skipping line! Unable to decode line due to UnicodeDecodeError!")

    missing = [task + " " + state for (task, state), value in timestamps.items() if value is None]
    if missing:
        raise ValueError("Task log " + inputPath + " contains no entry for: " + ", ".join(missing))

    deployVmDuration = timestamps[("SPAWN_VM", "FINISHED")] - timestamps[("SPAWN_VM", "STARTED")]
    scaleOutDuration = timestamps[("DEPLOY_DB", "FINISHED")] - timestamps[("DEPLOY_DB", "STARTED")]
    print("VM deploy duration is: " + str(deployVmDuration))
    print("scale-out duration is: " + str(scaleOutDuration))
    return deployVmDuration, scaleOutDuration
def getPhasesDataframe(inputPath):
    """Build the dataframe with the timestamps of all evaluation phases.

    Parses the task log of the run folder. Every row collects the
    timestamps seen up to (and including) a ``GIBBON`` ``FINISHED`` entry;
    that entry closes the row. Afterwards all Gibbon, failure and recovery
    columns are normalised to the workload start of row 0: the load start
    if row 0 has no transaction start, the transaction start otherwise.

    Args:
        inputPath: path of the evaluation run folder.

    Returns:
        The normalised phases dataframe.
    """
    inputPath = inputPath + plotting_config.TASK_FOLDER + plotting_config.TASK_LOG_FILE

    colNames =['YCSBLoadStart','YCSBTransactionStart','GibbonStart','GibbonEnd', 'InjectFailureStart', 'InjectFailureEnd','RecoveryVMStart','RecoveryVMEnd','RecoveryDBMSStart','RecoveryDBMSEnd']
    df = pd.DataFrame(columns = colNames)

    # (task marker, state marker, target column); every entry is checked for
    # every line, in this order. 'GibbonEnd' must stay last as it closes the row.
    taskEvents = [
        ("YCSB_LOAD", "STARTED", 'YCSBLoadStart'),
        ("YCSB_TRANSACTION", "STARTED", 'YCSBTransactionStart'),
        ("GIBBON", "STARTED", 'GibbonStart'),
        ("INJECT_FAILURE", "STARTED", 'InjectFailureStart'),
        ("INJECT_FAILURE", "FINISHED", 'InjectFailureEnd'),
        ("RECOVERY_SPAWN_VM", "STARTED", 'RecoveryVMStart'),
        ("RECOVERY_SPAWN_VM", "FINISHED", 'RecoveryVMEnd'),
        ("RECOVERY_DEPLOY_DB", "STARTED", 'RecoveryDBMSStart'),
        ("RECOVERY_DEPLOY_DB", "FINISHED", 'RecoveryDBMSEnd'),
        ("GIBBON", "FINISHED", 'GibbonEnd'),
    ]

    indexCounter = 0
    # The context manager closes the log file even if parsing fails.
    with open(inputPath, 'r', encoding='utf8') as file:
        try:
            for line in file:
                line = line.rstrip()
                for task, state, column in taskEvents:
                    if task in line and state in line:
                        # second field holds the timestamp with an 's' suffix
                        df.loc[indexCounter, column] = int(line.split(";")[1].replace('s',''))
                        if column == 'GibbonEnd':
                            indexCounter = indexCounter + 1
        except UnicodeDecodeError:
            print("Skipping line! Unable to decode line due to UnicodeDecodeError!")

    print("non-normalized data frame:")
    print(df)

    if pd.isna(df.loc[0,'YCSBTransactionStart']):
        # normalize dataframe for load phase
        print("normalizing for load phase")
        referenceColumn = 'YCSBLoadStart'
    else:
        # normalize dataframe for transaction phase
        print("normalizing for transaction phase")
        referenceColumn = 'YCSBTransactionStart'

    # The reference column itself is not normalised, so reading it once is safe.
    reference = df.loc[0, referenceColumn]
    for column in ['GibbonStart', 'GibbonEnd', 'InjectFailureStart', 'InjectFailureEnd',
                   'RecoveryVMStart', 'RecoveryVMEnd', 'RecoveryDBMSStart', 'RecoveryDBMSEnd']:
        df[column] = df[column] - reference

    return df
# Script entry point: the call sits at module level without a __main__ guard,
# so it runs (and parses the command line) whenever this module is executed
# or imported.
plot_timeseries()
\ No newline at end of file
......@@ -9,11 +9,10 @@ matplotlib.use('Agg')
import matplotlib.pyplot as plt
import argparse
import pandas as pd
import seaborn as sns
import plotting_config
class TimeseriesData:
timestamps = []
operations = []
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment