From 3208da64885a288ce9a612fa237a7a642c3dba82 Mon Sep 17 00:00:00 2001 From: Eric Fontana Date: Thu, 5 Mar 2015 06:38:58 -0500 Subject: [PATCH] Final changes from PR reviews --- TimberWinR.UnitTests/Configuration.cs | 2 +- .../Parser/ElasticsearchOutputTests.cs | 4 +- TimberWinR.UnitTests/TestDynamicBatchCount.cs | 6 +-- TimberWinR/Configuration.cs | 47 +++++++++--------- TimberWinR/Inputs/IISW3CInputListener.cs | 4 +- TimberWinR/Inputs/LogsListener.cs | 4 +- TimberWinR/Inputs/W3CInputListener.cs | 4 +- TimberWinR/Manager.cs | 4 +- TimberWinR/Outputs/Elasticsearch.cs | 8 ++-- TimberWinR/Outputs/Redis.cs | 46 +++++++++--------- TimberWinR/Outputs/Stdout.cs | 4 +- TimberWinR/Parser.cs | 48 +++++++++---------- 12 files changed, 91 insertions(+), 90 deletions(-) diff --git a/TimberWinR.UnitTests/Configuration.cs b/TimberWinR.UnitTests/Configuration.cs index 6f91f9e..601a927 100644 --- a/TimberWinR.UnitTests/Configuration.cs +++ b/TimberWinR.UnitTests/Configuration.cs @@ -94,7 +94,7 @@ namespace TimberWinR.UnitTests Configuration c = Configuration.FromString(redisJson); - RedisOutput redis = c.RedisOutputs.First() as RedisOutput; + RedisOutputParameters redis = c.RedisOutputs.First() as RedisOutputParameters; Assert.IsTrue(redis.Host.Length >= 1); } diff --git a/TimberWinR.UnitTests/Parser/ElasticsearchOutputTests.cs b/TimberWinR.UnitTests/Parser/ElasticsearchOutputTests.cs index e59914a..4bafebb 100644 --- a/TimberWinR.UnitTests/Parser/ElasticsearchOutputTests.cs +++ b/TimberWinR.UnitTests/Parser/ElasticsearchOutputTests.cs @@ -10,12 +10,12 @@ public class ElasticsearchOutputTests { - private ElasticsearchOutput parser; + private ElasticsearchOutputParameters parser; [SetUp] public void Setup() { - this.parser = new ElasticsearchOutput(); + this.parser = new ElasticsearchOutputParameters(); } [Test] diff --git a/TimberWinR.UnitTests/TestDynamicBatchCount.cs b/TimberWinR.UnitTests/TestDynamicBatchCount.cs index ae449a1..20033b2 100644 --- a/TimberWinR.UnitTests/TestDynamicBatchCount.cs +++ b/TimberWinR.UnitTests/TestDynamicBatchCount.cs @@ -30,7 +30,7 @@ namespace TimberWinR.UnitTests FakeRediServer fr = new FakeRediServer(cancelToken); - var redisParams = new RedisOutput(); + var redisParams = new RedisOutputParameters(); redisParams.BatchCount = 10; redisParams.MaxBatchCount = 40; redisParams.Interval = 100; @@ -68,8 +68,8 @@ namespace TimberWinR.UnitTests System.Diagnostics.Debug.WriteLine(redisOutput.QueueDepth); JObject json = redisOutput.ToJson(); - var mbc = json["redis"]["reachedMaxBatchCount"].Value(); - var sm = json["redis"]["sent_messages"].Value(); + var mbc = json["redis"]["reachedMaxBatchCountTimes"].Value(); + var sm = json["redis"]["sentMessageCount"].Value(); var errs = json["redis"]["errors"].Value(); var cbc = json["redis"]["currentBatchCount"].Value(); diff --git a/TimberWinR/Configuration.cs b/TimberWinR/Configuration.cs index 8b8d8eb..78fb08f 100644 --- a/TimberWinR/Configuration.cs +++ b/TimberWinR/Configuration.cs @@ -22,7 +22,6 @@ using TimberWinR.Filters; using NLog; using TimberWinR.Parser; using Topshelf.Configurators; -using IISW3CLog = TimberWinR.Parser.IISW3CLog; using WindowsEvent = TimberWinR.Parser.WindowsEvent; namespace TimberWinR @@ -40,39 +39,39 @@ namespace TimberWinR get { return _events; } } - private List _redisOutputs = new List(); - public IEnumerable RedisOutputs + private List _redisOutputs = new List(); + public IEnumerable RedisOutputs { get { return _redisOutputs; } } - private List _elasticsearchOutputs = new List(); - public IEnumerable ElasticsearchOutputs + private List _elasticsearchOutputs = new List(); + public IEnumerable ElasticsearchOutputs { get { return _elasticsearchOutputs; } } - private List _stdoutOutputs = new List(); - public IEnumerable StdoutOutputs + private List _stdoutOutputs = new List(); + public IEnumerable StdoutOutputs { get { return _stdoutOutputs; } } - private List _tcps = new List(); - public IEnumerable Tcps + private List _tcps = new List(); + public IEnumerable Tcps { get { return _tcps; } } - private List _udps = new List(); - public IEnumerable Udps + private List _udps = new List(); + public IEnumerable Udps { get { return _udps; } } - private List _logs = new List(); - public IEnumerable Logs + private List _logs = new List(); + public IEnumerable Logs { get { return _logs; } } @@ -83,16 +82,16 @@ namespace TimberWinR get { return _tails; } } - private List _iisw3clogs = new List(); + private List _iisw3clogs = new List(); - public IEnumerable IISW3C + public IEnumerable IISW3C { get { return _iisw3clogs; } } - private List _w3clogs = new List(); + private List _w3clogs = new List(); - public IEnumerable W3C + public IEnumerable W3C { get { return _w3clogs; } } @@ -290,13 +289,13 @@ namespace TimberWinR { _filters = new List(); _events = new List(); - _iisw3clogs = new List(); - _logs = new List(); - _redisOutputs = new List(); - _elasticsearchOutputs = new List(); - _stdoutOutputs = new List(); - _tcps = new List(); - _udps = new List(); + _iisw3clogs = new List(); + _logs = new List(); + _redisOutputs = new List(); + _elasticsearchOutputs = new List(); + _stdoutOutputs = new List(); + _tcps = new List(); + _udps = new List(); } public static Object GetPropValue(String name, Object obj) diff --git a/TimberWinR/Inputs/IISW3CInputListener.cs b/TimberWinR/Inputs/IISW3CInputListener.cs index 5f64765..6c92a19 100644 --- a/TimberWinR/Inputs/IISW3CInputListener.cs +++ b/TimberWinR/Inputs/IISW3CInputListener.cs @@ -16,12 +16,12 @@ namespace TimberWinR.Inputs public class IISW3CInputListener : InputListener { private readonly int _pollingIntervalInSeconds; - private readonly Parser.IISW3CLog _arguments; + private readonly Parser.IISW3CLogParameters _arguments; private long _receivedMessages; public bool Stop { get; set; } private IisW3CRowReader rowReader; - public IISW3CInputListener(Parser.IISW3CLog arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 5) + public IISW3CInputListener(Parser.IISW3CLogParameters arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 5) : base(cancelToken, "Win32-IISLog") { _arguments = arguments; diff --git a/TimberWinR/Inputs/LogsListener.cs b/TimberWinR/Inputs/LogsListener.cs index ee12bec..5a07374 100644 --- a/TimberWinR/Inputs/LogsListener.cs +++ b/TimberWinR/Inputs/LogsListener.cs @@ -28,7 +28,7 @@ namespace TimberWinR.Inputs public class LogsListener : InputListener { private int _pollingIntervalInSeconds; - private TimberWinR.Parser.Log _arguments; + private TimberWinR.Parser.LogParameters _arguments; private long _receivedMessages; private Dictionary _logFileMaxRecords; private Dictionary _logFileCreationTimes; @@ -39,7 +39,7 @@ namespace TimberWinR.Inputs public bool Stop { get; set; } - public LogsListener(TimberWinR.Parser.Log arguments, CancellationToken cancelToken) + public LogsListener(TimberWinR.Parser.LogParameters arguments, CancellationToken cancelToken) : base(cancelToken, "Win32-FileLog") { Stop = false; diff --git a/TimberWinR/Inputs/W3CInputListener.cs b/TimberWinR/Inputs/W3CInputListener.cs index 7b15f56..7e383c5 100644 --- a/TimberWinR/Inputs/W3CInputListener.cs +++ b/TimberWinR/Inputs/W3CInputListener.cs @@ -23,11 +23,11 @@ namespace TimberWinR.Inputs public class W3CInputListener : InputListener { private readonly int _pollingIntervalInSeconds; - private readonly TimberWinR.Parser.W3CLog _arguments; + private readonly TimberWinR.Parser.W3CLogParameters _arguments; private long _receivedMessages; public bool Stop { get; set; } - public W3CInputListener(TimberWinR.Parser.W3CLog arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 5) + public W3CInputListener(TimberWinR.Parser.W3CLogParameters arguments, CancellationToken cancelToken, int pollingIntervalInSeconds = 5) : base(cancelToken, "Win32-W3CLog") { _arguments = arguments; diff --git a/TimberWinR/Manager.cs b/TimberWinR/Manager.cs index e5192df..d8dd550 100644 --- a/TimberWinR/Manager.cs +++ b/TimberWinR/Manager.cs @@ -179,7 +179,7 @@ namespace TimberWinR } } - foreach (Parser.IISW3CLog iisw3cConfig in config.IISW3C) + foreach (Parser.IISW3CLogParameters iisw3cConfig in config.IISW3C) { var elistner = new IISW3CInputListener(iisw3cConfig, cancelToken); Listeners.Add(elistner); @@ -187,7 +187,7 @@ namespace TimberWinR output.Connect(elistner); } - foreach (Parser.W3CLog iisw3cConfig in config.W3C) + foreach (Parser.W3CLogParameters iisw3cConfig in config.W3C) { var elistner = new W3CInputListener(iisw3cConfig, cancelToken); Listeners.Add(elistner); diff --git a/TimberWinR/Outputs/Elasticsearch.cs b/TimberWinR/Outputs/Elasticsearch.cs index 468564e..062e3f3 100644 --- a/TimberWinR/Outputs/Elasticsearch.cs +++ b/TimberWinR/Outputs/Elasticsearch.cs @@ -27,10 +27,10 @@ namespace TimberWinR.Outputs private readonly int _numThreads; private long _sentMessages; private long _errorCount; - private Parser.ElasticsearchOutput eo; + private Parser.ElasticsearchOutputParameters eo; public bool Stop { get; set; } - public ElasticsearchOutput(TimberWinR.Manager manager, Parser.ElasticsearchOutput eo, CancellationToken cancelToken) + public ElasticsearchOutput(TimberWinR.Manager manager, Parser.ElasticsearchOutputParameters eo, CancellationToken cancelToken) : base(cancelToken, "Elasticsearch") { _sentMessages = 0; @@ -60,8 +60,8 @@ namespace TimberWinR.Outputs new JObject( new JProperty("host", string.Join(",", _host)), new JProperty("errors", _errorCount), - new JProperty("sent_messages", _sentMessages), - new JProperty("queued_messages", _jsonQueue.Count), + new JProperty("sentMmessageCount", _sentMessages), + new JProperty("queuedMessageCount", _jsonQueue.Count), new JProperty("port", _port), new JProperty("interval", _interval), new JProperty("threads", _numThreads), diff --git a/TimberWinR/Outputs/Redis.cs b/TimberWinR/Outputs/Redis.cs index 2a536b3..e192083 100644 --- a/TimberWinR/Outputs/Redis.cs +++ b/TimberWinR/Outputs/Redis.cs @@ -20,7 +20,8 @@ namespace TimberWinR.Outputs { internal class BatchCounter { - public int ReachedMaxBatchCount { get; set; } + // Total number of times reached max batch count (indicates we are under pressure) + public int ReachedMaxBatchCountTimes { get; set; } private readonly int[] _sampleQueueDepths; private int _sampleCountIndex; @@ -44,7 +45,7 @@ namespace TimberWinR.Outputs _sampleQueueDepths = new int[QUEUE_SAMPLE_SIZE]; _sampleCountIndex = 0; _totalSamples = 0; - ReachedMaxBatchCount = 0; + ReachedMaxBatchCountTimes = 0; } public void SampleQueueDepth(int queueDepth) { @@ -85,7 +86,7 @@ namespace TimberWinR.Outputs { LogManager.GetCurrentClassLogger().Warn("Maximum Batch Count of {0} reached.", currentBatchCount); _warnedReachedMax = true; // Only complain when it's reached (1 time, unless reset) - ReachedMaxBatchCount++; + ReachedMaxBatchCountTimes++; currentBatchCount = _maxBatchCount; } } @@ -175,9 +176,9 @@ namespace TimberWinR.Outputs new JProperty("host", string.Join(",", _redisHosts)), new JProperty("errors", _errorCount), new JProperty("lastErrorTimeUTC", _lastErrorTimeUTC), - new JProperty("redis_depth", _redisDepth), - new JProperty("sent_messages", _sentMessages), - new JProperty("queued_messages", _jsonQueue.Count), + new JProperty("redisQueueDepth", _redisDepth), + new JProperty("sentMessageCount", _sentMessages), + new JProperty("queuedMessageCount", _jsonQueue.Count), new JProperty("port", _port), new JProperty("maxQueueSize", _maxQueueSize), new JProperty("overflowDiscardOldest", _queueOverflowDiscardOldest), @@ -185,7 +186,7 @@ namespace TimberWinR.Outputs new JProperty("threads", _numThreads), new JProperty("batchcount", _batchCount), new JProperty("currentBatchCount", _currentBatchCount), - new JProperty("reachedMaxBatchCount", _batchCounter.ReachedMaxBatchCount), + new JProperty("reachedMaxBatchCountTimes", _batchCounter.ReachedMaxBatchCountTimes), new JProperty("maxBatchCount", _maxBatchCount), new JProperty("averageQueueDepth", _batchCounter.AverageQueueDepth()), new JProperty("queueSamples", new JArray(_batchCounter.Samples())), @@ -198,33 +199,33 @@ namespace TimberWinR.Outputs return json; } - public RedisOutput(TimberWinR.Manager manager, Parser.RedisOutput ro, CancellationToken cancelToken) + public RedisOutput(TimberWinR.Manager manager, Parser.RedisOutputParameters parameters, CancellationToken cancelToken) : base(cancelToken, "Redis") { _redisDepth = 0; - _batchCount = ro.BatchCount; - _maxBatchCount = ro.MaxBatchCount; + _batchCount = parameters.BatchCount; + _maxBatchCount = parameters.MaxBatchCount; // Make sure maxBatchCount is larger than batchCount if (_maxBatchCount < _batchCount) _maxBatchCount = _batchCount*10; _manager = manager; _redisHostIndex = 0; - _redisHosts = ro.Host; + _redisHosts = parameters.Host; _jsonQueue = new List(); - _port = ro.Port; - _timeout = ro.Timeout; - _logstashIndexName = ro.Index; - _interval = ro.Interval; - _numThreads = ro.NumThreads; + _port = parameters.Port; + _timeout = parameters.Timeout; + _logstashIndexName = parameters.Index; + _interval = parameters.Interval; + _numThreads = parameters.NumThreads; _errorCount = 0; _lastErrorTimeUTC = null; - _maxQueueSize = ro.MaxQueueSize; - _queueOverflowDiscardOldest = ro.QueueOverflowDiscardOldest; + _maxQueueSize = parameters.MaxQueueSize; + _queueOverflowDiscardOldest = parameters.QueueOverflowDiscardOldest; _batchCounter = new BatchCounter(_batchCount, _maxBatchCount); _currentBatchCount = _batchCount; - for (int i = 0; i < ro.NumThreads; i++) + for (int i = 0; i < parameters.NumThreads; i++) { var redisThread = new Task(RedisSender, cancelToken); redisThread.Start(); @@ -310,12 +311,13 @@ namespace TimberWinR.Outputs lock (_locker) { _batchCounter.SampleQueueDepth(_jsonQueue.Count); - + // Re-compute current batch size + _currentBatchCount = _batchCounter.UpdateCurrentBatchCount(_jsonQueue.Count, _currentBatchCount); + messages = _jsonQueue.Take(_currentBatchCount).ToArray(); _jsonQueue.RemoveRange(0, messages.Length); - // Re-compute current batch size - _currentBatchCount = _batchCounter.UpdateCurrentBatchCount(_jsonQueue.Count, _currentBatchCount); + } if (messages.Length > 0) diff --git a/TimberWinR/Outputs/Stdout.cs b/TimberWinR/Outputs/Stdout.cs index 410c09a..e21b49a 100644 --- a/TimberWinR/Outputs/Stdout.cs +++ b/TimberWinR/Outputs/Stdout.cs @@ -17,7 +17,7 @@ namespace TimberWinR.Outputs private long _sentMessages; public bool Stop { get; set; } - public StdoutOutput(TimberWinR.Manager manager, Parser.StdoutOutput eo, CancellationToken cancelToken) + public StdoutOutput(TimberWinR.Manager manager, Parser.StdoutOutputParameters eo, CancellationToken cancelToken) : base(cancelToken, "Stdout") { _sentMessages = 0; @@ -34,7 +34,7 @@ namespace TimberWinR.Outputs JObject json = new JObject( new JProperty("stdout", new JObject( - new JProperty("sent_messages", _sentMessages)))); + new JProperty("sentMessageCount", _sentMessages)))); return json; } diff --git a/TimberWinR/Parser.cs b/TimberWinR/Parser.cs index c163114..34f73b4 100644 --- a/TimberWinR/Parser.cs +++ b/TimberWinR/Parser.cs @@ -327,7 +327,7 @@ namespace TimberWinR.Parser } } - public class Log : IValidateSchema + public class LogParameters : IValidateSchema { [JsonProperty(PropertyName = "location")] public string Location { get; set; } @@ -346,7 +346,7 @@ namespace TimberWinR.Parser [JsonProperty(PropertyName = "codec")] public Codec Codec { get; set; } - public Log() + public LogParameters() { Fields = new List(); Fields.Add(new Field("LogFilename", "string")); @@ -361,12 +361,12 @@ namespace TimberWinR.Parser } } - public class Tcp : IValidateSchema + public class TcpParameters : IValidateSchema { [JsonProperty(PropertyName = "port")] public int Port { get; set; } - public Tcp() + public TcpParameters() { Port = 5140; } @@ -378,12 +378,12 @@ namespace TimberWinR.Parser } - public class Udp : IValidateSchema + public class UdpParameters : IValidateSchema { [JsonProperty(PropertyName = "port")] public int Port { get; set; } - public Udp() + public UdpParameters() { Port = 5142; } @@ -393,7 +393,7 @@ namespace TimberWinR.Parser } } - public class W3CLog : IValidateSchema + public class W3CLogParameters : IValidateSchema { [JsonProperty(PropertyName = "location")] public string Location { get; set; } @@ -410,7 +410,7 @@ namespace TimberWinR.Parser [JsonProperty(PropertyName = "fields")] public List Fields { get; set; } - public W3CLog() + public W3CLogParameters() { CodePage = 0; DtLines = 10; @@ -428,7 +428,7 @@ namespace TimberWinR.Parser } - public class IISW3CLog : IValidateSchema + public class IISW3CLogParameters : IValidateSchema { [JsonProperty(PropertyName = "location")] public string Location { get; set; } @@ -448,7 +448,7 @@ namespace TimberWinR.Parser [JsonProperty(PropertyName = "fields")] public List Fields { get; set; } - public IISW3CLog() + public IISW3CLogParameters() { CodePage = -2; Recurse = 0; @@ -494,7 +494,7 @@ namespace TimberWinR.Parser } } - public class ElasticsearchOutput + public class ElasticsearchOutputParameters { const string IndexDatePattern = "(%\\{(?[^\\}]+)\\})"; @@ -513,7 +513,7 @@ namespace TimberWinR.Parser [JsonProperty(PropertyName = "interval")] public int Interval { get; set; } - public ElasticsearchOutput() + public ElasticsearchOutputParameters() { Protocol = "http"; Port = 9200; @@ -564,7 +564,7 @@ namespace TimberWinR.Parser } - public class RedisOutput + public class RedisOutputParameters { [JsonProperty(PropertyName = "host")] public string[] Host { get; set; } @@ -587,7 +587,7 @@ namespace TimberWinR.Parser [JsonProperty(PropertyName = "queue_overflow_discard_oldest")] public bool QueueOverflowDiscardOldest { get; set; } - public RedisOutput() + public RedisOutputParameters() { Port = 6379; Index = "logstash"; @@ -602,12 +602,12 @@ namespace TimberWinR.Parser } } - public class StdoutOutput + public class StdoutOutputParameters { [JsonProperty(PropertyName = "interval")] public int Interval { get; set; } - public StdoutOutput() + public StdoutOutputParameters() { Interval = 1000; } @@ -616,13 +616,13 @@ namespace TimberWinR.Parser public class OutputTargets { [JsonProperty("Redis")] - public RedisOutput[] Redis { get; set; } + public RedisOutputParameters[] Redis { get; set; } [JsonProperty("Elasticsearch")] - public ElasticsearchOutput[] Elasticsearch { get; set; } + public ElasticsearchOutputParameters[] Elasticsearch { get; set; } [JsonProperty("Stdout")] - public StdoutOutput[] Stdout { get; set; } + public StdoutOutputParameters[] Stdout { get; set; } } public class InputSources @@ -631,22 +631,22 @@ namespace TimberWinR.Parser public WindowsEvent[] WindowsEvents { get; set; } [JsonProperty("Logs")] - public Log[] Logs { get; set; } + public LogParameters[] Logs { get; set; } [JsonProperty("TailFiles")] public TailFile[] TailFiles { get; set; } [JsonProperty("Tcp")] - public Tcp[] Tcps { get; set; } + public TcpParameters[] Tcps { get; set; } [JsonProperty("Udp")] - public Udp[] Udps { get; set; } + public UdpParameters[] Udps { get; set; } [JsonProperty("IISW3CLogs")] - public IISW3CLog[] IISW3CLogs { get; set; } + public IISW3CLogParameters[] IISW3CLogs { get; set; } [JsonProperty("W3CLogs")] - public W3CLog[] W3CLogs { get; set; } + public W3CLogParameters[] W3CLogs { get; set; } [JsonProperty("Stdin")] public Stdin[] Stdins { get; set; }