Commit ef0370f9 authored by Daniel Seybold's avatar Daniel Seybold

moved tsbs execution in a separate thread to avoid connection reset erros during load phase

parent d4ea70e1
Pipeline #80754 passed with stage
in 17 minutes and 57 seconds
package de.uulm.omi.dbms.workload.api.impl;
import de.uulm.omi.cloudiator.dbms.evaluator.model.*;
import de.uulm.omi.cloudiator.dbms.evaluator.model.ProcessStatus;
import de.uulm.omi.cloudiator.dbms.evaluator.model.TPCCWorkload;
import de.uulm.omi.cloudiator.dbms.evaluator.model.TSBSWorkload;
import de.uulm.omi.cloudiator.dbms.evaluator.model.WorkloadProcessInstance;
import de.uulm.omi.cloudiator.dbms.evaluator.model.WorkloadProcessSubmissionError;
import de.uulm.omi.cloudiator.dbms.evaluator.model.YCSBWorkload;
import de.uulm.omi.dbms.workload.api.ApiResponseMessage;
import de.uulm.omi.dbms.workload.api.NotFoundException;
import de.uulm.omi.dbms.workload.api.WorkloadApiService;
......@@ -8,20 +13,18 @@ import de.uulm.omi.utils.TextFileLoader;
import de.uulm.omi.utils.WorkloadApiProperties;
import de.uulm.omi.workload.WorkloadProcess;
import de.uulm.omi.workload.tpcc.TPCCCommandBuilder;
import de.uulm.omi.workload.tsbs.TSBSCommandBuilder;
import de.uulm.omi.workload.tsbs.Runner;
import de.uulm.omi.workload.ycsb.YCSBCommandBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.validation.constraints.NotNull;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.SecurityContext;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Properties;
import javax.validation.constraints.NotNull;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.SecurityContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@javax.annotation.Generated(value = "io.swagger.codegen.languages.JavaJerseyServerCodegen", date = "2018-01-16T09:59:22.348Z")
public class WorkloadApiServiceImpl extends WorkloadApiService {
......@@ -44,13 +47,6 @@ public class WorkloadApiServiceImpl extends WorkloadApiService {
private static final String TPCC_BINARY = "./cockroach";
//TSBS constants
private final String TSBS_RESULT_FOLDER = "TSBS/";
private static final String TSBS_BINARY = "/opt/workloads/tsbs/bin/";
//checking interval for TSBS commands
private final int CHECKING_INTERVAL = 5000;
@Override
public Response workloadResultGet(@NotNull String taskId, @NotNull String workloadType,
......@@ -210,9 +206,6 @@ public class WorkloadApiServiceImpl extends WorkloadApiService {
public Response workloadTsbsPost(String taskId, TSBSWorkload workload,
SecurityContext securityContext) throws NotFoundException {
//TODO: check path to bin or script folders, do we need to set explicitly the go path ?
//Phase 0: check if already a workloadProcess running
WorkloadProcess workloadProcess = WorkloadProcess.getInstance();
if (workloadProcess.isRunning()) {
......@@ -223,156 +216,16 @@ public class WorkloadApiServiceImpl extends WorkloadApiService {
return Response.status(Status.FORBIDDEN).entity(workloadProcessSubmissionError).build();
}
Properties properties = WorkloadApiProperties.getInstance().getProperties();
Long workloadTimestamp = System.currentTimeMillis();
LOGGER.debug(workload.toString());
//Phase 1: generate data
LOGGER.debug("Starting phase 1: data generations");
String generateDataCommand= "";
generateDataCommand = TSBSCommandBuilder.generateDataCommand( TSBS_BINARY, workload.getDbms(), workload.getTsbsDataGeneration());
generateDataCommand += properties.getProperty(WorkloadApiProperties.WORKLOAD_DATA_FOLDER) + workload.getDbms().toString().toLowerCase() + "-data.gz";
LOGGER.debug("Generate data command: " + generateDataCommand);
ProcessBuilder genDataProcess= new ProcessBuilder();
genDataProcess.command(UNIX_SHELL, UNIX_COMMAND_PARAM, generateDataCommand);
Process processData = null;
try {
processData = genDataProcess.start();
} catch (IOException e) {
e.printStackTrace();
}
while (processData.isAlive()) {
try {
LOGGER.debug("Data generation in process...");
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
LOGGER.debug("Phase 1: data generation finished with exit code: " + processData.exitValue());
if(processData.exitValue() != 0){
return Response.serverError().entity("Generate data phase failed!").build();
}
//Phase 2: query generation
String generateQueriesCommand = "";
generateQueriesCommand = TSBSCommandBuilder.generateQueriesCommand(TSBS_BINARY, workload.getDbms(), workload.getTsbsDataGeneration(), workload.getTsbsQueryGeneration());
generateQueriesCommand += properties.getProperty(WorkloadApiProperties.WORKLOAD_DATA_FOLDER) + workload.getDbms().toString().toLowerCase() + "-queries.gz" ;
LOGGER.debug(generateQueriesCommand);
ProcessBuilder genQueryProcess = new ProcessBuilder();
genQueryProcess.command(UNIX_SHELL, UNIX_COMMAND_PARAM, generateQueriesCommand);
Process processGenQueries = null;
try {
processGenQueries = genQueryProcess.start();
} catch (IOException e) {
e.printStackTrace();
}
while (processGenQueries.isAlive()) {
try {
LOGGER.debug("Query generation in process...");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
LOGGER.debug("Phase 2: query generation finished with exit code: " + processGenQueries.exitValue());
if(processGenQueries.exitValue() != 0){
return Response.serverError().entity("Generate queries phase failed!").build();
}
//Phase 3: executing insert/load phase
//clear database
String clearDBCommand = TSBSCommandBuilder.clearDB(workload.getDbms(), workload.getTsbsLoadExecution().getDbName(),workload.getDbEndpoints());
ProcessBuilder processClearDBBuilder = new ProcessBuilder();
processClearDBBuilder.command(UNIX_SHELL, UNIX_COMMAND_PARAM, clearDBCommand);
LOGGER.debug(clearDBCommand);
Process clearDBProcess = null;
try {
clearDBProcess = processClearDBBuilder.start();
} catch (IOException e) {
e.printStackTrace();
}
while (clearDBProcess.isAlive()) {
try {
LOGGER.debug("Clear DB in process...");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
LOGGER.debug("Clear DB finished with exit code: " + clearDBProcess.exitValue());
String loadCommand = "";
loadCommand = TSBSCommandBuilder.generateLoadCommand(TSBS_BINARY, properties.getProperty(WorkloadApiProperties.WORKLOAD_DATA_FOLDER), workload.getDbms(), workload.getTsbsLoadExecution(),workload.getDbEndpoints());
loadCommand += " > " + properties.getProperty(WorkloadApiProperties.WORKLOAD_RESULTS_FOLDER) + TSBS_RESULT_FOLDER + taskId.trim() + "_load.txt";
LOGGER.debug(loadCommand);
ProcessBuilder loadProcess = new ProcessBuilder();
loadProcess.command(UNIX_SHELL, UNIX_COMMAND_PARAM, loadCommand);
Process processLoad = null;
try {
processLoad = loadProcess.start();
} catch (IOException e) {
e.printStackTrace();
}
while (processLoad.isAlive()) {
try {
LOGGER.debug("Load phase in process...");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
LOGGER.debug("Phase 3: load phase finished with exit code: " + processLoad.exitValue());
if(processLoad.exitValue() != 0){
return Response.serverError().entity("Generate queries phase failed!").build();
}
//Phase 4: executing query phase
//Execute Queries
String executeQueriesCommand = TSBSCommandBuilder.executeQueriesCommand(TSBS_BINARY, properties.getProperty(WorkloadApiProperties.WORKLOAD_DATA_FOLDER), workload.getDbms(), workload.getTsbsQueryExecution(), workload.getDbEndpoints() );
executeQueriesCommand += " > " + properties.getProperty(WorkloadApiProperties.WORKLOAD_RESULTS_FOLDER) + TSBS_RESULT_FOLDER + taskId.trim() + ".txt";
LOGGER.debug(loadCommand);
workloadProcess
.startWorkload(TSBS_BINARY, UNIX_SHELL, UNIX_COMMAND_PARAM, executeQueriesCommand, taskId,
workloadTimestamp);
WorkloadProcessInstance workloadProcessInstance = new WorkloadProcessInstance();
workloadProcessInstance.setProcessStatus(ProcessStatus.RUNNING);
workloadProcessInstance.setTimestamp(BigDecimal.valueOf(workloadProcess.getStartTimestamp()));
workloadProcessInstance.setProcessId(workloadProcess.getProcessId());
if (workloadProcess.isRunning()) {
LOGGER.debug("WorkloadPrcess started!");
workloadProcessInstance.setProcessStatus(ProcessStatus.RUNNING);
return Response.ok().entity(workloadProcessInstance).build();
} else {
if (workloadProcess.getExitCode() != 0) {
LOGGER.debug("workloadPrcess exited with " + workloadProcess.getExitCode()
+ " Incorrect command provided? Command: " + "TODO");
workloadProcessInstance.setProcessStatus(ProcessStatus.ERROR);
return Response.ok().entity(workloadProcessInstance).build();
} else {
LOGGER.debug(
"workloadPrcess already finished, incorrect command provided? Command: " + "TODO");
workloadProcessInstance.setProcessStatus(ProcessStatus.IDLE);
return Response.ok().entity(workloadProcessInstance).build();
}
}
Runner tsbsRunner = new Runner(workloadProcessInstance, workload, taskId);
new Thread(tsbsRunner).start();
return Response.ok().entity(workloadProcessInstance).build();
}
......
package de.uulm.omi.workload.tsbs;
import de.uulm.omi.cloudiator.dbms.evaluator.model.ProcessStatus;
import de.uulm.omi.cloudiator.dbms.evaluator.model.TSBSWorkload;
import de.uulm.omi.cloudiator.dbms.evaluator.model.WorkloadProcessInstance;
import de.uulm.omi.utils.WorkloadApiProperties;
import de.uulm.omi.workload.WorkloadProcess;
import java.io.IOException;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Created by Daniel Seybold on 11.05.2020.
*/
public class Runner implements Runnable {
private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
private final String UNIX_SHELL = "bash";
private final String UNIX_COMMAND_PARAM = "-c";
//TSBS constants
private final String TSBS_RESULT_FOLDER = "TSBS/";
private static final String TSBS_BINARY = "/opt/workloads/tsbs/bin/";
//checking interval for TSBS commands
private final int CHECKING_INTERVAL = 5000;
private TSBSWorkload workload;
private WorkloadProcessInstance workloadProcessInstance;
private String taskId;
public Runner(WorkloadProcessInstance workloadProcessInstance, TSBSWorkload workload,
String taskId) {
this.workload = workload;
this.workloadProcessInstance = workloadProcessInstance;
this.taskId = taskId;
}
@Override
public void run() {
Properties properties = WorkloadApiProperties.getInstance().getProperties();
Long workloadTimestamp = System.currentTimeMillis();
LOGGER.debug(workload.toString());
//Phase 1: generate data
LOGGER.debug("Starting phase 1: data generations");
String generateDataCommand= "";
generateDataCommand = TSBSCommandBuilder.generateDataCommand( TSBS_BINARY, workload.getDbms(), workload.getTsbsDataGeneration());
generateDataCommand += properties.getProperty(WorkloadApiProperties.WORKLOAD_DATA_FOLDER) + workload.getDbms().toString().toLowerCase() + "-data.gz";
LOGGER.debug("Generate data command: " + generateDataCommand);
ProcessBuilder genDataProcess= new ProcessBuilder();
genDataProcess.command(UNIX_SHELL, UNIX_COMMAND_PARAM, generateDataCommand);
Process processData = null;
try {
processData = genDataProcess.start();
} catch (IOException e) {
e.printStackTrace();
}
while (processData.isAlive()) {
try {
LOGGER.debug("Data generation in process...");
Thread.sleep(CHECKING_INTERVAL);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
LOGGER.debug("Phase 1: data generation finished with exit code: " + processData.exitValue());
if(processData.exitValue() != 0){
workloadProcessInstance.setProcessStatus(ProcessStatus.ERROR);
Thread.currentThread().interrupt();
return;
}
//Phase 2: query generation
String generateQueriesCommand = "";
generateQueriesCommand = TSBSCommandBuilder.generateQueriesCommand(TSBS_BINARY, workload.getDbms(), workload.getTsbsDataGeneration(), workload.getTsbsQueryGeneration());
generateQueriesCommand += properties.getProperty(WorkloadApiProperties.WORKLOAD_DATA_FOLDER) + workload.getDbms().toString().toLowerCase() + "-queries.gz" ;
LOGGER.debug(generateQueriesCommand);
ProcessBuilder genQueryProcess = new ProcessBuilder();
genQueryProcess.command(UNIX_SHELL, UNIX_COMMAND_PARAM, generateQueriesCommand);
Process processGenQueries = null;
try {
processGenQueries = genQueryProcess.start();
} catch (IOException e) {
e.printStackTrace();
}
while (processGenQueries.isAlive()) {
try {
LOGGER.debug("Query generation in process...");
Thread.sleep(CHECKING_INTERVAL);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
LOGGER.debug("Phase 2: query generation finished with exit code: " + processGenQueries.exitValue());
if(processGenQueries.exitValue() != 0){
workloadProcessInstance.setProcessStatus(ProcessStatus.ERROR);
Thread.currentThread().interrupt();
return;
}
//Phase 3: executing insert/load phase
//clear database
String clearDBCommand = TSBSCommandBuilder.clearDB(workload.getDbms(), workload.getTsbsLoadExecution().getDbName(),workload.getDbEndpoints());
ProcessBuilder processClearDBBuilder = new ProcessBuilder();
processClearDBBuilder.command(UNIX_SHELL, UNIX_COMMAND_PARAM, clearDBCommand);
LOGGER.debug(clearDBCommand);
Process clearDBProcess = null;
try {
clearDBProcess = processClearDBBuilder.start();
} catch (IOException e) {
e.printStackTrace();
}
while (clearDBProcess.isAlive()) {
try {
LOGGER.debug("Clear DB in process...");
Thread.sleep(CHECKING_INTERVAL);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
LOGGER.debug("Clear DB finished with exit code: " + clearDBProcess.exitValue());
String loadCommand = "";
loadCommand = TSBSCommandBuilder.generateLoadCommand(TSBS_BINARY, properties.getProperty(WorkloadApiProperties.WORKLOAD_DATA_FOLDER), workload.getDbms(), workload.getTsbsLoadExecution(),workload.getDbEndpoints());
loadCommand += " > " + properties.getProperty(WorkloadApiProperties.WORKLOAD_RESULTS_FOLDER) + TSBS_RESULT_FOLDER + taskId.trim() + "_load.txt";
LOGGER.debug(loadCommand);
ProcessBuilder loadProcess = new ProcessBuilder();
loadProcess.command(UNIX_SHELL, UNIX_COMMAND_PARAM, loadCommand);
Process processLoad = null;
try {
processLoad = loadProcess.start();
} catch (IOException e) {
e.printStackTrace();
}
while (processLoad.isAlive()) {
try {
LOGGER.debug("Load phase in process...");
Thread.sleep(CHECKING_INTERVAL);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
LOGGER.debug("Phase 3: load phase finished with exit code: " + processLoad.exitValue());
if(processLoad.exitValue() != 0){
workloadProcessInstance.setProcessStatus(ProcessStatus.ERROR);
Thread.currentThread().interrupt();
return;
}
//Phase 4: executing query phase
//Execute Queries
String executeQueriesCommand = TSBSCommandBuilder.executeQueriesCommand(TSBS_BINARY, properties.getProperty(WorkloadApiProperties.WORKLOAD_DATA_FOLDER), workload.getDbms(), workload.getTsbsQueryExecution(), workload.getDbEndpoints() );
executeQueriesCommand += " > " + properties.getProperty(WorkloadApiProperties.WORKLOAD_RESULTS_FOLDER) + TSBS_RESULT_FOLDER + taskId.trim() + ".txt";
LOGGER.debug(loadCommand);
WorkloadProcess workloadProcess = WorkloadProcess.getInstance();
workloadProcess
.startWorkload(TSBS_BINARY, UNIX_SHELL, UNIX_COMMAND_PARAM, executeQueriesCommand, taskId,
workloadTimestamp);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment