Commit d4ea70e1 authored by Daniel Seybold's avatar Daniel Seybold

fixed parameter naming for Cassandra and TimescaleDB

parent 75d7d712
Pipeline #80747 passed with stage
in 21 minutes and 52 seconds
......@@ -48,6 +48,8 @@ public class WorkloadApiServiceImpl extends WorkloadApiService {
private final String TSBS_RESULT_FOLDER = "TSBS/";
private static final String TSBS_BINARY = "/opt/workloads/tsbs/bin/";
//checking interval for TSBS commands
private final int CHECKING_INTERVAL = 5000;
@Override
......@@ -245,13 +247,16 @@ public class WorkloadApiServiceImpl extends WorkloadApiService {
while (processData.isAlive()) {
try {
LOGGER.debug("Data generation in process...");
Thread.sleep(1000);
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
LOGGER.debug("Phase 1: data generation finished with exit code: " + processData.exitValue());
if(processData.exitValue() != 0){
return Response.serverError().entity("Generate data phase failed!").build();
}
//Phase 2: query generation
......@@ -278,6 +283,9 @@ public class WorkloadApiServiceImpl extends WorkloadApiService {
}
}
LOGGER.debug("Phase 2: query generation finished with exit code: " + processGenQueries.exitValue());
if(processGenQueries.exitValue() != 0){
return Response.serverError().entity("Generate queries phase failed!").build();
}
//Phase 3: executing insert/load phase
......@@ -304,6 +312,7 @@ public class WorkloadApiServiceImpl extends WorkloadApiService {
LOGGER.debug("Clear DB finished with exit code: " + clearDBProcess.exitValue());
String loadCommand = "";
loadCommand = TSBSCommandBuilder.generateLoadCommand(TSBS_BINARY, properties.getProperty(WorkloadApiProperties.WORKLOAD_DATA_FOLDER), workload.getDbms(), workload.getTsbsLoadExecution(),workload.getDbEndpoints());
loadCommand += " > " + properties.getProperty(WorkloadApiProperties.WORKLOAD_RESULTS_FOLDER) + TSBS_RESULT_FOLDER + taskId.trim() + "_load.txt";
......@@ -327,6 +336,10 @@ public class WorkloadApiServiceImpl extends WorkloadApiService {
}
}
LOGGER.debug("Phase 3: load phase finished with exit code: " + processLoad.exitValue());
if(processLoad.exitValue() != 0){
return Response.serverError().entity("Generate queries phase failed!").build();
}
//Phase 4: executing query phase
......
package de.uulm.omi.workload.tsbs;
/**
* Created by Daniel Seybold on 11.05.2020.
*/
public enum Phase {
LOAD,
QUERY
}
......@@ -175,7 +175,7 @@ public class TSBSCommandBuilder {
//DBMS specific parameters
return getDBMSConnection(dbms, dbEndpoints, generateLoadCommand,
tsbsLoadExecution.getDbmsPorperties());
tsbsLoadExecution.getDbmsPorperties(),Phase.LOAD);
}
......@@ -194,18 +194,22 @@ public class TSBSCommandBuilder {
//DBMS specific parameters
return getDBMSConnection(dbms, dbEndpoints, queriesCommand,
tsbsQueryExecution.getDbmsPorperties());
tsbsQueryExecution.getDbmsPorperties(), Phase.QUERY);
}
//TODO: we need to pass the current phase! load and query
private static String getDBMSConnection(DbmsEnum dbms, DBEndpoints dbEndpoints,
String queriesCommand, List<TSDBMSDBMSProperties> dbmsPorperties) {
String queriesCommand, List<TSDBMSDBMSProperties> dbmsProperties,
Phase query) {
if(dbms.equals(DbmsEnum.INFLUX)){
queriesCommand += influxCommand(dbmsPorperties, dbEndpoints);
queriesCommand += influxCommand(dbmsProperties, dbEndpoints);
}else if (dbms.equals(DbmsEnum.TIMESCALEDB)){
queriesCommand += timescaledbCommand(dbmsPorperties, dbEndpoints);
queriesCommand += timescaledbCommand(dbmsProperties, dbEndpoints,query);
}else if(dbms.equals(DbmsEnum.CASSANDRA)){
queriesCommand += cassandraCommand(dbmsPorperties,dbEndpoints);
queriesCommand += cassandraCommand(dbmsProperties,dbEndpoints,query);
}
else {
......@@ -263,14 +267,27 @@ public class TSBSCommandBuilder {
}
private static String timescaledbCommand(List<TSDBMSDBMSProperties> dbmsProperties,
DBEndpoints dbEndpoints){
DBEndpoints dbEndpoints, Phase query){
String timescaledbCommand = "";
//TODO: add support for multiple TimescaleDB endpoints as soon as Timescale relases cluster support
LOGGER.warn("Multiple TimescaleDB endpoints are currently not supported!");
//TSBS Timescale script requires for load --host with multiple params, for query --hosts
if(dbEndpoints.size() > 1){
LOGGER.warn("Multiple TimescaleDB endpoints are currently not supported, using only the first endpoint!");
}
String timescaledbEndpoints = dbEndpoints.get(0).getIpAddress() + " ";
timescaledbCommand += "--host " + timescaledbEndpoints;
switch (query) {
case LOAD:
timescaledbCommand += "--host " + timescaledbEndpoints;
break;
case QUERY:
timescaledbCommand += "--hosts " + timescaledbEndpoints;
break;
}
//port
boolean portCheck = dbmsProperties.stream().anyMatch(o -> o.getName().equals("port"));
......@@ -335,48 +352,73 @@ public class TSBSCommandBuilder {
}
private static String cassandraCommand(List<TSDBMSDBMSProperties> dbmsProperties,
DBEndpoints dbEndpoints) {
DBEndpoints dbEndpoints, Phase query) {
String cassandraCommand = "";
StringJoiner cassandraHosts = new StringJoiner(",");
for (DBEndpoint dbEndpoint: dbEndpoints) {
cassandraHosts.add(dbEndpoint.getIpAddress()+ ":9042");
}
cassandraCommand += "--hosts " + cassandraHosts + " ";
//TSBS Cassandra script requires for load --hosts with multiple params, for query --host and aggregation plan is mandatory
switch (query) {
case LOAD:
StringJoiner cassandraHosts = new StringJoiner(",");
for (DBEndpoint dbEndpoint: dbEndpoints) {
cassandraHosts.add(dbEndpoint.getIpAddress()+ ":9042");
}
cassandraCommand += "--hosts " + cassandraHosts + " ";
//replication-factor
boolean replicationFactorCheck = dbmsProperties.stream().anyMatch(o -> o.getName().equals("replication-factor"));
if(replicationFactorCheck){
Optional<TSDBMSDBMSProperties> item = dbmsProperties.stream()
.filter(c -> c.getName().equals("replication-factor")).findAny();
//replication-factor
boolean replicationFactorCheck = dbmsProperties.stream().anyMatch(o -> o.getName().equals("replication-factor"));
if(replicationFactorCheck){
Optional<TSDBMSDBMSProperties> item = dbmsProperties.stream()
.filter(c -> c.getName().equals("replication-factor")).findAny();
cassandraCommand += "--replication-factor " + item.get().getValue() + " ";
cassandraCommand += "--replication-factor " + item.get().getValue() + " ";
}else {
//falling back to default replication factor 1
LOGGER.warn("No replication-factor parameter provided, falling back to default 1!");
cassandraCommand += "--replication-factor 1 ";
}
}else {
//falling back to default replication factor 1
LOGGER.warn("No replication-factor parameter provided, falling back to default 1!");
cassandraCommand += "--replication-factor 1 ";
}
//write-consistency
boolean writeConsistencyFactorCheck = dbmsProperties.stream().anyMatch(o -> o.getName().equals("consistency"));
if(writeConsistencyFactorCheck){
Optional<TSDBMSDBMSProperties> item = dbmsProperties.stream()
.filter(c -> c.getName().equals("consistency")).findAny();
//write-consistency
boolean writeConsistencyFactorCheck = dbmsProperties.stream().anyMatch(o -> o.getName().equals("consistency"));
if(writeConsistencyFactorCheck){
Optional<TSDBMSDBMSProperties> item = dbmsProperties.stream()
.filter(c -> c.getName().equals("consistency")).findAny();
cassandraCommand += "--consistency " + item.get().getValue() + " ";
cassandraCommand += "--consistency " + item.get().getValue() + " ";
}else {
//falling back to default replication factor 1
LOGGER.warn("No consistency parameter provided, falling back to default ONE!");
cassandraCommand += "--consistency ONE ";
}else {
//falling back to default replication factor 1
LOGGER.warn("No consistency parameter provided, falling back to default ONE!");
cassandraCommand += "--consistency ONE ";
}
break;
case QUERY:
cassandraCommand += "--host " + dbEndpoints.get(0).getIpAddress() + ":9042 ";
//aggregation plan
boolean aggregationCheck = dbmsProperties.stream().anyMatch(o -> o.getName().equals("aggregation-plan"));
if(aggregationCheck){
Optional<TSDBMSDBMSProperties> item = dbmsProperties.stream()
.filter(c -> c.getName().equals("aggregation-plan")).findAny();
cassandraCommand += "--aggregation-plan " + item.get().getValue() + " ";
}else {
//falling back to default aggregation-plan: server
LOGGER.warn("No aggregation-plan parameter provided, falling back to default server!");
cassandraCommand += "--aggregation-plan server ";
}
break;
}
//TODO: put here common parameters for load/query phase
return cassandraCommand;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment