Added Diagnostics Port

This commit is contained in:
Eric Fontana
2014-08-21 11:19:52 -04:00
parent 660bdb52dc
commit 4b1e75b604
15 changed files with 288 additions and 48 deletions

View File

@@ -38,7 +38,8 @@ namespace TimberWinR.ServiceHost
hostConfigurator.AddCommandLineDefinition("configFile", c => arguments.ConfigFile = c); hostConfigurator.AddCommandLineDefinition("configFile", c => arguments.ConfigFile = c);
hostConfigurator.AddCommandLineDefinition("logLevel", c => arguments.LogLevel = c); hostConfigurator.AddCommandLineDefinition("logLevel", c => arguments.LogLevel = c);
hostConfigurator.AddCommandLineDefinition("logDir", c => arguments.LogfileDir = c); hostConfigurator.AddCommandLineDefinition("logDir", c => arguments.LogfileDir = c);
hostConfigurator.AddCommandLineDefinition("diagnosticPort", c => arguments.DiagnosticPort = int.Parse(c));
hostConfigurator.ApplyCommandLine(); hostConfigurator.ApplyCommandLine();
hostConfigurator.RunAsLocalSystem(); hostConfigurator.RunAsLocalSystem();
@@ -59,6 +60,8 @@ namespace TimberWinR.ServiceHost
AddServiceParameter("-configFile", arguments.ConfigFile); AddServiceParameter("-configFile", arguments.ConfigFile);
AddServiceParameter("-logLevel", arguments.LogLevel); AddServiceParameter("-logLevel", arguments.LogLevel);
AddServiceParameter("-logDir", arguments.LogfileDir); AddServiceParameter("-logDir", arguments.LogfileDir);
if (arguments.DiagnosticPort > 0)
AddServiceParameter("-diagnosticPort", arguments.DiagnosticPort);
} }
}); });
}); });
@@ -77,6 +80,21 @@ namespace TimberWinR.ServiceHost
Registry.SetValue(keyPath, keyName, currentValue); Registry.SetValue(keyPath, keyName, currentValue);
} }
} }
private static void AddServiceParameter(string paramName, int value)
{
string keyPath = @"HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\services\TimberWinR";
string keyName = "ImagePath";
string currentValue = Registry.GetValue(keyPath, keyName, "").ToString();
if (!string.IsNullOrEmpty(paramName) && !currentValue.Contains(string.Format("{0}:", paramName)))
{
currentValue += string.Format(" {0}:{1}", paramName, value);
Registry.SetValue(keyPath, keyName, currentValue);
}
}
} }
internal class Arguments internal class Arguments
@@ -84,9 +102,11 @@ namespace TimberWinR.ServiceHost
public string ConfigFile { get; set; } public string ConfigFile { get; set; }
public string LogLevel { get; set; } public string LogLevel { get; set; }
public string LogfileDir { get; set; } public string LogfileDir { get; set; }
public int DiagnosticPort { get; set; }
public Arguments() public Arguments()
{ {
DiagnosticPort = 5141;
ConfigFile = "default.json"; ConfigFile = "default.json";
LogLevel = "Info"; LogLevel = "Info";
LogfileDir = @"C:\logs"; LogfileDir = @"C:\logs";
@@ -100,7 +120,7 @@ namespace TimberWinR.ServiceHost
readonly CancellationToken _cancellationToken; readonly CancellationToken _cancellationToken;
readonly Task _serviceTask; readonly Task _serviceTask;
private readonly Arguments _args; private readonly Arguments _args;
private TimberWinR.Diagnostics.Diagnostics _diags;
private TimberWinR.Manager _manager; private TimberWinR.Manager _manager;
public TimberWinRService(Arguments args) public TimberWinRService(Arguments args)
@@ -108,7 +128,7 @@ namespace TimberWinR.ServiceHost
_args = args; _args = args;
_cancellationTokenSource = new CancellationTokenSource(); _cancellationTokenSource = new CancellationTokenSource();
_cancellationToken = _cancellationTokenSource.Token; _cancellationToken = _cancellationTokenSource.Token;
_serviceTask = new Task(RunService, _cancellationToken); _serviceTask = new Task(RunService, _cancellationToken);
} }
public void Start() public void Start()
@@ -118,8 +138,10 @@ namespace TimberWinR.ServiceHost
public void Stop() public void Stop()
{ {
_cancellationTokenSource.Cancel(); _cancellationTokenSource.Cancel();
if (_diags != null)
_diags.Shutdown();
if (_manager != null) if (_manager != null)
_manager.Shutdown(); _manager.Shutdown();
} }
@@ -130,6 +152,8 @@ namespace TimberWinR.ServiceHost
private void RunService() private void RunService()
{ {
_manager = new TimberWinR.Manager(_args.ConfigFile, _args.LogLevel, _args.LogfileDir, _cancellationToken); _manager = new TimberWinR.Manager(_args.ConfigFile, _args.LogLevel, _args.LogfileDir, _cancellationToken);
if (_args.DiagnosticPort > 0)
_diags = new Diagnostics.Diagnostics(_manager, _cancellationToken, _args.DiagnosticPort);
} }
} }
} }

View File

@@ -104,6 +104,7 @@ namespace TimberWinR
{ {
if (!string.IsNullOrEmpty(jsonConfFile)) if (!string.IsNullOrEmpty(jsonConfFile))
{ {
LogManager.GetCurrentClassLogger().Info("Reading Configuration From {0}", jsonConfFile);
string json = File.ReadAllText(jsonConfFile); string json = File.ReadAllText(jsonConfFile);
return FromString(json, c); return FromString(json, c);

View File

@@ -22,12 +22,13 @@ namespace TimberWinR.Inputs
{ {
private int _pollingIntervalInSeconds = 1; private int _pollingIntervalInSeconds = 1;
private TimberWinR.Parser.IISW3CLog _arguments; private TimberWinR.Parser.IISW3CLog _arguments;
private long _receivedMessages;
public IISW3CInputListener(TimberWinR.Parser.IISW3CLog arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 1) public IISW3CInputListener(TimberWinR.Parser.IISW3CLog arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 1)
: base(cancelToken, "Win32-IISLog") : base(cancelToken, "Win32-IISLog")
{ {
_arguments = arguments; _arguments = arguments;
_receivedMessages = 0;
_pollingIntervalInSeconds = pollingIntervalInSeconds; _pollingIntervalInSeconds = pollingIntervalInSeconds;
var task = new Task(IISW3CWatcher, cancelToken); var task = new Task(IISW3CWatcher, cancelToken);
task.Start(); task.Start();
@@ -36,19 +37,37 @@ namespace TimberWinR.Inputs
public override void Shutdown() public override void Shutdown()
{ {
base.Shutdown(); base.Shutdown();
} }
public override JObject ToJson()
{
JObject json = new JObject(
new JProperty("iisw3c",
new JObject(
new JProperty("messages", _receivedMessages),
new JProperty("location", _arguments.Location),
new JProperty("codepage", _arguments.CodePage),
new JProperty("consolidateLogs", _arguments.ConsolidateLogs),
new JProperty("dirTime", _arguments.DirTime),
new JProperty("dQuotes", _arguments.DoubleQuotes),
new JProperty("recurse", _arguments.Recurse),
new JProperty("useDoubleQuotes", _arguments.DoubleQuotes)
)));
return json;
}
private void IISW3CWatcher() private void IISW3CWatcher()
{ {
var oLogQuery = new LogQuery(); var oLogQuery = new LogQuery();
var iFmt = new IISW3CLogInputFormat() var iFmt = new IISW3CLogInputFormat()
{ {
codepage = _arguments.CodePage, codepage = _arguments.CodePage,
consolidateLogs = _arguments.ConsolidateLogs, consolidateLogs = _arguments.ConsolidateLogs,
dirTime = _arguments.DirTime, dirTime = _arguments.DirTime,
dQuotes = _arguments.DoubleQuotes, dQuotes = _arguments.DoubleQuotes,
iCheckpoint = CheckpointFileName, iCheckpoint = CheckpointFileName,
recurse = _arguments.Recurse, recurse = _arguments.Recurse,
useDoubleQuotes = _arguments.DoubleQuotes useDoubleQuotes = _arguments.DoubleQuotes
}; };
@@ -67,7 +86,7 @@ namespace TimberWinR.Inputs
{ {
var rs = oLogQuery.Execute(query, iFmt); var rs = oLogQuery.Execute(query, iFmt);
Dictionary<string, int> colMap = new Dictionary<string, int>(); Dictionary<string, int> colMap = new Dictionary<string, int>();
for (int col=0; col<rs.getColumnCount(); col++) for (int col = 0; col < rs.getColumnCount(); col++)
{ {
string colName = rs.getColumnName(col); string colName = rs.getColumnName(col);
colMap[colName] = col; colMap[colName] = col;
@@ -86,16 +105,17 @@ namespace TimberWinR.Inputs
if (!colMap.ContainsKey(field.Name)) if (!colMap.ContainsKey(field.Name))
continue; continue;
object v = record.getValue(field.Name); object v = record.getValue(field.Name);
if (field.DataType == typeof(DateTime)) if (field.DataType == typeof(DateTime))
{ {
DateTime dt = DateTime.Parse(v.ToString()); DateTime dt = DateTime.Parse(v.ToString());
json.Add(new JProperty(field.Name, dt)); json.Add(new JProperty(field.Name, dt));
} }
else else
json.Add(new JProperty(field.Name, v)); json.Add(new JProperty(field.Name, v));
} }
ProcessJson(json); ProcessJson(json);
_receivedMessages++;
} }
} }
// Close the recordset // Close the recordset

View File

@@ -6,6 +6,7 @@ using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using NLog;
namespace TimberWinR.Inputs namespace TimberWinR.Inputs
{ {
@@ -17,7 +18,14 @@ namespace TimberWinR.Inputs
private string _typeName; private string _typeName;
public AutoResetEvent FinishedEvent { get; set; } public AutoResetEvent FinishedEvent { get; set; }
public string CheckpointFileName { get; set; } public string CheckpointFileName { get; set; }
public string InputType
{
get { return _typeName; }
}
public abstract JObject ToJson();
public InputListener(CancellationToken token, string typeName) public InputListener(CancellationToken token, string typeName)
{ {
CheckpointFileName = Path.Combine(System.IO.Path.GetTempPath(), string.Format("{0}.lpc", Guid.NewGuid().ToString())); CheckpointFileName = Path.Combine(System.IO.Path.GetTempPath(), string.Format("{0}.lpc", Guid.NewGuid().ToString()));
@@ -60,8 +68,9 @@ namespace TimberWinR.Inputs
if (File.Exists(CheckpointFileName)) if (File.Exists(CheckpointFileName))
File.Delete(CheckpointFileName); File.Delete(CheckpointFileName);
} }
catch (Exception) catch (Exception ex)
{ {
LogManager.GetCurrentClassLogger().Error("Error Deleting Checkpoint File", ex);
} }
} }

View File

@@ -24,10 +24,12 @@ namespace TimberWinR.Inputs
{ {
private int _pollingIntervalInSeconds; private int _pollingIntervalInSeconds;
private TimberWinR.Parser.Log _arguments; private TimberWinR.Parser.Log _arguments;
private long _receivedMessages;
public LogsListener(TimberWinR.Parser.Log arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 3) public LogsListener(TimberWinR.Parser.Log arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 3)
: base(cancelToken, "Win32-FileLog") : base(cancelToken, "Win32-FileLog")
{ {
_receivedMessages = 0;
_arguments = arguments; _arguments = arguments;
_pollingIntervalInSeconds = pollingIntervalInSeconds; _pollingIntervalInSeconds = pollingIntervalInSeconds;
var task = new Task(FileWatcher, cancelToken); var task = new Task(FileWatcher, cancelToken);
@@ -39,6 +41,20 @@ namespace TimberWinR.Inputs
base.Shutdown(); base.Shutdown();
} }
public override JObject ToJson()
{
JObject json = new JObject(
new JProperty("log",
new JObject(
new JProperty("messages", _receivedMessages),
new JProperty("location", _arguments.Location),
new JProperty("codepage", _arguments.CodePage),
new JProperty("splitLongLines", _arguments.SplitLongLines),
new JProperty("recurse", _arguments.Recurse)
)));
return json;
}
private void FileWatcher() private void FileWatcher()
{ {
var iFmt = new TextLineInputFormat() var iFmt = new TextLineInputFormat()
@@ -81,17 +97,20 @@ namespace TimberWinR.Inputs
continue; continue;
object v = record.getValue(field.Name); object v = record.getValue(field.Name);
if (field.DataType == typeof(DateTime)) if (field.DataType == typeof (DateTime))
{ {
DateTime dt = DateTime.Parse(v.ToString()); DateTime dt = DateTime.Parse(v.ToString());
json.Add(new JProperty(field.Name, dt)); json.Add(new JProperty(field.Name, dt));
} }
else else
json.Add(new JProperty(field.Name, v)); json.Add(new JProperty(field.Name, v));
} }
string msg = json["Text"].ToString(); string msg = json["Text"].ToString();
if (!string.IsNullOrEmpty(msg)) if (!string.IsNullOrEmpty(msg))
{
ProcessJson(json); ProcessJson(json);
_receivedMessages++;
}
} }
} }
// Close the recordset // Close the recordset

View File

@@ -20,6 +20,13 @@ namespace TimberWinR.Inputs
_listenThread.Start(); _listenThread.Start();
} }
public override JObject ToJson()
{
JObject json = new JObject(
new JProperty("stdin", "enabled"));
return json;
}
public override void Shutdown() public override void Shutdown()
{ {
base.Shutdown(); base.Shutdown();

View File

@@ -18,12 +18,27 @@ namespace TimberWinR.Inputs
private Thread _listenThreadV4; private Thread _listenThreadV4;
private Thread _listenThreadV6; private Thread _listenThreadV6;
private readonly int _port; private readonly int _port;
private long _receivedMessages;
public override JObject ToJson()
{
JObject json = new JObject(
new JProperty("tcp",
new JObject(
new JProperty("port", _port),
new JProperty("messages", _receivedMessages)
)));
return json;
}
public TcpInputListener(CancellationToken cancelToken, int port = 5140) public TcpInputListener(CancellationToken cancelToken, int port = 5140)
: base(cancelToken, "Win32-Tcp") : base(cancelToken, "Win32-Tcp")
{ {
_port = port; _port = port;
LogManager.GetCurrentClassLogger().Info("Tcp Input on Port: {0}", _port);
LogManager.GetCurrentClassLogger().Info("Tcp Input(v4/v6) on Port {0} Ready", _port);
_tcpListenerV6 = new System.Net.Sockets.TcpListener(IPAddress.IPv6Any, port); _tcpListenerV6 = new System.Net.Sockets.TcpListener(IPAddress.IPv6Any, port);
_tcpListenerV4 = new System.Net.Sockets.TcpListener(IPAddress.Any, port); _tcpListenerV4 = new System.Net.Sockets.TcpListener(IPAddress.Any, port);
@@ -52,8 +67,7 @@ namespace TimberWinR.Inputs
listener.Start(); listener.Start();
LogManager.GetCurrentClassLogger().Info("Tcp Input on Port {0} Ready", _port);
while (!CancelToken.IsCancellationRequested) while (!CancelToken.IsCancellationRequested)
{ {
try try
@@ -79,7 +93,7 @@ namespace TimberWinR.Inputs
{ {
var tcpClient = (TcpClient)client; var tcpClient = (TcpClient)client;
NetworkStream clientStream = null; NetworkStream clientStream = null;
try try
{ {
clientStream = tcpClient.GetStream(); clientStream = tcpClient.GetStream();
@@ -91,6 +105,7 @@ namespace TimberWinR.Inputs
{ {
JObject json = JObject.Parse(line); JObject json = JObject.Parse(line);
ProcessJson(json); ProcessJson(json);
_receivedMessages++;
} }
catch (Exception ex) catch (Exception ex)
{ {
@@ -104,7 +119,7 @@ namespace TimberWinR.Inputs
{ {
LogManager.GetCurrentClassLogger().Error("Tcp Exception", ex); LogManager.GetCurrentClassLogger().Error("Tcp Exception", ex);
} }
if (clientStream != null) if (clientStream != null)
clientStream.Close(); clientStream.Close();

View File

@@ -21,13 +21,14 @@ namespace TimberWinR.Inputs
/// Listen to Windows Event Log /// Listen to Windows Event Log
/// </summary> /// </summary>
public class WindowsEvtInputListener : InputListener public class WindowsEvtInputListener : InputListener
{ {
private int _pollingIntervalInSeconds = 1; private int _pollingIntervalInSeconds = 1;
private TimberWinR.Parser.WindowsEvent _arguments; private TimberWinR.Parser.WindowsEvent _arguments;
private long _receivedMessages;
public WindowsEvtInputListener(TimberWinR.Parser.WindowsEvent arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 1) public WindowsEvtInputListener(TimberWinR.Parser.WindowsEvent arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 1)
: base(cancelToken, "Win32-Eventlog") : base(cancelToken, "Win32-Eventlog")
{ {
_arguments = arguments; _arguments = arguments;
_pollingIntervalInSeconds = pollingIntervalInSeconds; _pollingIntervalInSeconds = pollingIntervalInSeconds;
var task = new Task(EventWatcher, cancelToken); var task = new Task(EventWatcher, cancelToken);
@@ -36,7 +37,26 @@ namespace TimberWinR.Inputs
public override void Shutdown() public override void Shutdown()
{ {
base.Shutdown(); base.Shutdown();
}
public override JObject ToJson()
{
JObject json = new JObject(
new JProperty("windows_events",
new JObject(
new JProperty("messages", _receivedMessages),
new JProperty("binaryFormat", _arguments.BinaryFormat.ToString()),
new JProperty("direction", _arguments.Direction.ToString()),
new JProperty("formatMsg", _arguments.FormatMsg),
new JProperty("fullEventCode", _arguments.FullEventCode),
new JProperty("fullText", _arguments.FullText),
new JProperty("msgErrorMode", _arguments.MsgErrorMode.ToString()),
new JProperty("stringsSep", _arguments.StringsSep),
new JProperty("resolveSIDs", _arguments.ResolveSIDS),
new JProperty("iCheckpoint", CheckpointFileName),
new JProperty("source", _arguments.Source))));
return json;
} }
private void EventWatcher() private void EventWatcher()
@@ -53,12 +73,12 @@ namespace TimberWinR.Inputs
formatMsg = _arguments.FormatMsg, formatMsg = _arguments.FormatMsg,
fullEventCode = _arguments.FullEventCode, fullEventCode = _arguments.FullEventCode,
fullText = _arguments.FullText, fullText = _arguments.FullText,
msgErrorMode = _arguments.MsgErrorMode.ToString(), msgErrorMode = _arguments.MsgErrorMode.ToString(),
stringsSep = _arguments.StringsSep, stringsSep = _arguments.StringsSep,
resolveSIDs = _arguments.ResolveSIDS, resolveSIDs = _arguments.ResolveSIDS,
iCheckpoint = CheckpointFileName, iCheckpoint = CheckpointFileName,
}; };
// Create the query // Create the query
var query = string.Format("SELECT * FROM {0}", _arguments.Source); var query = string.Format("SELECT * FROM {0}", _arguments.Source);
@@ -69,7 +89,7 @@ namespace TimberWinR.Inputs
{ {
try try
{ {
var rs = oLogQuery.Execute(query, iFmt); var rs = oLogQuery.Execute(query, iFmt);
// Browse the recordset // Browse the recordset
for (; !rs.atEnd(); rs.moveNext()) for (; !rs.atEnd(); rs.moveNext())
{ {
@@ -82,11 +102,12 @@ namespace TimberWinR.Inputs
{ {
object v = record.getValue(field.Name); object v = record.getValue(field.Name);
if (field.Name == "Data") if (field.Name == "Data")
v = ToPrintable(v.ToString()); v = ToPrintable(v.ToString());
json.Add(new JProperty(field.Name, v)); json.Add(new JProperty(field.Name, v));
} }
ProcessJson(json); ProcessJson(json);
_receivedMessages++;
} }
} }
// Close the recordset // Close the recordset
@@ -98,11 +119,11 @@ namespace TimberWinR.Inputs
LogManager.GetCurrentClassLogger().Error("WindowsEventListener", ex); LogManager.GetCurrentClassLogger().Error("WindowsEventListener", ex);
firstQuery = true; firstQuery = true;
oLogQuery = new LogQuery(); oLogQuery = new LogQuery();
} }
System.Threading.Thread.Sleep(_pollingIntervalInSeconds * 1000); System.Threading.Thread.Sleep(_pollingIntervalInSeconds * 1000);
} }
Finished(); Finished();
} }
} }
} }

View File

@@ -21,7 +21,24 @@ namespace TimberWinR
public Configuration Config { get; set; } public Configuration Config { get; set; }
public List<OutputSender> Outputs { get; set; } public List<OutputSender> Outputs { get; set; }
public List<TcpInputListener> Tcps { get; set; } public List<TcpInputListener> Tcps { get; set; }
public List<InputListener> Listeners { get; set; } public List<InputListener> Listeners { get; set; }
public DateTime StartedOn { get; set; }
public string JsonConfig { get; set; }
public string LogfileDir { get; set; }
public int NumConnections {
get { return numConnections; }
}
public int NumMessages
{
get { return numMessages; }
}
private static int numConnections;
private static int numMessages;
public void Shutdown() public void Shutdown()
{ {
LogManager.GetCurrentClassLogger().Info("Shutting Down"); LogManager.GetCurrentClassLogger().Info("Shutting Down");
@@ -30,8 +47,22 @@ namespace TimberWinR
listener.Shutdown(); listener.Shutdown();
} }
public void IncrementMessageCount(int count = 1)
{
Interlocked.Add(ref numMessages, count);
}
public Manager(string jsonConfigFile, string logLevel, string logfileDir, CancellationToken cancelToken) public Manager(string jsonConfigFile, string logLevel, string logfileDir, CancellationToken cancelToken)
{ {
StartedOn = DateTime.UtcNow;
JsonConfig = jsonConfigFile;
LogfileDir = logfileDir;
numMessages = 0;
numConnections = 0;
Outputs = new List<OutputSender>(); Outputs = new List<OutputSender>();
Listeners = new List<InputListener>(); Listeners = new List<InputListener>();
@@ -59,15 +90,16 @@ namespace TimberWinR
// Is it a directory? // Is it a directory?
if (Directory.Exists(jsonConfigFile)) if (Directory.Exists(jsonConfigFile))
{ {
LogManager.GetCurrentClassLogger().Info("Initialized, Reading Configurations From {0}", jsonConfigFile); DirectoryInfo di = new DirectoryInfo(jsonConfigFile);
LogManager.GetCurrentClassLogger().Info("Initialized, Reading Configurations From {0}", di.FullName);
Config = Configuration.FromDirectory(jsonConfigFile); Config = Configuration.FromDirectory(jsonConfigFile);
} }
else else
{ {
LogManager.GetCurrentClassLogger().Info("Initialized, Reading Configurations From File: {0}", jsonConfigFile);
var fi = new FileInfo(jsonConfigFile); var fi = new FileInfo(jsonConfigFile);
LogManager.GetCurrentClassLogger().Info("Initialized, Reading Configurations From File: {0}", fi.FullName);
if (!fi.Exists) if (!fi.Exists)
throw new FileNotFoundException("Missing config file", jsonConfigFile); throw new FileNotFoundException("Missing config file", jsonConfigFile);

View File

@@ -23,10 +23,16 @@ namespace TimberWinR.Outputs
private readonly int _timeout; private readonly int _timeout;
private readonly object _locker = new object(); private readonly object _locker = new object();
private readonly List<JObject> _jsonQueue; private readonly List<JObject> _jsonQueue;
private readonly int _numThreads;
private long _sentMessages;
private long _errorCount;
public ElasticsearchOutput(TimberWinR.Manager manager, Parser.ElasticsearchOutput eo, CancellationToken cancelToken) public ElasticsearchOutput(TimberWinR.Manager manager, Parser.ElasticsearchOutput eo, CancellationToken cancelToken)
: base(cancelToken) : base(cancelToken)
{ {
_sentMessages = 0;
_errorCount = 0;
_protocol = eo.Protocol; _protocol = eo.Protocol;
_timeout = eo.Timeout; _timeout = eo.Timeout;
_manager = manager; _manager = manager;
@@ -36,6 +42,8 @@ namespace TimberWinR.Outputs
_index = eo.Index; _index = eo.Index;
_hostIndex = 0; _hostIndex = 0;
_jsonQueue = new List<JObject>(); _jsonQueue = new List<JObject>();
_numThreads = eo.NumThreads;
for (int i = 0; i < eo.NumThreads; i++) for (int i = 0; i < eo.NumThreads; i++)
{ {
var elsThread = new Task(ElasticsearchSender, cancelToken); var elsThread = new Task(ElasticsearchSender, cancelToken);
@@ -43,6 +51,25 @@ namespace TimberWinR.Outputs
} }
} }
public override JObject ToJson()
{
JObject json = new JObject(
new JProperty("elasticsearch",
new JObject(
new JProperty("host", string.Join(",", _host)),
new JProperty("errors", _errorCount),
new JProperty("sent_messages", _sentMessages),
new JProperty("queued_messages", _jsonQueue.Count),
new JProperty("port", _port),
new JProperty("interval", _interval),
new JProperty("threads", _numThreads),
new JProperty("hosts",
new JArray(
from h in _host
select new JObject(
new JProperty("host", h)))))));
return json;
}
// //
// Pull off messages from the Queue, batch them up and send them all across // Pull off messages from the Queue, batch them up and send them all across
// //
@@ -55,6 +82,8 @@ namespace TimberWinR.Outputs
{ {
messages = _jsonQueue.Take(1).ToArray(); messages = _jsonQueue.Take(1).ToArray();
_jsonQueue.RemoveRange(0, messages.Length); _jsonQueue.RemoveRange(0, messages.Length);
if (messages.Length > 0)
_manager.IncrementMessageCount(messages.Length);
} }
if (messages.Length > 0) if (messages.Length > 0)
@@ -96,12 +125,18 @@ namespace TimberWinR.Outputs
{ {
LogManager.GetCurrentClassLogger() LogManager.GetCurrentClassLogger()
.Error("Failed to send: {0}", response.ErrorMessage); .Error("Failed to send: {0}", response.ErrorMessage);
Interlocked.Increment(ref _errorCount);
}
else
{
_sentMessages++;
} }
}); });
} }
catch (Exception error) catch (Exception error)
{ {
LogManager.GetCurrentClassLogger().Error(error); LogManager.GetCurrentClassLogger().Error(error);
Interlocked.Increment(ref _errorCount);
} }
} }
} }
@@ -110,12 +145,14 @@ namespace TimberWinR.Outputs
LogManager.GetCurrentClassLogger() LogManager.GetCurrentClassLogger()
.Fatal("Unable to connect with any Elasticsearch hosts, {0}", .Fatal("Unable to connect with any Elasticsearch hosts, {0}",
String.Join(",", _host)); String.Join(",", _host));
Interlocked.Increment(ref _errorCount);
} }
} }
catch (Exception ex) catch (Exception ex)
{ {
LogManager.GetCurrentClassLogger().Error(ex); LogManager.GetCurrentClassLogger().Error(ex);
Interlocked.Increment(ref _errorCount);
} }
} }
} }

View File

@@ -24,6 +24,7 @@ namespace TimberWinR.Outputs
listener.OnMessageRecieved += MessageReceivedHandler; listener.OnMessageRecieved += MessageReceivedHandler;
} }
public abstract JObject ToJson();
protected abstract void MessageReceivedHandler(JObject jsonMessage); protected abstract void MessageReceivedHandler(JObject jsonMessage);
} }
} }

View File

@@ -29,6 +29,11 @@ namespace TimberWinR.Outputs
private TimberWinR.Manager _manager; private TimberWinR.Manager _manager;
private readonly int _batchCount; private readonly int _batchCount;
private readonly int _interval; private readonly int _interval;
private readonly int _numThreads;
private long _sentMessages;
private long _errorCount;
private long _redisDepth;
/// <summary> /// <summary>
/// Get the next client /// Get the next client
@@ -61,9 +66,33 @@ namespace TimberWinR.Outputs
return null; return null;
} }
public override JObject ToJson()
{
JObject json = new JObject(
new JProperty("redis",
new JObject(
new JProperty("host", string.Join(",", _redisHosts)),
new JProperty("errors", _errorCount),
new JProperty("redis_depth", _redisDepth),
new JProperty("sent_messages", _sentMessages),
new JProperty("queued_messages", _jsonQueue.Count),
new JProperty("port", _port),
new JProperty("interval", _interval),
new JProperty("threads", _numThreads),
new JProperty("batchcount", _batchCount),
new JProperty("index", _logstashIndexName),
new JProperty("hosts",
new JArray(
from h in _redisHosts
select new JObject(
new JProperty("host", h)))))));
return json;
}
public RedisOutput(TimberWinR.Manager manager, Parser.RedisOutput ro, CancellationToken cancelToken) public RedisOutput(TimberWinR.Manager manager, Parser.RedisOutput ro, CancellationToken cancelToken)
: base(cancelToken) : base(cancelToken)
{ {
_redisDepth = 0;
_batchCount = ro.BatchCount; _batchCount = ro.BatchCount;
_manager = manager; _manager = manager;
_redisHostIndex = 0; _redisHostIndex = 0;
@@ -73,6 +102,8 @@ namespace TimberWinR.Outputs
_timeout = ro.Timeout; _timeout = ro.Timeout;
_logstashIndexName = ro.Index; _logstashIndexName = ro.Index;
_interval = ro.Interval; _interval = ro.Interval;
_numThreads = ro.NumThreads;
_errorCount = 0;
for (int i = 0; i < ro.NumThreads; i++) for (int i = 0; i < ro.NumThreads; i++)
{ {
@@ -81,6 +112,11 @@ namespace TimberWinR.Outputs
} }
} }
public override string ToString()
{
return string.Format("Redis Host: {0} Port: {1}, Threads: {2}, Interval: {3}, BatchCount: {4}", string.Join(",", _redisHosts) , _port, _numThreads, _interval, _batchCount);
}
/// <summary> /// <summary>
/// Forward on Json message to Redis Logstash queue /// Forward on Json message to Redis Logstash queue
/// </summary> /// </summary>
@@ -119,6 +155,8 @@ namespace TimberWinR.Outputs
{ {
messages = _jsonQueue.Take(_batchCount).ToArray(); messages = _jsonQueue.Take(_batchCount).ToArray();
_jsonQueue.RemoveRange(0, messages.Length); _jsonQueue.RemoveRange(0, messages.Length);
if (messages.Length > 0)
_manager.IncrementMessageCount(messages.Length);
} }
if (messages.Length > 0) if (messages.Length > 0)
@@ -141,11 +179,13 @@ namespace TimberWinR.Outputs
{ {
try try
{ {
client.RPush(_logstashIndexName, jsonMessage); _redisDepth = client.RPush(_logstashIndexName, jsonMessage);
_sentMessages++;
} }
catch (SocketException ex) catch (SocketException ex)
{ {
LogManager.GetCurrentClassLogger().Warn(ex); LogManager.GetCurrentClassLogger().Warn(ex);
Interlocked.Increment(ref _errorCount);
} }
} }
client.EndPipe(); client.EndPipe();
@@ -153,6 +193,7 @@ namespace TimberWinR.Outputs
} }
else else
{ {
Interlocked.Increment(ref _errorCount);
LogManager.GetCurrentClassLogger() LogManager.GetCurrentClassLogger()
.Fatal("Unable to connect with any Redis hosts, {0}", .Fatal("Unable to connect with any Redis hosts, {0}",
String.Join(",", _redisHosts)); String.Join(",", _redisHosts));
@@ -162,6 +203,7 @@ namespace TimberWinR.Outputs
catch (Exception ex) catch (Exception ex)
{ {
LogManager.GetCurrentClassLogger().Error(ex); LogManager.GetCurrentClassLogger().Error(ex);
Interlocked.Increment(ref _errorCount);
} }
} }
} }

View File

@@ -14,10 +14,12 @@ namespace TimberWinR.Outputs
private readonly int _interval; private readonly int _interval;
private readonly object _locker = new object(); private readonly object _locker = new object();
private readonly List<JObject> _jsonQueue; private readonly List<JObject> _jsonQueue;
private long _sentMessages;
public StdoutOutput(TimberWinR.Manager manager, Parser.StdoutOutput eo, CancellationToken cancelToken) public StdoutOutput(TimberWinR.Manager manager, Parser.StdoutOutput eo, CancellationToken cancelToken)
: base(cancelToken) : base(cancelToken)
{ {
_sentMessages = 0;
_manager = manager; _manager = manager;
_interval = eo.Interval; _interval = eo.Interval;
_jsonQueue = new List<JObject>(); _jsonQueue = new List<JObject>();
@@ -26,6 +28,16 @@ namespace TimberWinR.Outputs
elsThread.Start(); elsThread.Start();
} }
public override JObject ToJson()
{
JObject json = new JObject(
new JProperty("stdout",
new JObject(
new JProperty("sent_messages", _sentMessages))));
return json;
}
// //
// Pull off messages from the Queue, batch them up and send them all across // Pull off messages from the Queue, batch them up and send them all across
// //
@@ -37,7 +49,7 @@ namespace TimberWinR.Outputs
lock (_locker) lock (_locker)
{ {
messages = _jsonQueue.Take(1).ToArray(); messages = _jsonQueue.Take(1).ToArray();
_jsonQueue.RemoveRange(0, messages.Length); _jsonQueue.RemoveRange(0, messages.Length);
} }
if (messages.Length > 0) if (messages.Length > 0)
@@ -47,6 +59,7 @@ namespace TimberWinR.Outputs
foreach (JObject obj in messages) foreach (JObject obj in messages)
{ {
Console.WriteLine(obj.ToString()); Console.WriteLine(obj.ToString());
_sentMessages++;
} }
} }
catch (Exception ex) catch (Exception ex)

View File

@@ -67,6 +67,7 @@
<ItemGroup> <ItemGroup>
<Compile Include="Configuration.cs" /> <Compile Include="Configuration.cs" />
<Compile Include="ConfigurationErrors.cs" /> <Compile Include="ConfigurationErrors.cs" />
<Compile Include="Diagnostics\Diagnostics.cs" />
<Compile Include="Filters\DateFilter.cs" /> <Compile Include="Filters\DateFilter.cs" />
<Compile Include="Filters\FilterBase.cs" /> <Compile Include="Filters\FilterBase.cs" />
<Compile Include="Filters\GrokFilter.cs" /> <Compile Include="Filters\GrokFilter.cs" />
@@ -121,9 +122,7 @@
<LastGenOutput>Resources.Designer.cs</LastGenOutput> <LastGenOutput>Resources.Designer.cs</LastGenOutput>
</EmbeddedResource> </EmbeddedResource>
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup />
<Folder Include="Parsing\" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it. <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets. Other similar extension points exist, see Microsoft.Common.targets.

View File

@@ -5,7 +5,7 @@
<Property Id="CONFIGFILE">default.json</Property> <Property Id="CONFIGFILE">default.json</Property>
<Property Id="LOGDIR">c:\logs</Property> <Property Id="LOGDIR">c:\logs</Property>
<Property Id="LOGLEVEL">Info</Property> <Property Id="LOGLEVEL">Info</Property>
<Property Id="DIAGPORT">5141</Property>
<!-- <!--
We need to be able to uninstall a newer version from an older version. We need to be able to uninstall a newer version from an older version.
The default reinstallmode is "omus", of which the 'o' means "reinstall if missing or older" The default reinstallmode is "omus", of which the 'o' means "reinstall if missing or older"
@@ -58,7 +58,7 @@
Directory="INSTALLFOLDER" Directory="INSTALLFOLDER"
Impersonate="yes" Impersonate="yes"
Execute="deferred" Execute="deferred"
ExeCommand='"[INSTALLFOLDER]TimberWinR.ServiceHost.exe" install --autostart -configFile "[CONFIGFILE]" -logDir "[LOGDIR]" -logLevel "[LOGLEVEL]"' ExeCommand='"[INSTALLFOLDER]TimberWinR.ServiceHost.exe" install --autostart -configFile "[CONFIGFILE]" -logDir "[LOGDIR]" -logLevel "[LOGLEVEL]" -diagnosticPort:[DIAGPORT]'
Return='check'> Return='check'>
</CustomAction> </CustomAction>