diff --git a/README.md b/README.md index 5e4d9e1..d2a3f4a 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,7 @@ The current supported Input format sources are: 5. [Stdin](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/StdinInput.md) (Standard Input for Debugging) 6. [W3C](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/W3CInput.md)(Internet Information Services W3C Advanced/Custom Format) 7. [Udp](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/UdpInput.md) (listens for UDP on port for JSON messages) + 8. [TailFiles](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/TailFiles.md) (Tails log files efficiently *New*) ## Codecs The current list of supported codecs are: @@ -69,7 +70,7 @@ A single Json filter using the single tag (this is only provided as a convienien ] ``` -Multiple Json filters must use the jsonFilters and array syntax +Multiple Json filters must use the jsonFilters and array syntax, also mutateFilters, grokFilters, dateFilters, geoipFilters. 
```json "Filters": [ { diff --git a/TimberWinR.ServiceHost/Properties/AssemblyInfo.cs b/TimberWinR.ServiceHost/Properties/AssemblyInfo.cs index ee7828b..86c588b 100644 --- a/TimberWinR.ServiceHost/Properties/AssemblyInfo.cs +++ b/TimberWinR.ServiceHost/Properties/AssemblyInfo.cs @@ -32,5 +32,5 @@ using System.Runtime.InteropServices; // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: // [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.3.19.0")] -[assembly: AssemblyFileVersion("1.3.19.0")] +[assembly: AssemblyVersion("1.3.20.0")] +[assembly: AssemblyFileVersion("1.3.20.0")] diff --git a/TimberWinR/Configuration.cs b/TimberWinR/Configuration.cs index d9edb69..8e3503d 100644 --- a/TimberWinR/Configuration.cs +++ b/TimberWinR/Configuration.cs @@ -67,7 +67,13 @@ namespace TimberWinR public IEnumerable Logs { get { return _logs; } - } + } + + private List _tails = new List(); + public IEnumerable TailFiles + { + get { return _tails; } + } private List _iisw3clogs = new List(); @@ -148,6 +154,8 @@ namespace TimberWinR c._stdins.AddRange(x.TimberWinR.Inputs.Stdins.ToList()); if (x.TimberWinR.Inputs.Logs != null) c._logs.AddRange(x.TimberWinR.Inputs.Logs.ToList()); + if (x.TimberWinR.Inputs.TailFiles != null) + c._tails.AddRange(x.TimberWinR.Inputs.TailFiles.ToList()); if (x.TimberWinR.Inputs.Tcps != null) c._tcps.AddRange(x.TimberWinR.Inputs.Tcps.ToList()); if (x.TimberWinR.Inputs.Udps != null) diff --git a/TimberWinR/Inputs/LogsFileDatabase.cs b/TimberWinR/Inputs/LogsFileDatabase.cs index 4b8c94d..2010c4b 100644 --- a/TimberWinR/Inputs/LogsFileDatabase.cs +++ b/TimberWinR/Inputs/LogsFileDatabase.cs @@ -5,10 +5,8 @@ using System.Linq; using System.Text; using Newtonsoft.Json; using NLog; +using TimberWinR.Parser; -using LogQuery = Interop.MSUtil.LogQueryClassClass; -using TextLineInputFormat = Interop.MSUtil.COMTextLineInputContextClass; -using LogRecordSet = 
Interop.MSUtil.ILogRecordset; namespace TimberWinR.Inputs { @@ -19,7 +17,7 @@ namespace TimberWinR.Inputs { private static readonly object _locker = new object(); private List Entries { get; set; } - private string DatabaseDirectory { get; set; } + private string DatabaseDirectory { get; set; } public string DatabaseFileName { get { return Path.Combine(DatabaseDirectory, ".timberwinrdb"); } @@ -37,6 +35,14 @@ namespace TimberWinR.Inputs } } + private LogsFileDatabaseEntry FindFile(string logName) + { + lock (_locker) + { + var existingEntry = (from e in Entries where e.FileName == logName select e).FirstOrDefault(); + return existingEntry; + } + } private bool ExistingFileTest(string logName) { var existingEntry = (from e in Entries where e.FileName == logName select e).FirstOrDefault(); @@ -56,37 +62,51 @@ namespace TimberWinR.Inputs } } - private LogsFileDatabaseEntry AddFileEntry(string logName, TextLineInputFormat fmt) + private LogsFileDatabaseEntry AddFileEntry(string logName) { var de = new LogsFileDatabaseEntry(); lock (_locker) { - var lq = new LogQuery(); + de.NewFile = true; var fi = new FileInfo(logName); de.FileName = logName; de.Size = fi.Length; de.SampleTime = DateTime.UtcNow; - de.CreationTime = fi.CreationTimeUtc; - if (fi.Exists) - { - var qcount = string.Format("SELECT max(Index) as MaxRecordNumber FROM {0}", logName); - var rcount = lq.Execute(qcount, fmt); - var qr = rcount.getRecord(); - var lrn = (Int64)qr.getValueEx("MaxRecordNumber"); - de.MaxRecords = lrn; - } + de.CreationTimeUtc = fi.CreationTimeUtc; Entries.Add(de); WriteDatabaseFileNoLock(); } return de; } - public static LogsFileDatabaseEntry AddLogFile(string logName, TextLineInputFormat fmt) + public static LogsFileDatabaseEntry LookupLogFile(string logName) { - Instance.RemoveFileEntry(logName); // Remove if already exists, otherwise ignores. 
- return Instance.AddFileEntry(logName, fmt); + LogsFileDatabaseEntry dbe = Instance.FindFile(logName); + if (dbe == null) + dbe = Instance.AddFileEntry(logName); + else + dbe.NewFile = false; + + return dbe; } + public static void Update(LogsFileDatabaseEntry dbe) + { + Instance.UpdateEntry(dbe); + } + + private void UpdateEntry(LogsFileDatabaseEntry dbe) + { + lock(_locker) + { + var fi = new FileInfo(dbe.FileName); + dbe.CreationTimeUtc = fi.CreationTimeUtc; + dbe.SampleTime = DateTime.UtcNow; + dbe.Size = fi.Length; + WriteDatabaseFileNoLock(); + } + + } public static LogsFileDatabase Instance { get @@ -122,6 +142,17 @@ namespace TimberWinR.Inputs { LogManager.GetCurrentClassLogger() .Error("Error reading database '{0}': {1}", DatabaseFileName, ex.ToString()); + try + { + if (File.Exists(DatabaseFileName)) + File.Delete(DatabaseFileName); + LogManager.GetCurrentClassLogger().Info("Creating New Database '{0}'", DatabaseFileName); + WriteDatabaseLock(); + } + catch (Exception ex2) + { + LogManager.GetCurrentClassLogger().Info("Error Creating New Database '{0}': {1}", DatabaseFileName, ex2.ToString()); + } } } private void WriteDatabaseFileNoLock() @@ -155,7 +186,7 @@ namespace TimberWinR.Inputs } private LogsFileDatabase(string databaseDirectory) - { + { DatabaseDirectory = databaseDirectory; Entries = new List(); } @@ -164,11 +195,13 @@ namespace TimberWinR.Inputs public class LogsFileDatabaseEntry { + [JsonIgnore] + public bool NewFile { get; set; } public string FileName { get; set; } public Int64 MaxRecords { get; set; } - public DateTime CreationTime { get; set; } + public DateTime CreationTimeUtc { get; set; } public DateTime SampleTime { get; set; } - public long Size { get; set; } + public long Size { get; set; } } } diff --git a/TimberWinR/Inputs/LogsListener.cs b/TimberWinR/Inputs/LogsListener.cs index 8459b21..aae02de 100644 --- a/TimberWinR/Inputs/LogsListener.cs +++ b/TimberWinR/Inputs/LogsListener.cs @@ -198,6 +198,8 @@ namespace TimberWinR.Inputs 
recurse = _arguments.Recurse }; + Dictionary _fnfmap = new Dictionary(); + using (var syncHandle = new ManualResetEventSlim()) { // Execute the query @@ -322,7 +324,12 @@ namespace TimberWinR.Inputs } catch (FileNotFoundException fnfex) { - LogManager.GetCurrentClassLogger().Warn(fnfex.Message); + string fn = fnfex.FileName; + + if (!_fnfmap.ContainsKey(fn)) + LogManager.GetCurrentClassLogger().Warn(fnfex.Message); + else + _fnfmap[fn] = fn; } catch (OperationCanceledException oce) { diff --git a/TimberWinR/Inputs/TailFileListener.cs b/TimberWinR/Inputs/TailFileListener.cs new file mode 100644 index 0000000..13af05f --- /dev/null +++ b/TimberWinR/Inputs/TailFileListener.cs @@ -0,0 +1,320 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Net.Configuration; +using System.Runtime.InteropServices; +using System.Text; +using System.Text.RegularExpressions; +using System.Threading; +using System.Threading.Tasks; +using System.IO; +using Interop.MSUtil; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; +using Newtonsoft.Json.Serialization; + +using NLog; +using NLog.LayoutRenderers; +using TimberWinR.Parser; + +namespace TimberWinR.Inputs +{ + /// + /// Tail a file. 
+ /// + public class TailFileListener : InputListener + { + private int _pollingIntervalInSeconds; + private TimberWinR.Parser.TailFile _arguments; + private long _receivedMessages; + private Dictionary _logFileMaxRecords; + private Dictionary _logFileCreationTimes; + private Dictionary _logFileSampleTimes; + private Dictionary _logFileSizes; + private Codec _codec; + private List _multiline { get; set; } + + public bool Stop { get; set; } + + public TailFileListener(TimberWinR.Parser.TailFile arguments, CancellationToken cancelToken) + : base(cancelToken, "Win32-TailLog") + { + Stop = false; + + _codec = arguments.Codec; + _logFileMaxRecords = new Dictionary(); + _logFileCreationTimes = new Dictionary(); + _logFileSampleTimes = new Dictionary(); + _logFileSizes = new Dictionary(); + + _receivedMessages = 0; + _arguments = arguments; + _pollingIntervalInSeconds = arguments.Interval; + + foreach (string srcFile in _arguments.Location.Split(',')) + { + string file = srcFile.Trim(); + Task.Factory.StartNew(() => TailFileWatcher(file)); + } + } + + public override void Shutdown() + { + LogManager.GetCurrentClassLogger().Info("Shutting Down {0}", InputType); + Stop = true; + base.Shutdown(); + } + + public override JObject ToJson() + { + JObject json = new JObject( + new JProperty("log", + new JObject( + new JProperty("messages", _receivedMessages), + new JProperty("type", InputType), + new JProperty("location", _arguments.Location), + new JProperty("logSource", _arguments.LogSource), + new JProperty("recurse", _arguments.Recurse), + + new JProperty("files", + new JArray(from f in _logFileMaxRecords.Keys + select new JValue(f))), + new JProperty("fileSampleTimes", + new JArray(from f in _logFileSampleTimes.Values + select new JValue(f))), + new JProperty("fileSizes", + new JArray(from f in _logFileSizes.Values + select new JValue(f))), + new JProperty("fileIndices", + new JArray(from f in _logFileMaxRecords.Values + select new JValue(f))), + new 
JProperty("fileCreationDates", + new JArray(from f in _logFileCreationTimes.Values + select new JValue(f))) + ))); + + + if (_codec != null) + { + var cp = new JProperty("codec", + new JArray( + new JObject( + new JProperty("type", _codec.Type.ToString()), + new JProperty("what", _codec.What.ToString()), + new JProperty("negate", _codec.Negate), + new JProperty("multilineTag", _codec.MultilineTag), + new JProperty("pattern", _codec.Pattern)))); + json.Add(cp); + } + + + return json; + } + + // return true to cancel codec + private void applyMultilineCodec(string msg) + { + if (_codec.Re == null) + _codec.Re = new Regex(_codec.Pattern); + + Match match = _codec.Re.Match(msg); + + bool isMatch = (match.Success && !_codec.Negate) || (!match.Success && _codec.Negate); + + switch (_codec.What) + { + case Codec.WhatType.previous: + if (isMatch) + { + if (_multiline == null) + _multiline = new List(); + + _multiline.Add(msg); + } + else // No Match + { + if (_multiline != null) + { + string single = string.Join("\n", _multiline.ToArray()); + _multiline = null; + JObject jo = new JObject(); + jo["message"] = single; + jo.Add("tags", new JArray(_codec.MultilineTag)); + AddDefaultFields(jo); + ProcessJson(jo); + _receivedMessages++; + } + _multiline = new List(); + _multiline.Add(msg); + } + break; + case Codec.WhatType.next: + if (isMatch) + { + if (_multiline == null) + _multiline = new List(); + _multiline.Add(msg); + } + else // No match + { + if (_multiline != null) + { + _multiline.Add(msg); + string single = string.Join("\n", _multiline.ToArray()); + _multiline = null; + JObject jo = new JObject(); + jo["message"] = single; + jo.Add("tags", new JArray(_codec.MultilineTag)); + AddDefaultFields(jo); + ProcessJson(jo); + _receivedMessages++; + } + else + { + JObject jo = new JObject(); + jo["message"] = msg; + AddDefaultFields(jo); + ProcessJson(jo); + _receivedMessages++; + } + } + break; + } + } + + private void TailFileContents(string fileName, long offset) + { + 
using (StreamReader reader = new StreamReader(new FileStream(fileName, + FileMode.Open, FileAccess.Read, FileShare.ReadWrite))) + { + //start at the end of the file + long lastMaxOffset = offset; + + //if the file size has not changed, idle + if (reader.BaseStream.Length == lastMaxOffset) + return; + + //seek to the last max offset + reader.BaseStream.Seek(lastMaxOffset, SeekOrigin.Begin); + + //read out of the file until the EOF + string line = ""; + long lineOffset = 0; + while ((line = reader.ReadLine()) != null) + { + if (string.IsNullOrEmpty(line)) + continue; + + long index = lastMaxOffset + lineOffset; + string text = line; + string logFileName = fileName; + var json = new JObject(); + + if (json["logSource"] == null) + { + if (string.IsNullOrEmpty(_arguments.LogSource)) + json.Add(new JProperty("logSource", fileName)); + else + json.Add(new JProperty("logSource", _arguments.LogSource)); + } + json["Text"] = line; + json["Index"] = index; + json["LogFileName"] = fileName; + + if (_codec != null && _codec.Type == Codec.CodecType.multiline) + applyMultilineCodec(line); + else + { + ProcessJson(json); + Interlocked.Increment(ref _receivedMessages); + } + lineOffset += line.Length; + // Console.WriteLine("File: {0}:{1}: {2}", fileName, reader.BaseStream.Position, line); + } + //update the last max offset + lastMaxOffset = reader.BaseStream.Position; + } + } + // One thread for each kind of file to watch, i.e. "*.log,*.txt" would be two separate + // threads. 
+ private void TailFileWatcher(string fileToWatch) + { + Dictionary _fnfmap = new Dictionary(); + + using (var syncHandle = new ManualResetEventSlim()) + { + // Execute the query + while (!Stop && !CancelToken.IsCancellationRequested) + { + try + { + if (!CancelToken.IsCancellationRequested) + { + string path = Path.GetDirectoryName(fileToWatch); + string name = Path.GetFileName(fileToWatch); + + // Ok, we have a potential file filter here as 'fileToWatch' could be foo.log or *.log + + SearchOption so = SearchOption.TopDirectoryOnly; + if (_arguments.Recurse == -1) + so = SearchOption.AllDirectories; + + foreach (string fileName in Directory.GetFiles(path, name, so)) + { + var dbe = LogsFileDatabase.LookupLogFile(fileName); + FileInfo fi = new FileInfo(dbe.FileName); + //LogManager.GetCurrentClassLogger().Info("Located File: {0}, New: {1}", dbe.FileName, dbe.NewFile); + long length = fi.Length; + bool logHasRolled = false; + if (fi.Length < dbe.Size || fi.CreationTimeUtc != dbe.CreationTimeUtc) + { + LogManager.GetCurrentClassLogger().Info("Log has Rolled: {0}", dbe.FileName); + logHasRolled = true; + } + bool processWholeFile = logHasRolled || dbe.NewFile; + if (processWholeFile) + { + LogManager.GetCurrentClassLogger().Info("Process Whole File: {0}", dbe.FileName); + TailFileContents(dbe.FileName, 0); + } + else + { + TailFileContents(dbe.FileName, dbe.Size); + } + LogsFileDatabase.Update(dbe); + } + + // LogManager.GetCurrentClassLogger().Info("Finished Scan...Sleeping"); + if (!Stop) + syncHandle.Wait(TimeSpan.FromSeconds(_pollingIntervalInSeconds), CancelToken); + } + } + catch (FileNotFoundException fnfex) + { + string fn = fnfex.FileName; + + if (!_fnfmap.ContainsKey(fn)) + LogManager.GetCurrentClassLogger().Warn(fnfex.Message); + else + _fnfmap[fn] = fn; + } + catch (OperationCanceledException oce) + { + break; + } + catch (Exception ex) + { + LogManager.GetCurrentClassLogger().Error(ex); + } + finally + { + + } + } + } + Finished(); + } + } +} + diff --git 
a/TimberWinR/Inputs/UdpInputListener.cs b/TimberWinR/Inputs/UdpInputListener.cs index 76c72b0..3ab6087 100644 --- a/TimberWinR/Inputs/UdpInputListener.cs +++ b/TimberWinR/Inputs/UdpInputListener.cs @@ -21,6 +21,7 @@ namespace TimberWinR.Inputs private readonly int _port; private long _receivedMessages; + private long _parsedErrors; private struct listenProfile { @@ -34,6 +35,7 @@ namespace TimberWinR.Inputs new JProperty("udp", new JObject( new JProperty("port", _port), + new JProperty("errors", _parsedErrors), new JProperty("messages", _receivedMessages) ))); @@ -71,22 +73,25 @@ namespace TimberWinR.Inputs private void StartListener(object useProfile) { var profile = (listenProfile)useProfile; - + string lastMessage = ""; try { while (!CancelToken.IsCancellationRequested) { - byte[] bytes = profile.client.Receive(ref profile.endPoint); try { - var data = Encoding.ASCII.GetString(bytes, 0, bytes.Length); + byte[] bytes = profile.client.Receive(ref profile.endPoint); + var data = Encoding.UTF8.GetString(bytes, 0, bytes.Length); + lastMessage = data; JObject json = JObject.Parse(data); ProcessJson(json); _receivedMessages++; } catch (Exception ex1) { + LogManager.GetCurrentClassLogger().Warn("Bad JSON: {0}", lastMessage); LogManager.GetCurrentClassLogger().Warn(ex1); + _parsedErrors++; } } _udpListener.Close(); diff --git a/TimberWinR/Manager.cs b/TimberWinR/Manager.cs index 9ee6fdf..c3dbce2 100644 --- a/TimberWinR/Manager.cs +++ b/TimberWinR/Manager.cs @@ -200,6 +200,14 @@ namespace TimberWinR output.Connect(elistner); } + foreach (var logConfig in Config.TailFiles) + { + var elistner = new TailFileListener(logConfig, cancelToken); + Listeners.Add(elistner); + foreach (var output in Outputs) + output.Connect(elistner); + } + foreach (var tcp in Config.Tcps) { var elistner = new TcpInputListener(cancelToken, tcp.Port); diff --git a/TimberWinR/Parser.cs b/TimberWinR/Parser.cs index c067623..a7a2199 100644 --- a/TimberWinR/Parser.cs +++ b/TimberWinR/Parser.cs @@ -297,6 
+297,36 @@ namespace TimberWinR.Parser } } + public class TailFile : IValidateSchema + { + [JsonProperty(PropertyName = "location")] + public string Location { get; set; } + [JsonProperty(PropertyName = "recurse")] + public int Recurse { get; set; } + [JsonProperty(PropertyName = "fields")] + public List Fields { get; set; } + [JsonProperty(PropertyName = "interval")] + public int Interval { get; set; } + [JsonProperty(PropertyName = "logSource")] + public string LogSource { get; set; } + [JsonProperty(PropertyName = "codec")] + public Codec Codec { get; set; } + + public TailFile() + { + Fields = new List(); + Fields.Add(new Field("LogFilename", "string")); + Fields.Add(new Field("Index", "integer")); + Fields.Add(new Field("Text", "string")); + Interval = 5; + } + + public void Validate() + { + + } + } + public class Log : IValidateSchema { [JsonProperty(PropertyName = "location")] @@ -600,6 +630,9 @@ namespace TimberWinR.Parser [JsonProperty("Logs")] public Log[] Logs { get; set; } + [JsonProperty("TailFiles")] + public TailFile[] TailFiles { get; set; } + [JsonProperty("Tcp")] public Tcp[] Tcps { get; set; } diff --git a/TimberWinR/ReleaseNotes.md b/TimberWinR/ReleaseNotes.md index 2678ece..4d2c632 100644 --- a/TimberWinR/ReleaseNotes.md +++ b/TimberWinR/ReleaseNotes.md @@ -3,6 +3,10 @@ A Native Windows to Redis/Elasticsearch Logstash Agent which runs as a service. Version History +### 1.3.20.0 - 01/29/2015 +1. Added new TailFiles input type which uses a native implementation (more-efficient) than using LogParser's Log +2. Updated Udp input listner to use UTF8 Encoding rather than ASCII +3. 
Reduced noisy complaint about missing log files for Logs listener ### 1.3.19.0 - 01/12/2015 diff --git a/TimberWinR/TimberWinR.csproj b/TimberWinR/TimberWinR.csproj index 16328c6..d78ca9a 100644 --- a/TimberWinR/TimberWinR.csproj +++ b/TimberWinR/TimberWinR.csproj @@ -88,6 +88,7 @@ + @@ -127,6 +128,7 @@ + diff --git a/TimberWinR/mdocs/TailFiles.md b/TimberWinR/mdocs/TailFiles.md new file mode 100644 index 0000000..951a890 --- /dev/null +++ b/TimberWinR/mdocs/TailFiles.md @@ -0,0 +1,40 @@ +# Input: TailFiles + +The TailFiles input will monitor a log (text) file similar to how a Linux "tail -f" command works. This uses +a native implementation rather than LogParser. + +## Parameters +The following parameters are allowed when configuring TailFiles. + +| Parameter | Type | Description | Details | Default | +| :---------------- |:---------------| :----------------------------------------------------------------------- | :--------------------------- | :-- | +| *location* | string |Location of file(s) to monitor | Path to text file(s) including wildcards. | | +| *logSource* | string |Source name | Used for conditions | | +| *recurse* | integer |Subdirectory recursion. | -1 enables unlimited recursion; any other value searches only the top-level directory. | 0 | +| [codec](https://github.com/Cimpress-MCP/TimberWinR/blob/master/TimberWinR/mdocs/Codec.md) | object | Codec to use | + +Example Input: Monitors all files (recursively) located at C:\Logs1\ matching *.log as a pattern. I.e. C:\Logs1\foo.log, C:\Logs1\Subdir\Log2.log, etc. 
+ +```json +{ + "TimberWinR": { + "Inputs": { + "TailFiles": [ + { + "logSource": "log files", + "location": "C:\\Logs1\\*.log", + "recurse": -1 + } + ] + } + } +} +``` +## Fields +After a successful parse of an event, the following fields are added: + +| Name | Type | Description | +| ---- |:-----| :-----------| +| LogFilename | STRING |Full path of the file containing this line | +| Index | INTEGER | Approximate character offset of the line within the file | +| Text | STRING | Text line content |