From 0da192167a8d3a40b614d3d85f86db02939fa7cd Mon Sep 17 00:00:00 2001 From: Eric Fontana Date: Fri, 19 Dec 2014 13:29:28 -0500 Subject: [PATCH] Fixed shutdown slowness --- .../Properties/AssemblyInfo.cs | 4 +- TimberWinR/Inputs/IISW3CInputListener.cs | 114 ++++++------ TimberWinR/Inputs/InputListener.cs | 5 +- TimberWinR/Inputs/LogsListener.cs | 9 +- TimberWinR/Inputs/W3CInputListener.cs | 143 ++++++++------- TimberWinR/Inputs/WindowsEvtInputListener.cs | 164 ++++++++---------- TimberWinR/Outputs/Elasticsearch.cs | 157 +++++++++-------- TimberWinR/Outputs/Redis.cs | 102 ++++++----- TimberWinR/Outputs/Stdout.cs | 57 ++++-- 9 files changed, 419 insertions(+), 336 deletions(-) diff --git a/TimberWinR.ServiceHost/Properties/AssemblyInfo.cs b/TimberWinR.ServiceHost/Properties/AssemblyInfo.cs index 264ddff..e2459c7 100644 --- a/TimberWinR.ServiceHost/Properties/AssemblyInfo.cs +++ b/TimberWinR.ServiceHost/Properties/AssemblyInfo.cs @@ -32,5 +32,5 @@ using System.Runtime.InteropServices; // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: // [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.3.16.0")] -[assembly: AssemblyFileVersion("1.3.16.0")] +[assembly: AssemblyVersion("1.3.17.0")] +[assembly: AssemblyFileVersion("1.3.17.0")] diff --git a/TimberWinR/Inputs/IISW3CInputListener.cs b/TimberWinR/Inputs/IISW3CInputListener.cs index deb2cb7..a7ab436 100644 --- a/TimberWinR/Inputs/IISW3CInputListener.cs +++ b/TimberWinR/Inputs/IISW3CInputListener.cs @@ -18,12 +18,12 @@ namespace TimberWinR.Inputs private readonly int _pollingIntervalInSeconds; private readonly Parser.IISW3CLog _arguments; private long _receivedMessages; - - private IisW3CRowReader rowReader; + public bool Stop { get; set; } + private IisW3CRowReader rowReader; public IISW3CInputListener(Parser.IISW3CLog arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 5) : base(cancelToken, "Win32-IISLog") - { + { _arguments = arguments; _receivedMessages = 0; _pollingIntervalInSeconds = pollingIntervalInSeconds; @@ -38,6 +38,7 @@ namespace TimberWinR.Inputs public override void Shutdown() { + Stop = true; LogManager.GetCurrentClassLogger().Info("Shutting Down {0}", InputType); base.Shutdown(); } @@ -70,7 +71,7 @@ namespace TimberWinR.Inputs codepage = _arguments.CodePage, consolidateLogs = true, dirTime = _arguments.DirTime, - dQuotes = _arguments.DoubleQuotes, + dQuotes = _arguments.DoubleQuotes, recurse = _arguments.Recurse, useDoubleQuotes = _arguments.DoubleQuotes }; @@ -79,61 +80,74 @@ namespace TimberWinR.Inputs iFmt.minDateMod = _arguments.MinDateMod.Value.ToString("yyyy-MM-dd hh:mm:ss"); Dictionary logFileMaxRecords = new Dictionary(); - - // Execute the query - while (!CancelToken.IsCancellationRequested) + + using (var syncHandle = new ManualResetEventSlim()) { - try + // Execute the query + while (!Stop) { - oLogQuery = new LogQuery(); - - var qfiles = string.Format("SELECT Distinct [LogFilename] FROM {0}", location); - var rsfiles = oLogQuery.Execute(qfiles, iFmt); - for (; !rsfiles.atEnd(); rsfiles.moveNext()) + // Execute the query + if (!CancelToken.IsCancellationRequested) { - var record = rsfiles.getRecord(); - string fileName = record.getValue("LogFilename") as string; - if (!logFileMaxRecords.ContainsKey(fileName)) + try { - var qcount = string.Format("SELECT max(LogRow) as MaxRecordNumber FROM {0}", fileName); - var rcount = oLogQuery.Execute(qcount, iFmt); - var qr = rcount.getRecord(); - var lrn = (Int64)qr.getValueEx("MaxRecordNumber"); - logFileMaxRecords[fileName] = lrn; + oLogQuery = new LogQuery(); + + var qfiles = string.Format("SELECT Distinct [LogFilename] FROM {0}", location); + var rsfiles = oLogQuery.Execute(qfiles, iFmt); + for (; !rsfiles.atEnd(); rsfiles.moveNext()) + { + var record = rsfiles.getRecord(); + string fileName = record.getValue("LogFilename") as string; + if (!logFileMaxRecords.ContainsKey(fileName)) + { + var qcount = string.Format("SELECT max(LogRow) as MaxRecordNumber FROM {0}", + fileName); + var rcount = oLogQuery.Execute(qcount, iFmt); + var qr = rcount.getRecord(); + var lrn = (Int64) qr.getValueEx("MaxRecordNumber"); + logFileMaxRecords[fileName] = lrn; + } + } + + foreach (string fileName in logFileMaxRecords.Keys.ToList()) + { + var lastRecordNumber = logFileMaxRecords[fileName]; + var query = string.Format("SELECT * FROM '{0}' Where LogRow > {1}", fileName, + lastRecordNumber); + + var rs = oLogQuery.Execute(query, iFmt); + rowReader.ReadColumnMap(rs); + + // Browse the recordset + for (; !rs.atEnd(); rs.moveNext()) + { + var record = rs.getRecord(); + var json = rowReader.ReadToJson(record); + ProcessJson(json); + _receivedMessages++; + var lrn = (Int64) record.getValueEx("LogRow"); + logFileMaxRecords[fileName] = lrn; + record = null; + json = null; + } + // Close the recordset + rs.close(); + GC.Collect(); + } + if (!Stop) + syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken); } - } - - foreach (string fileName in logFileMaxRecords.Keys.ToList()) - { - var lastRecordNumber = logFileMaxRecords[fileName]; - var query = string.Format("SELECT * FROM '{0}' Where LogRow > {1}", fileName, lastRecordNumber); - - var rs = oLogQuery.Execute(query, iFmt); - rowReader.ReadColumnMap(rs); - - // Browse the recordset - for (; !rs.atEnd(); rs.moveNext()) + catch (OperationCanceledException oce) { - var record = rs.getRecord(); - var json = rowReader.ReadToJson(record); - ProcessJson(json); - _receivedMessages++; - var lrn = (Int64)record.getValueEx("LogRow"); - logFileMaxRecords[fileName] = lrn; - record = null; - json = null; + break; } - // Close the recordset - rs.close(); - GC.Collect(); + catch (Exception ex) + { + LogManager.GetCurrentClassLogger().Error(ex); + } } } - catch (Exception ex) - { - LogManager.GetCurrentClassLogger().Error(ex); - } - - System.Threading.Thread.Sleep(_pollingIntervalInSeconds * 1000); } Finished(); diff --git a/TimberWinR/Inputs/InputListener.cs b/TimberWinR/Inputs/InputListener.cs index ea46dcf..a6d415c 100644 --- a/TimberWinR/Inputs/InputListener.cs +++ b/TimberWinR/Inputs/InputListener.cs @@ -58,14 +58,17 @@ namespace TimberWinR.Inputs public void Finished() { + LogManager.GetCurrentClassLogger().Info("Signaling Event Shutdown {0}", InputType); FinishedEvent.Set(); - LogManager.GetCurrentClassLogger().Info("Finished Shutdown {0}", InputType); + LogManager.GetCurrentClassLogger().Info("Finished signaling Shutdown {0}", InputType); } public virtual void Shutdown() { LogManager.GetCurrentClassLogger().Info("Shutting Down {0}", InputType); FinishedEvent.WaitOne(); + + LogManager.GetCurrentClassLogger().Info("Finished Wait For {0}", InputType); try { if (File.Exists(CheckpointFileName)) diff --git a/TimberWinR/Inputs/LogsListener.cs b/TimberWinR/Inputs/LogsListener.cs index 78c515b..98555ce 100644 --- a/TimberWinR/Inputs/LogsListener.cs +++ b/TimberWinR/Inputs/LogsListener.cs @@ -31,14 +31,14 @@ namespace TimberWinR.Inputs private Dictionary _logFileCreationTimes; private Dictionary _logFileSampleTimes; private Dictionary _logFileSizes; - private CancellationToken _cancelToken; + public bool Stop { get; set; } public LogsListener(TimberWinR.Parser.Log arguments, CancellationToken cancelToken) : base(cancelToken, "Win32-FileLog") { Stop = false; - _cancelToken = cancelToken; + _logFileMaxRecords = new Dictionary(); _logFileCreationTimes = new Dictionary(); _logFileSampleTimes = new Dictionary(); @@ -108,7 +108,7 @@ namespace TimberWinR.Inputs while (!Stop) { var oLogQuery = new LogQuery(); - if (!_cancelToken.IsCancellationRequested) + if (!CancelToken.IsCancellationRequested) { try { @@ -216,7 +216,8 @@ namespace TimberWinR.Inputs GC.Collect(); } // Sleep - syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), _cancelToken); + if (!Stop) + syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken); } catch (FileNotFoundException fnfex) { diff --git a/TimberWinR/Inputs/W3CInputListener.cs b/TimberWinR/Inputs/W3CInputListener.cs index 01af584..3e43f6a 100644 --- a/TimberWinR/Inputs/W3CInputListener.cs +++ b/TimberWinR/Inputs/W3CInputListener.cs @@ -25,6 +25,7 @@ namespace TimberWinR.Inputs private readonly int _pollingIntervalInSeconds; private readonly TimberWinR.Parser.W3CLog _arguments; private long _receivedMessages; + public bool Stop { get; set; } public W3CInputListener(TimberWinR.Parser.W3CLog arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 5) : base(cancelToken, "Win32-W3CLog") @@ -41,6 +42,7 @@ namespace TimberWinR.Inputs public override void Shutdown() { + Stop = true; LogManager.GetCurrentClassLogger().Info("Shutting Down {0}", InputType); base.Shutdown(); } @@ -55,7 +57,7 @@ namespace TimberWinR.Inputs new JProperty("codepage", _arguments.CodePage), new JProperty("separator", _arguments.Separator), new JProperty("dQuotes", _arguments.DoubleQuotes), - new JProperty("dtLines", _arguments.DtLines) + new JProperty("dtLines", _arguments.DtLines) ))); return json; } @@ -73,81 +75,94 @@ namespace TimberWinR.Inputs iCodepage = _arguments.CodePage, doubleQuotedStrings = _arguments.DoubleQuotes, detectTypesLines = _arguments.DtLines, - dQuotes = _arguments.DoubleQuotes, - separator = _arguments.Separator + dQuotes = _arguments.DoubleQuotes, + separator = _arguments.Separator }; - + Dictionary logFileMaxRecords = new Dictionary(); - - // Execute the query - while (!CancelToken.IsCancellationRequested) + using (var syncHandle = new ManualResetEventSlim()) { - try + // Execute the query + while (!Stop) { - oLogQuery = new LogQuery(); - - var qfiles = string.Format("SELECT Distinct [LogFilename] FROM {0}", location); - var rsfiles = oLogQuery.Execute(qfiles, iFmt); - for (; !rsfiles.atEnd(); rsfiles.moveNext()) + // Execute the query + if (!CancelToken.IsCancellationRequested) { - var record = rsfiles.getRecord(); - string fileName = record.getValue("LogFilename") as string; - if (!logFileMaxRecords.ContainsKey(fileName)) + try { - var qcount = string.Format("SELECT max(RowNumber) as MaxRecordNumber FROM {0}", fileName); - var rcount = oLogQuery.Execute(qcount, iFmt); - var qr = rcount.getRecord(); - var lrn = (Int64)qr.getValueEx("MaxRecordNumber"); - logFileMaxRecords[fileName] = lrn; - } - } + oLogQuery = new LogQuery(); - - foreach (string fileName in logFileMaxRecords.Keys.ToList()) - { - var lastRecordNumber = logFileMaxRecords[fileName]; - var query = string.Format("SELECT * FROM '{0}' Where RowNumber > {1} order by RowNumber", fileName, lastRecordNumber); - var rs = oLogQuery.Execute(query, iFmt); - var colMap = new Dictionary(); - for (int col = 0; col < rs.getColumnCount(); col++) - { - string colName = rs.getColumnName(col); - colMap[colName] = col; - } - - // Browse the recordset - for (; !rs.atEnd(); rs.moveNext()) - { - var record = rs.getRecord(); - var json = new JObject(); - foreach (var field in colMap.Keys) - { - object v = record.getValue(field); - if (field == "date" || field == "time") + var qfiles = string.Format("SELECT Distinct [LogFilename] FROM {0}", location); + var rsfiles = oLogQuery.Execute(qfiles, iFmt); + for (; !rsfiles.atEnd(); rsfiles.moveNext()) + { + var record = rsfiles.getRecord(); + string fileName = record.getValue("LogFilename") as string; + if (!logFileMaxRecords.ContainsKey(fileName)) { - DateTime dt = DateTime.Parse(v.ToString()); - json.Add(new JProperty(field, dt)); - } - else - json.Add(new JProperty(field, v)); + var qcount = string.Format("SELECT max(RowNumber) as MaxRecordNumber FROM {0}", + fileName); + var rcount = oLogQuery.Execute(qcount, iFmt); + var qr = rcount.getRecord(); + var lrn = (Int64)qr.getValueEx("MaxRecordNumber"); + logFileMaxRecords[fileName] = lrn; + } } - ProcessJson(json); - _receivedMessages++; - var lrn = (Int64)record.getValueEx("RowNumber"); - logFileMaxRecords[fileName] = lrn; - record = null; - json = null; + + + foreach (string fileName in logFileMaxRecords.Keys.ToList()) + { + var lastRecordNumber = logFileMaxRecords[fileName]; + var query = string.Format( + "SELECT * FROM '{0}' Where RowNumber > {1} order by RowNumber", fileName, + lastRecordNumber); + var rs = oLogQuery.Execute(query, iFmt); + var colMap = new Dictionary(); + for (int col = 0; col < rs.getColumnCount(); col++) + { + string colName = rs.getColumnName(col); + colMap[colName] = col; + } + + // Browse the recordset + for (; !rs.atEnd(); rs.moveNext()) + { + var record = rs.getRecord(); + var json = new JObject(); + foreach (var field in colMap.Keys) + { + object v = record.getValue(field); + if (field == "date" || field == "time") + { + DateTime dt = DateTime.Parse(v.ToString()); + json.Add(new JProperty(field, dt)); + } + else + json.Add(new JProperty(field, v)); + } + ProcessJson(json); + _receivedMessages++; + var lrn = (Int64)record.getValueEx("RowNumber"); + logFileMaxRecords[fileName] = lrn; + record = null; + json = null; + } + // Close the recordset + rs.close(); + } + if (!Stop) + syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken); + } + catch (OperationCanceledException oce) + { + break; + } + catch (Exception ex) + { + LogManager.GetCurrentClassLogger().Error(ex); } - // Close the recordset - rs.close(); } } - catch (Exception ex) - { - LogManager.GetCurrentClassLogger().Error(ex); - } - - System.Threading.Thread.Sleep(_pollingIntervalInSeconds * 1000); } Finished(); diff --git a/TimberWinR/Inputs/WindowsEvtInputListener.cs b/TimberWinR/Inputs/WindowsEvtInputListener.cs index 07e3dde..31b584f 100644 --- a/TimberWinR/Inputs/WindowsEvtInputListener.cs +++ b/TimberWinR/Inputs/WindowsEvtInputListener.cs @@ -27,6 +27,7 @@ namespace TimberWinR.Inputs private TimberWinR.Parser.WindowsEvent _arguments; private long _receivedMessages; private List _tasks { get; set; } + public bool Stop { get; set; } public WindowsEvtInputListener(TimberWinR.Parser.WindowsEvent arguments, CancellationToken cancelToken) : base(cancelToken, "Win32-Eventlog") @@ -40,17 +41,14 @@ namespace TimberWinR.Inputs string hive = eventHive.Trim(); var thread = new Thread(new ParameterizedThreadStart(EventWatcher)); _tasks.Add(thread); - thread.Start(eventHive); + thread.Start(eventHive); } } public override void Shutdown() { - LogManager.GetCurrentClassLogger().Info("Shutting Down {0}", InputType); - foreach(Thread t in _tasks) - { - t.Abort(); - } + Stop = true; + LogManager.GetCurrentClassLogger().Info("Shutting Down {0}", InputType); base.Shutdown(); } @@ -78,9 +76,9 @@ namespace TimberWinR.Inputs { string location = ploc.ToString(); - LogQuery oLogQuery = new LogQuery(); + LogQuery oLogQuery = new LogQuery(); - LogManager.GetCurrentClassLogger().Info("WindowsEvent Input Listener Ready"); + LogManager.GetCurrentClassLogger().Info("WindowsEvent Input Listener Ready"); // Instantiate the Event Log Input Format object var iFmt = new EventLogInputFormat() @@ -94,101 +92,91 @@ namespace TimberWinR.Inputs stringsSep = _arguments.StringsSep, resolveSIDs = _arguments.ResolveSIDS }; - + oLogQuery = null; - Dictionary logFileMaxRecords = new Dictionary(); - - // Execute the query - while (!CancelToken.IsCancellationRequested) + Dictionary logFileMaxRecords = new Dictionary(); + + using (var syncHandle = new ManualResetEventSlim()) { - try + // Execute the query + while (!Stop) { - Thread.CurrentThread.Priority = ThreadPriority.BelowNormal; - - oLogQuery = new LogQuery(); - - var qfiles = string.Format("SELECT Distinct [EventLog] FROM {0}", location); - var rsfiles = oLogQuery.Execute(qfiles, iFmt); - for (; !rsfiles.atEnd(); rsfiles.moveNext()) + // Execute the query + if (!CancelToken.IsCancellationRequested) { - var record = rsfiles.getRecord(); - string logName = record.getValue("EventLog") as string; - if (!logFileMaxRecords.ContainsKey(logName)) - { - var qcount = string.Format("SELECT max(RecordNumber) as MaxRecordNumber FROM {0}", logName); - var rcount = oLogQuery.Execute(qcount, iFmt); - var qr = rcount.getRecord(); - var lrn = (Int64)qr.getValueEx("MaxRecordNumber"); - logFileMaxRecords[logName] = lrn; - } - } + try + { + oLogQuery = new LogQuery(); - - foreach (string fileName in logFileMaxRecords.Keys.ToList()) - { - var lastRecordNumber = logFileMaxRecords[fileName]; - var query = string.Format("SELECT * FROM {0} where RecordNumber > {1}", location, lastRecordNumber); - - var rs = oLogQuery.Execute(query, iFmt); - // Browse the recordset - for (; !rs.atEnd(); rs.moveNext()) - { - - var record = rs.getRecord(); - var json = new JObject(); - foreach (var field in _arguments.Fields) + var qfiles = string.Format("SELECT Distinct [EventLog] FROM {0}", location); + var rsfiles = oLogQuery.Execute(qfiles, iFmt); + for (; !rsfiles.atEnd(); rsfiles.moveNext()) { - object v = record.getValue(field.Name); - if (field.Name == "Data") - v = ToPrintable(v.ToString()); - json.Add(new JProperty(field.Name, v)); + var record = rsfiles.getRecord(); + string logName = record.getValue("EventLog") as string; + if (!logFileMaxRecords.ContainsKey(logName)) + { + var qcount = string.Format("SELECT max(RecordNumber) as MaxRecordNumber FROM {0}", + logName); + var rcount = oLogQuery.Execute(qcount, iFmt); + var qr = rcount.getRecord(); + var lrn = (Int64)qr.getValueEx("MaxRecordNumber"); + logFileMaxRecords[logName] = lrn; + } } - var lrn = (Int64)record.getValueEx("RecordNumber"); - logFileMaxRecords[fileName] = lrn; - record = null; - ProcessJson(json); - _receivedMessages++; - json = null; + foreach (string fileName in logFileMaxRecords.Keys.ToList()) + { + var lastRecordNumber = logFileMaxRecords[fileName]; + var query = string.Format("SELECT * FROM {0} where RecordNumber > {1}", location, + lastRecordNumber); + var rs = oLogQuery.Execute(query, iFmt); + // Browse the recordset + for (; !rs.atEnd(); rs.moveNext()) + { + + var record = rs.getRecord(); + var json = new JObject(); + foreach (var field in _arguments.Fields) + { + object v = record.getValue(field.Name); + if (field.Name == "Data") + v = ToPrintable(v.ToString()); + json.Add(new JProperty(field.Name, v)); + } + + var lrn = (Int64)record.getValueEx("RecordNumber"); + logFileMaxRecords[fileName] = lrn; + + record = null; + ProcessJson(json); + _receivedMessages++; + json = null; + + } + // Close the recordset + rs.close(); + rs = null; + GC.Collect(); + } + if (!Stop) + syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken); + } + catch (OperationCanceledException oce) + { + break; + } + catch (Exception ex) + { + LogManager.GetCurrentClassLogger().Error(ex); } - // Close the recordset - rs.close(); - rs = null; - GC.Collect(); } } - catch (System.Threading.ThreadAbortException tex) - { - Thread.ResetAbort(); - break; - } - catch (Exception ex) - { - LogManager.GetCurrentClassLogger().Error(ex); - } - - try - { - Thread.CurrentThread.Priority = ThreadPriority.Normal; - System.Threading.Thread.Sleep(_pollingIntervalInSeconds * 1000); - } - catch (System.Threading.ThreadAbortException tex) - { - Thread.ResetAbort(); - break; - } - catch (Exception ex) - { - LogManager.GetCurrentClassLogger().Error(ex); - } - - + Finished(); } - - Finished(); } } } diff --git a/TimberWinR/Outputs/Elasticsearch.cs b/TimberWinR/Outputs/Elasticsearch.cs index a336b4e..5e188ee 100644 --- a/TimberWinR/Outputs/Elasticsearch.cs +++ b/TimberWinR/Outputs/Elasticsearch.cs @@ -28,6 +28,7 @@ namespace TimberWinR.Outputs private long _sentMessages; private long _errorCount; private Parser.ElasticsearchOutput eo; + public bool Stop { get; set; } public ElasticsearchOutput(TimberWinR.Manager manager, Parser.ElasticsearchOutput eo, CancellationToken cancelToken) : base(cancelToken, "Elasticsearch") @@ -46,10 +47,10 @@ namespace TimberWinR.Outputs _jsonQueue = new List(); _numThreads = eo.NumThreads; - for (int i = 0; i < eo.NumThreads; i++) - { - Task.Factory.StartNew(ElasticsearchSender, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Current); - } + for (int i = 0; i < eo.NumThreads; i++) + { + Task.Factory.StartNew(ElasticsearchSender, cancelToken, TaskCreationOptions.LongRunning, TaskScheduler.Current); + } } public override JObject ToJson() @@ -58,12 +59,12 @@ namespace TimberWinR.Outputs new JProperty("elasticsearch", new JObject( new JProperty("host", string.Join(",", _host)), - new JProperty("errors", _errorCount), + new JProperty("errors", _errorCount), new JProperty("sent_messages", _sentMessages), new JProperty("queued_messages", _jsonQueue.Count), new JProperty("port", _port), new JProperty("interval", _interval), - new JProperty("threads", _numThreads), + new JProperty("threads", _numThreads), new JProperty("hosts", new JArray( from h in _host @@ -76,91 +77,111 @@ namespace TimberWinR.Outputs // private void ElasticsearchSender() { - while (!CancelToken.IsCancellationRequested) + using (var syncHandle = new ManualResetEventSlim()) { - JObject[] messages; - lock (_locker) + // Execute the query + while (!Stop) { - var count = _jsonQueue.Count; - messages = _jsonQueue.Take(count).ToArray(); - _jsonQueue.RemoveRange(0, count); - if (messages.Length > 0) - _manager.IncrementMessageCount(messages.Length); - } - - if (messages.Length > 0) - { - int numHosts = _host.Length; - while (numHosts-- > 0) + if (!CancelToken.IsCancellationRequested) { try { - // Get the next client - RestClient client = getClient(); - if (client != null) - { - LogManager.GetCurrentClassLogger() - .Debug("Sending {0} Messages to {1}", messages.Length, client.BaseUrl); + JObject[] messages; + lock (_locker) + { + var count = _jsonQueue.Count; + messages = _jsonQueue.Take(count).ToArray(); + _jsonQueue.RemoveRange(0, count); + if (messages.Length > 0) + _manager.IncrementMessageCount(messages.Length); + } - foreach (JObject json in messages) + if (messages.Length > 0) + { + int numHosts = _host.Length; + while (numHosts-- > 0) { - var typeName = this.eo.GetTypeName(json); - var indexName = this.eo.GetIndexName(json); - var req = new RestRequest(string.Format("/{0}/{1}/", indexName, typeName), Method.POST); - - req.AddParameter("text/json", json.ToString(), ParameterType.RequestBody); - - req.RequestFormat = DataFormat.Json; - try { - client.ExecuteAsync(req, response => + // Get the next client + RestClient client = getClient(); + if (client != null) { - if (response.StatusCode != HttpStatusCode.Created) + LogManager.GetCurrentClassLogger() + .Debug("Sending {0} Messages to {1}", messages.Length, client.BaseUrl); + + foreach (JObject json in messages) { - LogManager.GetCurrentClassLogger() - .Error("Failed to send: {0}", response.ErrorMessage); - Interlocked.Increment(ref _errorCount); + var typeName = this.eo.GetTypeName(json); + var indexName = this.eo.GetIndexName(json); + var req = + new RestRequest(string.Format("/{0}/{1}/", indexName, typeName), + Method.POST); + + req.AddParameter("text/json", json.ToString(), ParameterType.RequestBody); + + req.RequestFormat = DataFormat.Json; + + try + { + client.ExecuteAsync(req, response => + { + if (response.StatusCode != HttpStatusCode.Created) + { + LogManager.GetCurrentClassLogger() + .Error("Failed to send: {0}", response.ErrorMessage); + Interlocked.Increment(ref _errorCount); + } + else + { + _sentMessages++; + GC.Collect(); + } + }); + } + catch (Exception error) + { + LogManager.GetCurrentClassLogger().Error(error); + Interlocked.Increment(ref _errorCount); + } } - else - { - _sentMessages++; - GC.Collect(); - } - }); + GC.Collect(); + } + else + { + LogManager.GetCurrentClassLogger() + .Fatal("Unable to connect with any Elasticsearch hosts, {0}", + String.Join(",", _host)); + Interlocked.Increment(ref _errorCount); + } + } - catch (Exception error) + catch (Exception ex) { - LogManager.GetCurrentClassLogger().Error(error); + LogManager.GetCurrentClassLogger().Error(ex); Interlocked.Increment(ref _errorCount); - } + } } - GC.Collect(); } - else - { - LogManager.GetCurrentClassLogger() - .Fatal("Unable to connect with any Elasticsearch hosts, {0}", - String.Join(",", _host)); - Interlocked.Increment(ref _errorCount); - } - + GC.Collect(); + if (!Stop) + syncHandle.Wait(TimeSpan.FromSeconds(_interval), CancelToken); } - catch (Exception ex) + catch (OperationCanceledException oce) { - LogManager.GetCurrentClassLogger().Error(ex); - Interlocked.Increment(ref _errorCount); + break; + } + catch (Exception) + { + throw; } } } - GC.Collect(); - System.Threading.Thread.Sleep(_interval); } } - private RestClient getClient() - { - if (_hostIndex >= _host.Length) + { + if (_hostIndex >= _host.Length) _hostIndex = 0; int numTries = 0; @@ -168,9 +189,9 @@ namespace TimberWinR.Outputs { try { - string url = string.Format("{0}://{1}:{2}", _protocol.Replace(":",""), _host[_hostIndex], _port); + string url = string.Format("{0}://{1}:{2}", _protocol.Replace(":", ""), _host[_hostIndex], _port); var client = new RestClient(url); - client.Timeout = _timeout; + client.Timeout = _timeout; _hostIndex++; if (_hostIndex >= _host.Length) diff --git a/TimberWinR/Outputs/Redis.cs b/TimberWinR/Outputs/Redis.cs index 6ee19aa..e4385f5 100644 --- a/TimberWinR/Outputs/Redis.cs +++ b/TimberWinR/Outputs/Redis.cs @@ -38,6 +38,8 @@ namespace TimberWinR.Outputs private int _maxQueueSize; private bool _queueOverflowDiscardOldest; + public bool Stop { get; set; } + /// /// Get the next client /// @@ -185,67 +187,87 @@ namespace TimberWinR.Outputs // private void RedisSender() { - while (!CancelToken.IsCancellationRequested) + using (var syncHandle = new ManualResetEventSlim()) { - string[] messages; - lock (_locker) + // Execute the query + while (!Stop) { - messages = _jsonQueue.Take(_batchCount).ToArray(); - _jsonQueue.RemoveRange(0, messages.Length); - if (messages.Length > 0) - _manager.IncrementMessageCount(messages.Length); - } - - if (messages.Length > 0) - { - int numHosts = _redisHosts.Length; - while (numHosts-- > 0) + if (!CancelToken.IsCancellationRequested) { try { - // Get the next client - using (RedisClient client = getClient()) + string[] messages; + lock (_locker) { - if (client != null) - { - client.StartPipe(); - LogManager.GetCurrentClassLogger() - .Debug("Sending {0} Messages to {1}", messages.Length, client.Host); + messages = _jsonQueue.Take(_batchCount).ToArray(); + _jsonQueue.RemoveRange(0, messages.Length); + if (messages.Length > 0) + _manager.IncrementMessageCount(messages.Length); + } + if (messages.Length > 0) + { + int numHosts = _redisHosts.Length; + while (numHosts-- > 0) + { try { - _redisDepth = client.RPush(_logstashIndexName, messages); - _sentMessages += messages.Length; + // Get the next client + using (RedisClient client = getClient()) + { + if (client != null) + { + client.StartPipe(); + LogManager.GetCurrentClassLogger() + .Debug("Sending {0} Messages to {1}", messages.Length, client.Host); + + try + { + _redisDepth = client.RPush(_logstashIndexName, messages); + _sentMessages += messages.Length; + } + catch (SocketException ex) + { + LogManager.GetCurrentClassLogger().Warn(ex); + Interlocked.Increment(ref _errorCount); + } + finally + { + client.EndPipe(); + } + break; + } + else + { + Interlocked.Increment(ref _errorCount); + LogManager.GetCurrentClassLogger() + .Fatal("Unable to connect with any Redis hosts, {0}", + String.Join(",", _redisHosts)); + } + } } - catch (SocketException ex) + catch (Exception ex) { - LogManager.GetCurrentClassLogger().Warn(ex); + LogManager.GetCurrentClassLogger().Error(ex); Interlocked.Increment(ref _errorCount); } - finally - { - client.EndPipe(); - } - break; - } - else - { - Interlocked.Increment(ref _errorCount); - LogManager.GetCurrentClassLogger() - .Fatal("Unable to connect with any Redis hosts, {0}", - String.Join(",", _redisHosts)); } } + GC.Collect(); + if (!Stop) + syncHandle.Wait(TimeSpan.FromSeconds(_interval), CancelToken); + } + catch (OperationCanceledException oce) + { + break; } catch (Exception ex) { - LogManager.GetCurrentClassLogger().Error(ex); - Interlocked.Increment(ref _errorCount); + + throw; } } } - GC.Collect(); - System.Threading.Thread.Sleep(_interval); } } } diff --git a/TimberWinR/Outputs/Stdout.cs b/TimberWinR/Outputs/Stdout.cs index 2fbf352..c0e7dfa 100644 --- a/TimberWinR/Outputs/Stdout.cs +++ b/TimberWinR/Outputs/Stdout.cs @@ -15,6 +15,7 @@ namespace TimberWinR.Outputs private readonly object _locker = new object(); private readonly List _jsonQueue; private long _sentMessages; + public bool Stop { get; set; } public StdoutOutput(TimberWinR.Manager manager, Parser.StdoutOutput eo, CancellationToken cancelToken) : base(cancelToken, "Stdout") @@ -34,7 +35,7 @@ namespace TimberWinR.Outputs new JProperty("stdout", new JObject( new JProperty("sent_messages", _sentMessages)))); - + return json; } @@ -43,31 +44,49 @@ namespace TimberWinR.Outputs // private void StdoutSender() { - while (!CancelToken.IsCancellationRequested) + using (var syncHandle = new ManualResetEventSlim()) { - JObject[] messages; - lock (_locker) + // Execute the query + while (!Stop) { - messages = _jsonQueue.Take(_jsonQueue.Count).ToArray(); - _jsonQueue.RemoveRange(0, messages.Length); - } - - if (messages.Length > 0) - { - try + if (!CancelToken.IsCancellationRequested) { - foreach (JObject obj in messages) + try + { + JObject[] messages; + lock (_locker) + { + messages = _jsonQueue.Take(_jsonQueue.Count).ToArray(); + _jsonQueue.RemoveRange(0, messages.Length); + } + + if (messages.Length > 0) + { + try + { + foreach (JObject obj in messages) + { + Console.WriteLine(obj.ToString()); + _sentMessages++; + } + } + catch (Exception ex) + { + LogManager.GetCurrentClassLogger().Error(ex); + } + } + if (!Stop) + syncHandle.Wait(TimeSpan.FromSeconds(_interval), CancelToken); + } + catch (OperationCanceledException oce) + { + break; + } + catch (Exception) { - Console.WriteLine(obj.ToString()); - _sentMessages++; } } - catch (Exception ex) - { - LogManager.GetCurrentClassLogger().Error(ex); - } } - System.Threading.Thread.Sleep(_interval); } }