using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MongoDB.Bson;
using MongoDB.Bson.Serialization.Attributes;
using MongoDB.Driver;
using MongoDB.Driver.GridFS;
namespace OpenTap.Plugins.PluginDevelopment.ResultListeners
{
/// <summary>
/// OpenTAP result listener that writes test plan runs, test step runs, published results and
/// the run log to a MongoDB database through <see cref="Db"/>.
/// </summary>
/// <remarks>
/// The database writes issued through <see cref="Db"/> are not awaited by this class, so a
/// callback returning does not mean the corresponding document has been written.
/// </remarks>
[Display("MongoDB Result Listener", Group: "Database", Description: "Pushes results to a MongoDB instance.")]
public class MongoDbResultListener : ResultListener
{
    /// <summary>Name of the database the results are written to.</summary>
    private const string DatabaseName = "Demo1";

    /// <summary>Result column whose first value is a local file path to upload.</summary>
    private const string FileColumnName = "PathToFile";

    /// <summary>Result column whose first value is the title stored with the uploaded file.</summary>
    private const string FileTitleColumnName = "FileTitle";

    /// <summary>Step runs that have started but not yet completed, keyed by step run id.</summary>
    private Dictionary<Guid, MongoTestStepRun> _testStepRuns;

    /// <summary>Id of the most recent result document per result name within the current step run.</summary>
    private readonly Dictionary<string, ObjectId> _resultList = new Dictionary<string, ObjectId>();

    private MongoTestPlanRun _testPlanRun;
    private Db _db;

    /// <summary>MongoDB connection string used by <see cref="Open"/> and <see cref="TestConnection"/>.</summary>
    [Display("Connection String", Group: "Connection", Order: 10.01, Description: "mongodb://127.0.0.1 | mongodb://mongodb0.example.com:27017")]
    public string ConnectionString { get; set; }

    /// <summary>
    /// Opens a connection with the current settings, logs the outcome and closes the listener again.
    /// </summary>
    [Display("Test Connection")]
    [Browsable(true)]
    public void TestConnection()
    {
        OnActivity();
        Open_Impl();
        Close();
    }

    /// <summary>
    /// Creates the listener with its default name and a localhost connection string.
    /// </summary>
    public MongoDbResultListener()
    {
        Name = "MongoDB";
        ConnectionString = "mongodb://127.0.0.1:27017";
    }

    /// <summary>
    /// Stores the TestPlanRun instance and writes the data to the database.
    /// </summary>
    /// <param name="planRun">The test plan run that is starting.</param>
    public override void OnTestPlanRunStart(OpenTap.TestPlanRun planRun)
    {
        _testStepRuns = new Dictionary<Guid, MongoTestStepRun>();

        // The macro name is an identifier, not display text, so compare ordinally; a
        // culture-sensitive comparison can fail to match "ID" under Turkish casing rules.
        var dutParameter = planRun.Parameters.FirstOrDefault(
            x => x.MacroName != null && x.MacroName.Equals("DUT ID", StringComparison.OrdinalIgnoreCase));
        var dutText = dutParameter?.Value?.ToString();
        var dutId = string.IsNullOrWhiteSpace(dutText) ? "Default" : dutText.Trim();

        // NOTE(review): this overwrites the global engine settings as a side effect of starting a run.
        EngineSettings.Current.OperatorName = GetOperatorName();
        EngineSettings.Current.StationName = Environment.MachineName;

        _testPlanRun = new MongoTestPlanRun
        {
            _id = ObjectId.GenerateNewId(),
            Guid = planRun.Id,
            TestPlanName = planRun.TestPlanName,
            StartTime = planRun.StartTime,
            StartTimeStamp = planRun.StartTimeStamp,
            TestStartTimezoneOffset = TimeZoneInfo.Local.GetUtcOffset(planRun.StartTime),
            Duration = new TimeSpan(),
            TestPlanXml = planRun.TestPlanXml,
            Verdict = planRun.Verdict.ToString(),
            FailedToStart = planRun.FailedToStart,
            Parameters2 = planRun.Parameters.ToDictionaryOfParameters(),
            Parameters = planRun.Parameters.ToListOfParameters(),
            DutId = dutId,
            User = EngineSettings.Current.OperatorName,
            Station = EngineSettings.Current.StationName
        };
        _db.AddTestPlanRun(_testPlanRun);
        Log.Info($"Test plan '{planRun.TestPlanName}' started");
    }

    /// <summary>
    /// Stores the TestStepRun instance and writes the data to the database.
    /// </summary>
    /// <param name="stepRun">The test step run that is starting.</param>
    public override void OnTestStepRunStart(TestStepRun stepRun)
    {
        base.OnTestStepRunStart(stepRun);
        _resultList.Clear();

        var newTestStepRun = new MongoTestStepRun
        {
            _id = ObjectId.GenerateNewId(),
            TestPlanRunId = _testPlanRun._id,
            TestPlanRunGuid = _testPlanRun.Guid,
            Guid = stepRun.Id,
            Parent = stepRun.Parent,
            SuggestedNextStep = stepRun.SuggestedNextStep,
            SupportsJumpTo = stepRun.SupportsJumpTo,
            TestStepId = stepRun.TestStepId,
            TestStepName = stepRun.TestStepName,
            TestStepTypeName = stepRun.TestStepTypeName,
            Duration = stepRun.Duration,
            Parameters2 = stepRun.Parameters.ToDictionaryOfParameters(),
            Parameters = stepRun.Parameters.ToListOfParameters(),
            StartTime = stepRun.StartTime,
            StartTimeStamp = stepRun.StartTimeStamp,
            StartTimeZoneOffset = TimeZoneInfo.Local.GetUtcOffset(stepRun.StartTime),
            Verdict = stepRun.Verdict.ToString()
        };
        _testStepRuns.Add(stepRun.Id, newTestStepRun);
        _db.AddTestStepRun(newTestStepRun);
    }

    /// <summary>
    /// Writes a published result table to the database.
    /// </summary>
    /// <remarks>
    /// The first row of every column (except PathToFile and FileTitle) is stored as a "Result"
    /// parameter on the result document, together with a copy of the step run's parameters.
    /// Tables with more than one row are additionally stored as a data-table blob.
    /// If a column is called PathToFile, its first value is used as the path of a local file
    /// that is uploaded to the database; FileTitle, when present, supplies the file's title.
    /// A table without rows is stored with the step run's parameters only.
    /// Any exception is logged and swallowed so that a failing result does not stop the run.
    /// </remarks>
    /// <param name="stepRun">Id of the step run that published the result.</param>
    /// <param name="result">The published result table.</param>
    public override void OnResultPublished(Guid stepRun, ResultTable result)
    {
        try
        {
            OnActivity();
            if (!_testStepRuns.TryGetValue(stepRun, out var testStepRun))
            {
                Log.Warning($"Error storing test step run for {stepRun}.");
                return;
            }

            var hasRows = result.Rows > 0;
            var resultId = ObjectId.GenerateNewId();
            _resultList[result.Name] = resultId;

            if (result.Rows > 1)
            {
                // More than one row: keep the whole table as a blob next to the result document.
                var dataTable = result.Columns.ToDictionary(column => column.Name, column => column.Data.Cast<object>().ToList());
                _db.SaveDataTableAsync(ObjectId.GenerateNewId(), resultId, dataTable, null);
            }

            var fileColumn = result.Columns.FirstOrDefault(x => x.Name.Equals(FileColumnName));
            if (hasRows && fileColumn != null)
            {
                var fileTitleColumn = result.Columns.FirstOrDefault(x => x.Name.Equals(FileTitleColumnName));
                _db.SaveFileObjectAsync(ObjectId.GenerateNewId(), resultId, fileColumn.GetValue<string>(0), fileTitleColumn?.GetValue<string>(0));
            }

            // Each column's first value becomes a parameter of the result document.
            // The file path and file title columns are not stored as parameters.
            var additionalParameters = new ResultParameters();
            if (hasRows)
            {
                foreach (var column in result.Columns)
                {
                    if (column.Name.Equals(FileColumnName) || column.Name.Equals(FileTitleColumnName))
                    {
                        continue;
                    }

                    additionalParameters.Add(new ResultParameter("Result", column.Name, (IConvertible)column.Data.GetValue(0)));
                }
            }
            else
            {
                // Reading row 0 of an empty column would throw and drop the whole result.
                Log.Warning($"Result '{result.Name}' contains no rows; storing it without result values.");
            }

            // Copy the step parameters: appending to the step run's own list would make
            // later results published by the same step run accumulate parameters.
            var parameters = new List<MongoParameter>(testStepRun.Parameters);
            parameters.AddRange(additionalParameters.ToListOfParameters());

            var resultData = new MongoTestResult
            {
                _id = resultId,
                TestPlanRunId = testStepRun.TestPlanRunId,
                TestPlanRunGuid = testStepRun.TestPlanRunGuid,
                Guid = testStepRun.Guid,
                Parent = testStepRun.Parent,
                Name = result.Name,
                Parameters = parameters,
            };
            resultData.Parameters2 = resultData.Parameters.ToDictionaryOfParameters();
            _db.AddStepResult(resultData);
        }
        catch (Exception ex)
        {
            Log.Error($"Failed to store result for {Name}");
            Log.Error($"{ex.Message}");
            Log.Debug(ex);
        }
    }

    /// <summary>
    /// Updates the TestStepRun after completion.
    /// </summary>
    /// <param name="stepRun">The test step run that completed.</param>
    public override void OnTestStepRunCompleted(TestStepRun stepRun)
    {
        base.OnTestStepRunCompleted(stepRun);
        if (!_testStepRuns.TryGetValue(stepRun.Id, out var testStepRun))
        {
            Log.Warning($"Error updating test step run for {stepRun.Id}.");
            return;
        }

        testStepRun.Duration = stepRun.Duration;
        testStepRun.Verdict = stepRun.Verdict.ToString();
        _db.UpdateTestStepRun(testStepRun);
        _testStepRuns.Remove(stepRun.Id);
    }

    /// <summary>
    /// Updates the TestPlanRun after completion and uploads the run log.
    /// </summary>
    /// <param name="planRun">The test plan run that completed.</param>
    /// <param name="logStream">Stream containing the log of the run; may be null.</param>
    public override void OnTestPlanRunCompleted(OpenTap.TestPlanRun planRun, Stream logStream)
    {
        _testPlanRun.Duration = planRun.Duration;
        _testPlanRun.Verdict = planRun.Verdict.ToString();
        _testPlanRun.FailedToStart = planRun.FailedToStart;
        _testPlanRun.LogStreamId = ObjectId.GenerateNewId();
        _db.UpdateTestPlanRun(_testPlanRun);

        // The upload is not awaited, and the caller owns logStream and may close or reuse it
        // once this method returns, so upload from a private in-memory copy instead.
        Stream logCopy = null;
        if (logStream != null)
        {
            var buffer = new MemoryStream();
            logStream.CopyTo(buffer);
            buffer.Position = 0;
            logCopy = buffer;
        }

        // Invariant culture keeps the timestamp in the Gregorian calendar on every machine.
        var timestamp = _testPlanRun.StartTime.ToString("yyyyMMddTHHmmss", System.Globalization.CultureInfo.InvariantCulture);
        _db.SaveStreamAsync(_testPlanRun.LogStreamId, _testPlanRun._id, $"{_testPlanRun.DutId} {timestamp}.txt", logCopy);
    }

    /// <summary>
    /// Connects to the database and then opens the listener.
    /// </summary>
    public override void Open()
    {
        Open_Impl();
        base.Open();
    }

    /// <summary>
    /// Creates a new <see cref="Db"/> and connects it using <see cref="ConnectionString"/>.
    /// </summary>
    /// <returns>The value returned by <see cref="Db.Connect"/>.</returns>
    public bool Open_Impl()
    {
        _db = new Db();
        var success = _db.Connect(ConnectionString, DatabaseName);
        if (success)
        {
            Log.Info($"{Name} connection OK.");
        }
        else
        {
            Log.Error($"{Name} connection FAILED.");
        }

        return success;
    }

    /// <summary>
    /// Returns the Windows identity name, or the environment user name on platforms where
    /// Windows identities are not supported.
    /// </summary>
    private static string GetOperatorName()
    {
        try
        {
            return System.Security.Principal.WindowsIdentity.GetCurrent().Name;
        }
        catch (PlatformNotSupportedException)
        {
            return Environment.UserName;
        }
    }
}
/// <summary>
/// Wrapper around the MongoDB driver used to store test plan runs, test step runs,
/// results and GridFS blobs (images, data tables and logs).
/// </summary>
/// <remarks>
/// The write methods return <c>void</c> and complete in the background; callers cannot await
/// them. Because an exception escaping an <c>async void</c> method cannot be observed by the
/// caller, every write is routed through <see cref="RunGuardedAsync"/>, which reports
/// failures to <see cref="System.Diagnostics.Trace"/> instead of letting them propagate.
/// NOTE(review): writes are not ordered relative to each other, so an update may reach the
/// server before the insert it refers to — confirm whether that matters for this data.
/// </remarks>
public class Db
{
    private const string CollectionTestPlanRun = "TestPlanRuns";
    private const string CollectionTestResults = "TestResults";
    private const string CollectionTestStepRuns = "TestStepRuns";
    private const string BucketImages = "Images";
    private const string BucketTables = "Tables";
    private const string BucketLogs = "Logs";

    /// <summary>Client created by <see cref="Connect"/>.</summary>
    public IMongoClient Client { get; private set; }

    /// <summary>Database handle created by <see cref="Connect"/>.</summary>
    public IMongoDatabase Database { get; private set; }

    /// <summary>Time after which a single write is cancelled. Defaults to 30 seconds.</summary>
    public TimeSpan ServerTimeout { get; set; } = TimeSpan.FromSeconds(30);

    /// <summary>
    /// Connects to the server.
    /// </summary>
    /// <param name="connectionString">MongoDB connection string.</param>
    /// <param name="databaseName">Name of the database to use.</param>
    /// <returns>True when the server reported at least one database name.</returns>
    public bool Connect(string connectionString, string databaseName)
    {
        Client = new MongoClient(connectionString);

        // This only creates the database object in the API. It does not create a database on
        // the server; MongoDB creates the database when there is information to write to it.
        Database = Client.GetDatabase(databaseName);

        // List the database names to confirm we have connected. The cursor is read
        // synchronously so the answer is complete before it is evaluated.
        using (var cursor = Client.ListDatabaseNames())
        {
            return cursor.Any();
        }
    }

    /// <summary>
    /// Insert TestPlanRun.
    /// </summary>
    /// <param name="testPlanRun">Document to insert.</param>
    public async void AddTestPlanRun(MongoTestPlanRun testPlanRun)
    {
        await InsertOneAsync(CollectionTestPlanRun, testPlanRun).ConfigureAwait(false);
    }

    /// <summary>
    /// Insert TestStepRun.
    /// </summary>
    /// <param name="testStepRun">Document to insert.</param>
    public async void AddTestStepRun(MongoTestStepRun testStepRun)
    {
        await InsertOneAsync(CollectionTestStepRuns, testStepRun).ConfigureAwait(false);
    }

    /// <summary>
    /// Insert TestResult.
    /// </summary>
    /// <param name="testResult">Document to insert.</param>
    public async void AddStepResult(MongoTestResult testResult)
    {
        await InsertOneAsync(CollectionTestResults, testResult).ConfigureAwait(false);
    }

    /// <summary>
    /// Updates the parameters, duration and verdict of a stored test step run.
    /// </summary>
    /// <param name="testStepRun">Step run whose <c>_id</c> selects the document to update.</param>
    public async void UpdateTestStepRun(MongoTestStepRun testStepRun)
    {
        await RunGuardedAsync("Update test step run", token =>
        {
            var filter = Builders<MongoTestStepRun>.Filter.Eq(x => x._id, testStepRun._id);
            var update = Builders<MongoTestStepRun>.Update
                .Set(x => x.Parameters2, testStepRun.Parameters2)
                .Set(x => x.Duration, testStepRun.Duration)
                .Set(x => x.Verdict, testStepRun.Verdict);
            var collection = Database.GetCollection<MongoTestStepRun>(CollectionTestStepRuns);
            return collection.UpdateOneAsync(filter, update, null, token);
        }).ConfigureAwait(false);
    }

    /// <summary>
    /// Update the Test Plan Run.
    /// </summary>
    /// <param name="testRun">Plan run whose <c>_id</c> selects the document to update.</param>
    public async void UpdateTestPlanRun(MongoTestPlanRun testRun)
    {
        await RunGuardedAsync("Update test plan run", token =>
        {
            var filter = Builders<MongoTestPlanRun>.Filter.Eq(x => x._id, testRun._id);
            var update = Builders<MongoTestPlanRun>.Update
                .Set(x => x.Duration, testRun.Duration)
                .Set(x => x.Verdict, testRun.Verdict)
                .Set(x => x.FailedToStart, testRun.FailedToStart)
                .Set(x => x.LogStreamId, testRun.LogStreamId);
            var collection = Database.GetCollection<MongoTestPlanRun>(CollectionTestPlanRun);
            return collection.UpdateOneAsync(filter, update, null, token);
        }).ConfigureAwait(false);
    }

    /// <summary>
    /// Save an image file to GridFS using the path to the file.
    /// Does nothing when the path is blank or the file does not exist.
    /// </summary>
    /// <param name="gfsId">Id to give the GridFS file.</param>
    /// <param name="resultId">Id of the result document the file belongs to.</param>
    /// <param name="filePath">Path of the local file to upload.</param>
    /// <param name="title">Title stored in the metadata; "Not Set" when blank.</param>
    /// <param name="type">Value stored as the "DataType" metadata entry.</param>
    public async void SaveFileObjectAsync(ObjectId gfsId, ObjectId resultId, string filePath, string title, string type = "Image")
    {
        if (string.IsNullOrWhiteSpace(filePath))
        {
            return;
        }

        await RunGuardedAsync("Upload file", token =>
        {
            if (!File.Exists(filePath))
            {
                return Task.CompletedTask;
            }

            var options = CreateUploadOptions(new Dictionary<string, object>
            {
                { "ResultId", resultId },
                { "Title", string.IsNullOrWhiteSpace(title) ? "Not Set" : title },
                { "DataType", type }
            });
            var content = File.ReadAllBytes(filePath);
            return CreateBucket(BucketImages).UploadFromBytesAsync(gfsId, Path.GetFileName(filePath), content, options, token);
        }).ConfigureAwait(false);
    }

    /// <summary>
    /// Stores array data as a BSON byte array in GridFS.
    /// Does nothing when <paramref name="data"/> is null or serializes to zero bytes.
    /// </summary>
    /// <param name="gfsId">Id to give the GridFS file; also used as its file name.</param>
    /// <param name="resultId">Id of the result document the table belongs to.</param>
    /// <param name="data">Column name to column values.</param>
    /// <param name="title">Title stored in the metadata; "Not Set" when blank.</param>
    /// <param name="type">Value stored as the "DataType" metadata entry.</param>
    public async void SaveDataTableAsync(ObjectId gfsId, ObjectId resultId, Dictionary<string, List<object>> data, string title, string type = "DataTable")
    {
        if (data == null)
        {
            return;
        }

        await RunGuardedAsync("Upload data table", token =>
        {
            var payload = data.ToBson();
            if (payload.LongLength == 0)
            {
                return Task.CompletedTask;
            }

            var options = CreateUploadOptions(new Dictionary<string, object>
            {
                { "ResultId", resultId },
                { "Title", string.IsNullOrWhiteSpace(title) ? "Not Set" : title },
                { "DataType", type }
            });
            return CreateBucket(BucketTables).UploadFromBytesAsync(gfsId, gfsId.ToString(), payload, options, token);
        }).ConfigureAwait(false);
    }

    /// <summary>
    /// Write the log to the database. Does nothing when <paramref name="logStream"/> is null.
    /// </summary>
    /// <param name="gfsId">Id to give the GridFS file.</param>
    /// <param name="testPlanRunId">Id of the test plan run document the log belongs to.</param>
    /// <param name="fn">File name to give the GridFS file.</param>
    /// <param name="logStream">Stream to upload; it is read until the upload finishes.</param>
    /// <param name="type">Value stored as the "DataType" metadata entry.</param>
    public async void SaveStreamAsync(ObjectId gfsId, ObjectId testPlanRunId, string fn, Stream logStream, string type = "Stream")
    {
        if (logStream == null)
        {
            return;
        }

        await RunGuardedAsync("Upload log stream", token =>
        {
            var options = CreateUploadOptions(new Dictionary<string, object>
            {
                { "TestPlanRunId", testPlanRunId },
                { "DataType", type }
            });
            return CreateBucket(BucketLogs).UploadFromStreamAsync(gfsId, fn, logStream, options, token);
        }).ConfigureAwait(false);
    }

    /// <summary>
    /// Inserts one document into the named collection.
    /// </summary>
    /// <typeparam name="T">Document type.</typeparam>
    /// <param name="collectionName">Name of the target collection.</param>
    /// <param name="data">Document to insert.</param>
    /// <returns>A task that completes when the insert has finished, failed or timed out.</returns>
    private Task InsertOneAsync<T>(string collectionName, T data)
    {
        return RunGuardedAsync(
            "Insert into " + collectionName,
            token => Database.GetCollection<T>(collectionName).InsertOneAsync(data, null, token));
    }

    /// <summary>
    /// Runs one database operation with a <see cref="ServerTimeout"/> cancellation token and
    /// reports, rather than propagates, any failure.
    /// </summary>
    /// <param name="operation">Short description used in the trace message.</param>
    /// <param name="action">Starts the operation with the supplied cancellation token.</param>
    /// <returns>A task that never faults.</returns>
    private async Task RunGuardedAsync(string operation, Func<CancellationToken, Task> action)
    {
        try
        {
            using (var timeout = new CancellationTokenSource(ServerTimeout))
            {
                await action(timeout.Token).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException)
        {
            System.Diagnostics.Trace.TraceWarning("MongoDB: {0} did not complete within {1}.", operation, ServerTimeout);
        }
        catch (Exception ex)
        {
            // The public callers are async void: letting this escape could end the process.
            System.Diagnostics.Trace.TraceError("MongoDB: {0} failed: {1}", operation, ex);
        }
    }

    /// <summary>Creates a GridFS bucket with the given name on <see cref="Database"/>.</summary>
    private GridFSBucket CreateBucket(string bucketName)
    {
        return new GridFSBucket(Database, new GridFSBucketOptions
        {
            BucketName = bucketName,
        });
    }

    /// <summary>Wraps the metadata entries in GridFS upload options.</summary>
    private static GridFSUploadOptions CreateUploadOptions(Dictionary<string, object> metadata)
    {
        return new GridFSUploadOptions { Metadata = new BsonDocument(metadata) };
    }
}
/// <summary>
/// Document describing one test plan run. The result listener creates it when a plan run
/// starts and updates duration, verdict, failed-to-start flag and log id when it completes.
/// </summary>
public class MongoTestPlanRun
{
/// <summary>Document identifier, generated by the result listener.</summary>
public ObjectId _id { get; set; }
/// <summary>Id of the OpenTAP test plan run.</summary>
public Guid Guid { get; set; }
public string TestPlanName { get; set; }
public string TestPlanXml { get; set; }
/// <summary>Operator name taken from the engine settings when the run started.</summary>
public string User { get; set; }
/// <summary>Station name taken from the engine settings when the run started.</summary>
public string Station { get; set; }
public DateTime StartTime { get; set; }
public long StartTimeStamp { get; set; }
/// <summary>Local UTC offset at <see cref="StartTime"/>.</summary>
public TimeSpan TestStartTimezoneOffset { get; set; }
public TimeSpan Duration { get; set; }
/// <summary>String form of the run's verdict.</summary>
public string Verdict { get; set; }
public bool FailedToStart { get; set; }
/// <summary>Run parameters keyed by a name derived from group and parameter name.</summary>
public Dictionary<string, MongoParameter> Parameters2 { get; set; }
/// <summary>Run parameters as a list; excluded from the stored document.</summary>
[BsonIgnore]
public List<MongoParameter> Parameters { get; set; }
/// <summary>Value of the "DUT ID" parameter, or "Default" when it is missing or blank.</summary>
public string DutId { get; set; }
/// <summary>Id under which the run log is uploaded.</summary>
public ObjectId LogStreamId { get; set; }
}
/// <summary>
/// Document describing one test step run. The result listener creates it when a step run
/// starts and updates parameters, duration and verdict when it completes.
/// </summary>
public class MongoTestStepRun
{
/// <summary>Document identifier, generated by the result listener.</summary>
public ObjectId _id { get; set; }
/// <summary>Document identifier of the owning <see cref="MongoTestPlanRun"/>.</summary>
public ObjectId TestPlanRunId { get; set; }
/// <summary>Id of the owning OpenTAP test plan run.</summary>
public Guid TestPlanRunGuid { get; set; }
/// <summary>Id of the OpenTAP test step run.</summary>
public Guid Guid { get; set; }
/// <summary>Parent id as reported by the step run.</summary>
public Guid Parent { get; set; }
public Guid? SuggestedNextStep { get; set; }
public bool SupportsJumpTo { get; set; }
public Guid TestStepId { get; set; }
public string TestStepName { get; set; }
public string TestStepTypeName { get; set; }
public DateTime StartTime { get; set; }
public long StartTimeStamp { get; set; }
/// <summary>Local UTC offset at <see cref="StartTime"/>.</summary>
public TimeSpan StartTimeZoneOffset { get; set; }
public TimeSpan Duration { get; set; }
/// <summary>String form of the step run's verdict.</summary>
public string Verdict { get; set; }
/// <summary>Step parameters keyed by a name derived from group and parameter name.</summary>
public Dictionary<string, MongoParameter> Parameters2 { get; set; }
/// <summary>Step parameters as a list; excluded from the stored document.</summary>
[BsonIgnore]
public List<MongoParameter> Parameters { get; set; }
}
/// <summary>
/// Document describing one published result. The result listener fills it from the
/// publishing step run and the first row of the published result table.
/// </summary>
public class MongoTestResult
{
/// <summary>Document identifier, generated by the result listener.</summary>
public ObjectId _id { get; set; }
/// <summary>Document identifier of the owning <see cref="MongoTestPlanRun"/>.</summary>
public ObjectId TestPlanRunId { get; set; }
/// <summary>Id of the owning OpenTAP test plan run.</summary>
public Guid TestPlanRunGuid { get; set; }
/// <summary>Id of the test step run that published the result.</summary>
public Guid Guid { get; set; }
/// <summary>Parent id copied from the publishing step run.</summary>
public Guid Parent { get; set; }
/// <summary>Name of the published result table.</summary>
public string Name { get; set; }
/// <summary>Step and result parameters keyed by a name derived from group and parameter name.</summary>
public Dictionary<string, MongoParameter> Parameters2 { get; set; }
/// <summary>Step and result parameters as a list; excluded from the stored document.</summary>
[BsonIgnore]
public List<MongoParameter> Parameters { get; set; }
}
/// <summary>
/// Represents the OpenTAP Parameter class for MongoDB.
/// </summary>
/// <remarks>
/// The conversion helpers in this file set only <see cref="Name"/>, <see cref="Group"/> and
/// <see cref="Value"/>; the remaining members keep their default values.
/// </remarks>
[BsonDiscriminator("Parameter")]
public class MongoParameter
{
public string Name { get; set; }
/// <summary>Parameter group; the conversion helpers store "System" when the source group is blank.</summary>
public string Group { get; set; }
public int ParentLevel { get; set; }
public string MacroName { get; set; }
public bool IsMetaData { get; set; }
/// <summary>Parameter value; typed dynamic so any value type can be assigned.</summary>
public dynamic Value { get; set; }
}
/// <summary>
/// Extension methods that convert OpenTAP parameters into <see cref="MongoParameter"/>
/// collections.
/// </summary>
internal static class TapExtension
{
    /// <summary>
    /// Transform OpenTAP ResultParameters into a dictionary of parameters.
    /// This will create a Parameters BSON document containing named parameter documents.
    /// </summary>
    /// <param name="tapParams">Parameters to convert.</param>
    /// <returns>Parameters keyed as described on <see cref="BuildParameterDictionary{T}"/>.</returns>
    internal static Dictionary<string, MongoParameter> ToDictionaryOfParameters(this ResultParameters tapParams)
    {
        return BuildParameterDictionary<ResultParameter>(tapParams, rp => rp.Name, rp => rp.Group, rp => rp.Value);
    }

    /// <summary>
    /// Transform a list of MongoParameters into a dictionary of parameters.
    /// This will create a Parameters BSON document containing named parameter documents.
    /// </summary>
    /// <param name="tapParams">Parameters to convert.</param>
    /// <returns>Parameters keyed as described on <see cref="BuildParameterDictionary{T}"/>.</returns>
    internal static Dictionary<string, MongoParameter> ToDictionaryOfParameters(this List<MongoParameter> tapParams)
    {
        return BuildParameterDictionary<MongoParameter>(tapParams, p => p.Name, p => p.Group, p => p.Value);
    }

    /// <summary>
    /// Transform OpenTAP ResultParameters into a list of parameters, ordered by name.
    /// This will create a Parameters BSON document containing a list of parameter documents.
    /// </summary>
    /// <param name="tapParams">Parameters to convert.</param>
    /// <returns>One <see cref="MongoParameter"/> per input parameter.</returns>
    internal static List<MongoParameter> ToListOfParameters(this ResultParameters tapParams)
    {
        // Only Name, Group and Value are copied; IsMetaData and ParentLevel keep their defaults.
        return tapParams.OrderBy(x => x.Name)
            .Select(rp => new MongoParameter
            {
                Name = rp.Name,
                Group = string.IsNullOrWhiteSpace(rp.Group) ? "System" : rp.Group,
                Value = rp.Value,
            }).ToList();
    }

    /// <summary>
    /// Builds the keyed parameter dictionary shared by both ToDictionaryOfParameters overloads.
    /// </summary>
    /// <remarks>
    /// Items are processed in name order. The key is the group followed by the name, with
    /// every '.' replaced by '_'. When a name occurs more than once in the input, "_n" is
    /// appended, where n is a running counter. An item whose key already exists is skipped.
    /// NOTE(review): the counter is shared by all duplicated names and only resets when a
    /// non-duplicated name is processed, so suffixes do not restart at 1 for each name;
    /// kept as is because the keys end up in stored documents.
    /// </remarks>
    /// <typeparam name="T">Source parameter type.</typeparam>
    /// <param name="source">Parameters to convert.</param>
    /// <param name="getName">Reads the parameter name.</param>
    /// <param name="getGroup">Reads the parameter group; may return null.</param>
    /// <param name="getValue">Reads the parameter value.</param>
    /// <returns>The keyed parameters.</returns>
    private static Dictionary<string, MongoParameter> BuildParameterDictionary<T>(
        IEnumerable<T> source,
        Func<T, string> getName,
        Func<T, string> getGroup,
        Func<T, object> getValue)
    {
        var items = source.ToList();

        // Group once by name so the duplicate check inside the loop is a lookup, not a rescan.
        var itemsByName = items.ToLookup(getName);

        var result = new Dictionary<string, MongoParameter>();
        var suffix = 1;
        foreach (var item in items.OrderBy(getName))
        {
            var name = getName(item);
            var group = getGroup(item);
            var key = $"{group?.Replace(".", "_")}{name.Replace(".", "_")}";

            if (itemsByName[name].Count() > 1)
            {
                key += $"_{suffix++}";
            }
            else
            {
                suffix = 1;
            }

            if (result.ContainsKey(key))
            {
                continue;
            }

            result.Add(key, new MongoParameter
            {
                Name = name,
                Group = string.IsNullOrWhiteSpace(group) ? "System" : group,
                Value = getValue(item),
            });
        }

        return result;
    }
}
}