Commit ec2c50e2 authored by Daniel Seybold's avatar Daniel Seybold

added ssh connections to get total record count

parent 67940376
Pipeline #54157 passed with stage
in 18 minutes and 33 seconds
......@@ -9,7 +9,7 @@ applicationCatalogue=http://134.60.244.56:9091/v1
# Workload-API
#workloadApi=http://134.60.47.80:8181/v1
#workloadApi=http://localhost:8181/v1
workloadApi=http://134.60.64.149:8181/v1
workloadApi=http://134.60.64.88:8181/v1
# InfluxDB
......
......@@ -206,6 +206,14 @@
<version>1.0.0</version>
</dependency>
<!-- SSH Connectios -->
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>0.1.55</version>
</dependency>
<!-- Bridge for java util Logging to get Cloudiator Client logging -->
<dependency>
<groupId>org.slf4j</groupId>
......
......@@ -48,7 +48,6 @@ public class ScenarioApiServiceImpl extends ScenarioApiService {
SecurityContext securityContext) throws NotFoundException {
NoiseyNeighbour noiseyNeighbour = new NoiseyNeighbour(scenarioName, runs, scenarioSpec, cleanUp.booleanValue());
noiseyNeighbour.execute();
return Response.ok().entity(new ApiResponseMessage(ApiResponseMessage.OK, "Evaluation started!")).build();
......
......@@ -20,6 +20,7 @@ import de.uulm.omi.evaluation.task.InitRun;
import de.uulm.omi.evaluation.task.LogTask;
import de.uulm.omi.evaluation.task.Plotting;
import de.uulm.omi.evaluation.task.Plotting.PlottingTask;
import de.uulm.omi.evaluation.task.RemoteCommand;
import de.uulm.omi.evaluation.task.TaskState;
import de.uulm.omi.evaluation.task.TaskType;
import de.uulm.omi.evaluation.task.YcsbWorkload;
......@@ -115,6 +116,9 @@ public class AvailabilityYCSBMultiPhaseScenario extends EvaluationScenario {
GibbonV2 gibbonV2 = new GibbonV2(this.evaluationContext,this.dbmsClusterContext,this.availabilityYCSBMultiPhase.getGibbonSpec());
RemoteCommand remoteCommand = new RemoteCommand(this.evaluationContext,this.dbmsClusterContext,this.availabilityYCSBMultiPhase.getDbmsCluster().getType().name().toLowerCase());
/**
* Plot the timeseries of the current run
*/
......@@ -161,6 +165,7 @@ public class AvailabilityYCSBMultiPhaseScenario extends EvaluationScenario {
.build()
)
.execute(fetchYcsbResultTransaction)
.execute(remoteCommand)
.execute(mergeYCSB)
.execute(evaluationTimeseries)
.execute(workloadTimeseries)
......
......@@ -20,6 +20,7 @@ import de.uulm.omi.evaluation.task.InitRun;
import de.uulm.omi.evaluation.task.LogTask;
import de.uulm.omi.evaluation.task.Plotting;
import de.uulm.omi.evaluation.task.Plotting.PlottingTask;
import de.uulm.omi.evaluation.task.RemoteCommand;
import de.uulm.omi.evaluation.task.TaskState;
import de.uulm.omi.evaluation.task.TaskType;
import de.uulm.omi.evaluation.task.YcsbWorkload;
......@@ -123,6 +124,7 @@ public class AvailabilityYCSBWriteScenario extends EvaluationScenario {
GibbonV2 gibbonV2 = new GibbonV2(this.evaluationContext,this.dbmsClusterContext,this.availabilityYCSBWrite.getGibbonSpec());
RemoteCommand remoteCommand = new RemoteCommand(this.evaluationContext,this.dbmsClusterContext,this.availabilityYCSBWrite.getDbmsCluster().getType().name().toLowerCase());
/**
* Fetch the workload results for the current run and store to disk
......@@ -181,6 +183,7 @@ public class AvailabilityYCSBWriteScenario extends EvaluationScenario {
.build()
)
.execute(fetchYcsbResult)
.execute(remoteCommand)
.execute(mergeLoadYCSB)
.execute(evaluationTimeseries)
.execute(workloadTimeseries)
......
......@@ -21,6 +21,7 @@ import de.uulm.omi.evaluation.task.InitEvaluation;
import de.uulm.omi.evaluation.task.InitRun;
import de.uulm.omi.evaluation.task.Plotting;
import de.uulm.omi.evaluation.task.Plotting.PlottingTask;
import de.uulm.omi.evaluation.task.RemoteCommand;
import de.uulm.omi.evaluation.task.YcsbWorkload;
import de.uulm.omi.utils.YcsbWorkloadType;
import org.jeasy.flows.engine.WorkFlowEngine;
......@@ -70,8 +71,6 @@ public class PerformanceYCSBWriteScenario extends EvaluationScenario {
.getWorkloadInstances());
YcsbWorkload ycsbWorkload = new YcsbWorkload(this.evaluationContext, this.dbmsClusterContext,
this.performanceYCSBWrite.getWorkload(), this.performanceYCSBWrite.getWorkloadNetwork(), this.workloadPhaseContext,
YcsbWorkloadType.LOAD, performanceYCSBWrite.getWorkloadInstances());
......
......@@ -19,6 +19,7 @@ import de.uulm.omi.evaluation.task.InitEvaluation;
import de.uulm.omi.evaluation.task.InitRun;
import de.uulm.omi.evaluation.task.Plotting;
import de.uulm.omi.evaluation.task.Plotting.PlottingTask;
import de.uulm.omi.evaluation.task.RemoteCommand;
import de.uulm.omi.evaluation.task.YcsbWorkload;
import de.uulm.omi.utils.YcsbWorkloadType;
import org.jeasy.flows.engine.WorkFlowEngine;
......@@ -79,6 +80,9 @@ public class ScalabilityYCSBWriteScenario extends EvaluationScenario {
CleanUpFailedResources cleanUpFailedResources = new CleanUpFailedResources(this.evaluationContext, this.dbmsClusterContext, this.cleanUp);
RemoteCommand remoteCommand = new RemoteCommand(this.evaluationContext,this.dbmsClusterContext,this.scalabilityYCSBWrite.getDbmsCluster().getType().name().toLowerCase());
/**
* Plot the timeseries of the current run
*/
......@@ -106,6 +110,7 @@ public class ScalabilityYCSBWriteScenario extends EvaluationScenario {
.execute(ycsbWorkload)
.execute(checkWorkload)
.execute(fetchYcsbResult)
.execute(remoteCommand)
.execute(mergeYCSB)
.execute(evaluationTimeseries)
.execute(workloadTimeseries)
......
package de.uulm.omi.evaluation.task;
import com.google.common.io.CharStreams;
import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.JSch;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import de.uniulm.omi.cloudiator.colosseum.client.Client;
import de.uniulm.omi.cloudiator.colosseum.client.entities.KeyPair;
import de.uulm.omi.cloudiator.dbms.evaluator.model.ClusterContext;
import de.uulm.omi.cloudiator.dbms.evaluator.model.EvaluationContext;
import de.uulm.omi.utils.ColosseumClientHelper;
import de.uulm.omi.utils.ColosseumUtils;
import de.uulm.omi.utils.DBMSNodeType;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import org.jeasy.flows.work.DefaultWorkReport;
import org.jeasy.flows.work.WorkReport;
import org.jeasy.flows.work.WorkStatus;
/**
* Created by Daniel Seybold on 19.08.2019.
*/
public class RemoteCommand extends EvaluationTask {
private final Client colosseumClient;
private String loggerPrefix;
private final String DBMSType;
private final ClusterContext clusterContext;
public RemoteCommand(EvaluationContext evaluationContext, ClusterContext dbmsClusterContext, String DBMSType) {
super(TaskType.REMOTE_COMMAND, evaluationContext);
this.colosseumClient = ColosseumClientHelper.getColosseumClient();
this.clusterContext = dbmsClusterContext;
this.DBMSType = DBMSType;
}
@Override
public WorkReport call() {
this.loggerPrefix = "RUN " + this.evaluationContext.getRunCounter().intValue() +": ";
Long appInstanceId = Long.valueOf(this.clusterContext.getIdApplicationInstance());
List<String> endpoints = ColosseumUtils
.getEndpoints(appInstanceId, "PUBLIC", DBMSNodeType.SEED);
if (endpoints.isEmpty() || endpoints.size()>1){
LOGGER.error( this.loggerPrefix + "Invalid number of Seed endpoints: " + endpoints.size());
return new DefaultWorkReport(WorkStatus.FAILED);
}
String seedEnpoint = endpoints.get(0);
String command = null;
if(this.DBMSType.toLowerCase().equals("cassandra")){
command = getCassandraCommand();
}else if(this.DBMSType.toLowerCase().equals("couchbase")){
command = getCouchbaseCommand();
}else {
LOGGER.error(this.loggerPrefix + "Unsupported command target: " + this.DBMSType);
return new DefaultWorkReport(WorkStatus.FAILED);
}
/**
* get correct key pair
*/
//TODO: derive idCloud dynamically depending on application
long idCloud = 1;
String privateKeyPlain = null;
String publicKeyPlain = null;
List<KeyPair> keyPairs = colosseumClient.controller(KeyPair.class).getList();
for(KeyPair keyPair : keyPairs){
if(keyPair.getCloud().equals(idCloud)){
LOGGER.debug(this.loggerPrefix + "Found matching keypair for cloud with id: " + keyPair.getId());
privateKeyPlain = keyPair.getPrivateKey();
publicKeyPlain = keyPair.getPublicKey();
}
}
/**
* SSH connection part
*/
//TODO: refactor into a generic ssh command helper tool and make this task the getNumberOfRecords task
JSch jsch = new JSch();
String user = "ubuntu";
//String host = "134.60.64.116";
int port = 22;
//String privateKey = ".ssh/id_rsa";
try {
jsch.addIdentity("test_rsa", privateKeyPlain.getBytes(), publicKeyPlain.getBytes(), "secret".getBytes());
Session session = jsch.getSession(user, seedEnpoint, port);
session.setConfig("StrictHostKeyChecking", "no");
LOGGER.debug("session created.");
session.connect();
ChannelExec channel = (ChannelExec) session.openChannel("exec");
channel.setInputStream(null);
InputStream output = channel.getInputStream();
channel.setErrStream(System.err);
channel.setCommand(command);
channel.connect();
String result = CharStreams.toString(new InputStreamReader(output));
//printOutput(channel);
channel.disconnect();
LOGGER.debug(this.loggerPrefix + "The remote result command is: " + result);
/**
* Log the total number of records
*/
logNumberOfRecords(result);
} catch (JSchException e) {
LOGGER.error(e.getStackTrace().toString());
return new DefaultWorkReport(WorkStatus.FAILED);
} catch (IOException e) {
LOGGER.error(e.getStackTrace().toString());
return new DefaultWorkReport(WorkStatus.FAILED);
}
return new DefaultWorkReport(WorkStatus.COMPLETED);
}
private void logNumberOfRecords(String numberOfRecords){
long currentTimestamp = System.currentTimeMillis()/1000;
long eventTimestamp = currentTimestamp - this.evaluationContext.getStartTime();
String taskLogMessage = new SimpleDateFormat("dd.MM.yyyy-HH:mm:ss").format(new Date()) +"; " +eventTimestamp + "s; " + "NUMBER_OF_RECORDS" + "; " + numberOfRecords + "\n";
String taskLogUri = this.evaluationContext.getEvluationRun()
+ File.separator + EvaluationTask.TASK_LOG_FOLDER
+ File.separator + EvaluationTask.TASK_LOG;
try {
Files.write( Paths.get(taskLogUri), taskLogMessage.getBytes(), StandardOpenOption.APPEND);
} catch (IOException e) {
LOGGER.error("Error while writing TaskLog!" , e);
}
}
private static String getCassandraCommand(){
String command = "source /etc/environment && sudo -E ./cassandra-count -host $LOCAL_ADDRESS -keyspace ycsb -table usertable";
//"./cassandra/bin/nodetool tablestats -H ycsb.usertable | grep \"Number of partitions (estimate):\"";
//"./cassandra/bin/cqlsh -e \"SELECT COUNT(*) FROM ycsb.usertable;\" 192.168.100.9";
return command;
}
private static String getCouchbaseCommand(){
String commnad = "source /etc/environment && sudo -E curl -s http://$LOCAL_ADDRESS:8091/pools/default/buckets/ycsb/ -u ycsb:mowgli19 | jq '.basicStats | {itemCount: .itemCount }' | grep itemCount";
return commnad;
}
}
......@@ -26,6 +26,7 @@ public enum TaskType {
MOLPRO,
MISC,
SPAWN_VM,
RECOVERY
RECOVERY,
REMOTE_COMMAND
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment