Enhanced Json input filter to have logSource which defaults to the filename

This commit is contained in:
Eric Fontana
2014-12-17 12:40:34 -05:00
parent 9d25a8302b
commit f4f8f0b50b
6 changed files with 134 additions and 114 deletions

View File

@@ -10,7 +10,7 @@ using System.Runtime.InteropServices;
[assembly: AssemblyConfiguration("")] [assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")] [assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("TimberWinR.ServiceHost")] [assembly: AssemblyProduct("TimberWinR.ServiceHost")]
[assembly: AssemblyCopyright("Copyright © 2014")] [assembly: AssemblyCopyright("Copyright © 2014-2015")]
[assembly: AssemblyTrademark("")] [assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")] [assembly: AssemblyCulture("")]

View File

@@ -79,8 +79,7 @@ namespace TimberWinR.Inputs
iFmt.minDateMod = _arguments.MinDateMod.Value.ToString("yyyy-MM-dd hh:mm:ss"); iFmt.minDateMod = _arguments.MinDateMod.Value.ToString("yyyy-MM-dd hh:mm:ss");
Dictionary<string, Int64> logFileMaxRecords = new Dictionary<string, Int64>(); Dictionary<string, Int64> logFileMaxRecords = new Dictionary<string, Int64>();
// Execute the query // Execute the query
while (!CancelToken.IsCancellationRequested) while (!CancelToken.IsCancellationRequested)
{ {

View File

@@ -1,6 +1,7 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Net.Configuration;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
@@ -30,11 +31,14 @@ namespace TimberWinR.Inputs
private Dictionary<string, DateTime> _logFileCreationTimes; private Dictionary<string, DateTime> _logFileCreationTimes;
private Dictionary<string, DateTime> _logFileSampleTimes; private Dictionary<string, DateTime> _logFileSampleTimes;
private Dictionary<string, long> _logFileSizes; private Dictionary<string, long> _logFileSizes;
private CancellationToken _cancelToken;
public bool Stop { get; set; }
public LogsListener(TimberWinR.Parser.Log arguments, CancellationToken cancelToken) public LogsListener(TimberWinR.Parser.Log arguments, CancellationToken cancelToken)
: base(cancelToken, "Win32-FileLog") : base(cancelToken, "Win32-FileLog")
{ {
Stop = false;
_cancelToken = cancelToken;
_logFileMaxRecords = new Dictionary<string, Int64>(); _logFileMaxRecords = new Dictionary<string, Int64>();
_logFileCreationTimes = new Dictionary<string, DateTime>(); _logFileCreationTimes = new Dictionary<string, DateTime>();
_logFileSampleTimes = new Dictionary<string, DateTime>(); _logFileSampleTimes = new Dictionary<string, DateTime>();
@@ -54,6 +58,7 @@ namespace TimberWinR.Inputs
public override void Shutdown() public override void Shutdown()
{ {
LogManager.GetCurrentClassLogger().Info("Shutting Down {0}", InputType); LogManager.GetCurrentClassLogger().Info("Shutting Down {0}", InputType);
Stop = true;
base.Shutdown(); base.Shutdown();
} }
@@ -69,20 +74,20 @@ namespace TimberWinR.Inputs
new JProperty("splitLongLines", _arguments.SplitLongLines), new JProperty("splitLongLines", _arguments.SplitLongLines),
new JProperty("recurse", _arguments.Recurse), new JProperty("recurse", _arguments.Recurse),
new JProperty("files", new JProperty("files",
new JArray(from f in _logFileMaxRecords.Keys new JArray(from f in _logFileMaxRecords.Keys
select new JValue(f))), select new JValue(f))),
new JProperty("fileSampleTimes", new JProperty("fileSampleTimes",
new JArray(from f in _logFileSampleTimes.Values new JArray(from f in _logFileSampleTimes.Values
select new JValue(f))), select new JValue(f))),
new JProperty("fileSizes", new JProperty("fileSizes",
new JArray(from f in _logFileSizes.Values new JArray(from f in _logFileSizes.Values
select new JValue(f))), select new JValue(f))),
new JProperty("fileIndices", new JProperty("fileIndices",
new JArray(from f in _logFileMaxRecords.Values new JArray(from f in _logFileMaxRecords.Values
select new JValue(f))), select new JValue(f))),
new JProperty("fileCreationDates", new JProperty("fileCreationDates",
new JArray(from f in _logFileCreationTimes.Values new JArray(from f in _logFileCreationTimes.Values
select new JValue(f))) select new JValue(f)))
))); )));
return json; return json;
@@ -97,122 +102,135 @@ namespace TimberWinR.Inputs
recurse = _arguments.Recurse recurse = _arguments.Recurse
}; };
using (var syncHandle = new ManualResetEventSlim())
// Execute the query
while (!CancelToken.IsCancellationRequested)
{ {
var oLogQuery = new LogQuery(); // Execute the query
try while (!Stop)
{ {
Thread.CurrentThread.Priority = ThreadPriority.BelowNormal; var oLogQuery = new LogQuery();
if (!_cancelToken.IsCancellationRequested)
var qfiles = string.Format("SELECT Distinct [LogFilename] FROM {0}", fileToWatch);
var rsfiles = oLogQuery.Execute(qfiles, iFmt);
for (; !rsfiles.atEnd(); rsfiles.moveNext())
{ {
var record = rsfiles.getRecord(); try
string logName = record.getValue("LogFilename") as string;
FileInfo fi = new FileInfo(logName);
if (!fi.Exists)
{ {
_logFileCreationTimes.Remove(logName); syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), _cancelToken);
_logFileMaxRecords.Remove(logName); var qfiles = string.Format("SELECT Distinct [LogFilename] FROM {0}", fileToWatch);
_logFileSizes.Remove(logName); var rsfiles = oLogQuery.Execute(qfiles, iFmt);
} for (; !rsfiles.atEnd(); rsfiles.moveNext())
_logFileSampleTimes[logName] = DateTime.UtcNow;
DateTime creationTime = fi.CreationTimeUtc;
bool logHasRolled = (_logFileCreationTimes.ContainsKey(logName) && creationTime > _logFileCreationTimes[logName]) ||
(_logFileSizes.ContainsKey(logName) && fi.Length < _logFileSizes[logName]);
if (!_logFileMaxRecords.ContainsKey(logName) || logHasRolled)
{
_logFileCreationTimes[logName] = creationTime;
_logFileSizes[logName] = fi.Length;
var qcount = string.Format("SELECT max(Index) as MaxRecordNumber FROM {0}", logName);
var rcount = oLogQuery.Execute(qcount, iFmt);
var qr = rcount.getRecord();
var lrn = (Int64)qr.getValueEx("MaxRecordNumber");
if (logHasRolled)
{ {
LogManager.GetCurrentClassLogger().Info("Log {0} has rolled", logName); var record = rsfiles.getRecord();
lrn = 0; string logName = record.getValue("LogFilename") as string;
} FileInfo fi = new FileInfo(logName);
_logFileMaxRecords[logName] = lrn;
}
_logFileSizes[logName] = fi.Length; if (!fi.Exists)
}
rsfiles.close();
foreach (string fileName in _logFileMaxRecords.Keys.ToList())
{
var lastRecordNumber = _logFileMaxRecords[fileName];
var query = string.Format("SELECT * FROM {0} where Index > {1}", fileName, lastRecordNumber);
var rs = oLogQuery.Execute(query, iFmt);
Dictionary<string, int> colMap = new Dictionary<string, int>();
for (int col = 0; col < rs.getColumnCount(); col++)
{
string colName = rs.getColumnName(col);
colMap[colName] = col;
}
// Browse the recordset
for (; !rs.atEnd(); rs.moveNext())
{
var record = rs.getRecord();
var json = new JObject();
foreach (var field in _arguments.Fields)
{
if (!colMap.ContainsKey(field.Name))
continue;
object v = record.getValue(field.Name);
if (field.DataType == typeof(DateTime))
{ {
DateTime dt = DateTime.Parse(v.ToString()); _logFileCreationTimes.Remove(logName);
json.Add(new JProperty(field.Name, dt)); _logFileMaxRecords.Remove(logName);
_logFileSizes.Remove(logName);
} }
else
json.Add(new JProperty(field.Name, v));
}
string msg = json["Text"].ToString();
if (!string.IsNullOrEmpty(msg))
{
ProcessJson(json);
_receivedMessages++;
}
var lrn = (Int64)record.getValueEx("Index"); _logFileSampleTimes[logName] = DateTime.UtcNow;
_logFileMaxRecords[fileName] = lrn;
GC.Collect(); DateTime creationTime = fi.CreationTimeUtc;
bool logHasRolled = (_logFileCreationTimes.ContainsKey(logName) &&
creationTime > _logFileCreationTimes[logName]) ||
(_logFileSizes.ContainsKey(logName) &&
fi.Length < _logFileSizes[logName]);
if (!_logFileMaxRecords.ContainsKey(logName) || logHasRolled)
{
_logFileCreationTimes[logName] = creationTime;
_logFileSizes[logName] = fi.Length;
var qcount = string.Format("SELECT max(Index) as MaxRecordNumber FROM {0}", logName);
var rcount = oLogQuery.Execute(qcount, iFmt);
var qr = rcount.getRecord();
var lrn = (Int64) qr.getValueEx("MaxRecordNumber");
if (logHasRolled)
{
LogManager.GetCurrentClassLogger().Info("Log {0} has rolled", logName);
lrn = 0;
}
_logFileMaxRecords[logName] = lrn;
}
_logFileSizes[logName] = fi.Length;
}
rsfiles.close();
foreach (string fileName in _logFileMaxRecords.Keys.ToList())
{
var lastRecordNumber = _logFileMaxRecords[fileName];
var query = string.Format("SELECT * FROM {0} where Index > {1}", fileName,
lastRecordNumber);
var rs = oLogQuery.Execute(query, iFmt);
Dictionary<string, int> colMap = new Dictionary<string, int>();
for (int col = 0; col < rs.getColumnCount(); col++)
{
string colName = rs.getColumnName(col);
colMap[colName] = col;
}
// Browse the recordset
for (; !rs.atEnd(); rs.moveNext())
{
var record = rs.getRecord();
var json = new JObject();
foreach (var field in _arguments.Fields)
{
if (!colMap.ContainsKey(field.Name))
continue;
if (json["logSource"] == null)
{
if (string.IsNullOrEmpty(_arguments.LogSource))
json.Add(new JProperty("logSource", fileName));
else
json.Add(new JProperty("logSource", _arguments.LogSource));
}
object v = record.getValue(field.Name);
if (field.DataType == typeof (DateTime))
{
DateTime dt = DateTime.Parse(v.ToString());
json.Add(new JProperty(field.Name, dt));
}
else
json.Add(new JProperty(field.Name, v));
}
string msg = json["Text"].ToString();
if (!string.IsNullOrEmpty(msg))
{
ProcessJson(json);
_receivedMessages++;
}
var lrn = (Int64) record.getValueEx("Index");
_logFileMaxRecords[fileName] = lrn;
GC.Collect();
}
colMap.Clear();
// Close the recordset
rs.close();
rs = null;
GC.Collect();
}
} }
colMap.Clear(); catch (Exception ex)
// Close the recordset {
rs.close(); LogManager.GetCurrentClassLogger().Error(ex);
rs = null; }
GC.Collect(); finally
{
oLogQuery = null;
}
} }
} }
catch (Exception ex) Finished();
{
LogManager.GetCurrentClassLogger().Error(ex);
}
finally
{
oLogQuery = null;
}
Thread.CurrentThread.Priority = ThreadPriority.Normal;
System.Threading.Thread.Sleep(_pollingIntervalInSeconds * 1000);
} }
Finished();
} }
} }
} }

View File

@@ -115,7 +115,7 @@ namespace TimberWinR.Inputs
{ {
LogManager.GetCurrentClassLogger().Warn(ex); LogManager.GetCurrentClassLogger().Warn(ex);
} }
} }
} }
} }

View File

@@ -6,6 +6,7 @@ using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Newtonsoft.Json.Linq; using Newtonsoft.Json.Linq;
using NLog; using NLog;
using RapidRegex.Core;
using RestSharp; using RestSharp;
namespace TimberWinR.Outputs namespace TimberWinR.Outputs
@@ -105,7 +106,7 @@ namespace TimberWinR.Outputs
{ {
var typeName = this.eo.GetTypeName(json); var typeName = this.eo.GetTypeName(json);
var indexName = this.eo.GetIndexName(json); var indexName = this.eo.GetIndexName(json);
var req = new RestRequest(string.Format("/{0}/{1}/", indexName, typeName), Method.POST); var req = new RestRequest(string.Format("/{0}/{1}/", indexName, typeName), Method.POST);
req.AddParameter("text/json", json.ToString(), ParameterType.RequestBody); req.AddParameter("text/json", json.ToString(), ParameterType.RequestBody);

View File

@@ -275,6 +275,8 @@ namespace TimberWinR.Parser
public List<Field> Fields { get; set; } public List<Field> Fields { get; set; }
[JsonProperty(PropertyName = "interval")] [JsonProperty(PropertyName = "interval")]
public int Interval { get; set; } public int Interval { get; set; }
[JsonProperty(PropertyName = "logSource")]
public string LogSource { get; set; }
public Log() public Log()
{ {