Added RedisOutput and TcpListener

This commit is contained in:
Eric Fontana
2014-07-16 12:43:17 -04:00
parent 5a53dfa65b
commit 0b489cc927
14 changed files with 2945 additions and 45 deletions

View File

@@ -1,10 +1,17 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using TimberWinR.Outputs;
using TimberWinR.ServiceHost;
using TimberWinR.Inputs;
using Topshelf;
using Topshelf.HostConfigurators;
using Topshelf.ServiceConfigurators;
namespace TimberWinR.ServiceHost
{
@@ -12,17 +19,25 @@ namespace TimberWinR.ServiceHost
{
private static void Main(string[] args)
{
Arguments arguments = new Arguments();
HostFactory.Run(hostConfigurator =>
{
{
string cmdLine = Environment.CommandLine;
hostConfigurator.Service<TimberWinRService>(serviceConfigurator =>
{
serviceConfigurator.ConstructUsing(() => new TimberWinRService());
serviceConfigurator.ConstructUsing(() => new TimberWinRService(arguments));
serviceConfigurator.WhenStarted(myService => myService.Start());
serviceConfigurator.WhenStopped(myService => myService.Stop());
});
hostConfigurator.RunAsLocalSystem();
hostConfigurator.AddCommandLineDefinition("configFile", c => arguments.ConfigFile = c);
hostConfigurator.ApplyCommandLine();
hostConfigurator.RunAsLocalSystem();
hostConfigurator.StartAutomatically();
hostConfigurator.EnableShutdown();
hostConfigurator.SetDisplayName("TimberWinR");
hostConfigurator.SetDescription("TimberWinR using Topshelf");
hostConfigurator.SetServiceName("TimberWinR");
@@ -30,58 +45,60 @@ namespace TimberWinR.ServiceHost
}
}
internal class Arguments
{
public string ConfigFile { get; set; }
public Arguments()
{
ConfigFile = string.Empty;
}
}
internal class TimberWinRService
{
readonly CancellationTokenSource _cancellationTokenSource;
readonly CancellationToken _cancellationToken;
readonly Task _task;
readonly Task _serviceTask;
private readonly TcpInputListener _nlogListener;
public TimberWinRService()
public TimberWinRService(Arguments args)
{
_cancellationTokenSource = new CancellationTokenSource();
_cancellationToken = _cancellationTokenSource.Token;
_task = new Task(RunService, _cancellationToken);
_serviceTask = new Task(RunService, _cancellationToken);
_nlogListener = new TcpInputListener(_cancellationToken, 5140);
var outputRedis = new RedisOutput(new string[] { "tstlexiceapp006.vistaprint.svc", "tstlexiceapp007.vistaprint.svc" }, _cancellationToken);
outputRedis.Connect(_nlogListener);
}
public void Start()
{
_task.Start();
_serviceTask.Start();
}
public void Stop()
{
_cancellationTokenSource.Cancel();
_nlogListener.Shutdown();
}
/// <summary>
/// The Main body of the Service Worker Thread
/// </summary>
private void RunService()
{
TimberWinR.Manager manager = new TimberWinR.Manager();
while (!_cancellationTokenSource.IsCancellationRequested)
{
Console.WriteLine("I am working");
Console.WriteLine(" Step 1");
System.Threading.Thread.Sleep(1000);
Console.WriteLine(" Step 2");
System.Threading.Thread.Sleep(1000);
Console.WriteLine(" Step 3");
System.Threading.Thread.Sleep(1000);
Console.WriteLine(" Step 4");
System.Threading.Thread.Sleep(1000);
Console.WriteLine(" Step 5");
System.Threading.Thread.Sleep(1000);
System.Threading.Thread.Sleep(1000);
}
}
}
}

View File

@@ -51,6 +51,12 @@
<None Include="App.config" />
<None Include="packages.config" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\TimberWinR\TimberWinR.csproj">
<Project>{4ef96a08-21db-4178-be44-70dae594632c}</Project>
<Name>TimberWinR</Name>
</ProjectReference>
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.

View File

@@ -1,12 +0,0 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace TimberWinR.Service
{
public class Manager
{
}
}

View File

@@ -0,0 +1,25 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace TimberWinR.Inputs
{
public abstract class InputListener
{
public CancellationToken CancelToken { get; set; }
public event Action<string> OnMessageRecieved;
public InputListener(CancellationToken token)
{
this.CancelToken = token;
}
protected void ProcessMessage(string message)
{
if (OnMessageRecieved != null)
OnMessageRecieved(message);
}
}
}

View File

@@ -0,0 +1,90 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Net;
using System.Net.Sockets;
using NLog;
namespace TimberWinR.Inputs
{
public class TcpInputListener : InputListener
{
private readonly System.Net.Sockets.TcpListener _tcpListener;
private Thread _listenThread;
const int bufferSize = 16535;
public TcpInputListener(CancellationToken cancelToken, int port = 5140) : base(cancelToken)
{
_tcpListener = new System.Net.Sockets.TcpListener(IPAddress.Any, port);
_listenThread = new Thread(new ThreadStart(ListenForClients));
_listenThread.Start();
}
public void Shutdown()
{
this._tcpListener.Stop();
}
private void ListenForClients()
{
this._tcpListener.Start();
while (!CancelToken.IsCancellationRequested)
{
try
{
//blocks until a client has connected to the server
TcpClient client = this._tcpListener.AcceptTcpClient();
// Wait for a client, spin up a thread.
var clientThread = new Thread(new ParameterizedThreadStart(HandleNewClient));
clientThread.Start(client);
}
catch (SocketException ex)
{
if (ex.SocketErrorCode == SocketError.Interrupted)
break;
else
LogManager.GetCurrentClassLogger().Error(ex);
}
}
}
private void HandleNewClient(object client)
{
var tcpClient = (TcpClient)client;
NetworkStream clientStream = tcpClient.GetStream();
var message = new byte[bufferSize];
while (!CancelToken.IsCancellationRequested)
{
var bytesRead = 0;
try
{
//blocks until a client sends a message
bytesRead = clientStream.Read(message, 0, bufferSize);
}
catch
{
//a socket error has occured
break;
}
if (bytesRead == 0)
{
//the client has disconnected from the server
break;
}
//message has successfully been received
var encoder = new ASCIIEncoding();
var encodedMessage = encoder.GetString(message, 0, bytesRead);
ProcessMessage(encodedMessage);
}
tcpClient.Close();
}
}
}

View File

@@ -0,0 +1,19 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using LogQuery = Interop.MSUtil.LogQueryClassClass;
using TextLineInputFormat = Interop.MSUtil.COMTextLineInputContextClass;
using LogRecordSet = Interop.MSUtil.ILogRecordset;
namespace TimberWinR.Inputs
{
/// <summary>
/// Tail a file.
/// </summary>
class TextLine
{
}
}

64
TimberWinR/Manager.cs Normal file
View File

@@ -0,0 +1,64 @@
using System.IO;
using NLog;
using NLog.Config;
using NLog.Targets;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace TimberWinR
{
/// <summary>
/// The Manager class for TimberWinR
/// </summary>
public class Manager
{
Configuration Config { get; set; }
public Manager(string configurationFile=null)
{
// Read the Configuration file
Config = new Configuration();
var loggingConfiguration = new LoggingConfiguration();
// Create our default targets
var coloredConsoleTarget = new ColoredConsoleTarget();
Target fileTarget = CreateDefaultFileTarget("c:\\logs");
loggingConfiguration.AddTarget("Console", coloredConsoleTarget);
loggingConfiguration.AddTarget("DailyFile", fileTarget);
loggingConfiguration.LoggingRules.Add(new LoggingRule("*", LogLevel.Trace, coloredConsoleTarget));
loggingConfiguration.LoggingRules.Add(new LoggingRule("*", LogLevel.Info, fileTarget));
LogManager.Configuration = loggingConfiguration;
LogManager.EnableLogging();
LogManager.GetCurrentClassLogger().Info("Initialized");
}
/// <summary>
/// Creates the default <see cref="FileTarget"/>.
/// </summary>
/// <param name="logPath"></param>
/// <returns>
/// The NLog file target used in the default logging configuration.
/// </returns>
public static FileTarget CreateDefaultFileTarget(string logPath)
{
return new FileTarget
{
ArchiveEvery = FileArchivePeriod.None,
ArchiveAboveSize = 10 * 1024 * 1024,
MaxArchiveFiles = 5,
BufferSize = 10,
FileName = Path.Combine(logPath, "TimberWinR", "TimberWinR.txt"),
ArchiveFileName = Path.Combine(logPath, "log-{#######}.txt"),
};
}
}
}

View File

@@ -0,0 +1,28 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using TimberWinR.Inputs;
namespace TimberWinR.Outputs
{
public abstract class OutputSender
{
public CancellationToken CancelToken { get; private set; }
private List<InputListener> _inputs;
public OutputSender(CancellationToken cancelToken)
{
CancelToken = cancelToken;
_inputs = new List<InputListener>();
}
public void Connect(InputListener listener)
{
listener.OnMessageRecieved += MessageReceivedHandler;
}
protected abstract void MessageReceivedHandler(string jsonMessage);
}
}

143
TimberWinR/Outputs/Redis.cs Normal file
View File

@@ -0,0 +1,143 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using ctstone.Redis;
using NLog;
using System.Threading.Tasks;
namespace TimberWinR.Outputs
{
public class RedisOutput : OutputSender
{
private readonly string _logstashIndexName;
private readonly string _hostname;
private readonly int _port;
private readonly int _timeout;
private object _locker = new object();
private List<string> _jsonQueue;
readonly Task _consumerTask;
private string[] _redisHosts;
private int _redisHostIndex;
/// <summary>
/// Get the next client
/// </summary>
/// <returns></returns>
private RedisClient getClient()
{
if (_redisHostIndex >= _redisHosts.Length)
_redisHostIndex = 0;
int numTries = 0;
while (numTries < _redisHosts.Length)
{
try
{
RedisClient client = new RedisClient(_redisHosts[_redisHostIndex], _port, _timeout);
_redisHostIndex++;
if (_redisHostIndex >= _redisHosts.Length)
_redisHostIndex = 0;
return client;
}
catch (Exception)
{
}
numTries++;
}
return null;
}
public RedisOutput(string[] redisHosts, CancellationToken cancelToken, string logstashIndexName = "logstash", int port = 6379, int timeout = 10000)
: base(cancelToken)
{
_redisHostIndex = 0;
_redisHosts = redisHosts;
_jsonQueue = new List<string>();
_port = port;
_timeout = timeout;
_logstashIndexName = logstashIndexName;
_consumerTask = new Task(RedisSender, CancellationToken.None);
_consumerTask.Start();
}
/// <summary>
/// Forward on Json message to Redis Logstash queue
/// </summary>
/// <param name="jsonMessage"></param>
protected override void MessageReceivedHandler(string jsonMessage)
{
LogManager.GetCurrentClassLogger().Info(jsonMessage);
lock (_locker)
{
_jsonQueue.Add(jsonMessage);
}
}
//
// Pull off messages from the Queue, batch them up and send them all across
//
private void RedisSender()
{
while (!CancelToken.IsCancellationRequested)
{
string[] messages;
lock (_locker)
{
messages = _jsonQueue.ToArray();
_jsonQueue.Clear();
}
if (messages.Length > 0)
{
int numHosts = _redisHosts.Length;
while (numHosts-- > 0)
{
try
{
// Get the next client
using (RedisClient client = getClient())
{
if (client != null)
{
client.StartPipe();
foreach (string jsonMessage in messages)
{
try
{
client.RPush(_logstashIndexName, jsonMessage);
}
catch (SocketException)
{
}
}
client.EndPipe();
break;
}
else
{
LogManager.GetCurrentClassLogger()
.Fatal("Unable to connect with any Redis hosts, {0}",
String.Join(",", _redisHosts));
}
}
}
catch(Exception)
{
// Got an error, try the other hosts
}
}
}
System.Threading.Thread.Sleep(1000);
}
}
}
}

View File

@@ -31,16 +31,16 @@
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="csredis">
<HintPath>..\packages\csredis.1.4.7.1\lib\net40\csredis.dll</HintPath>
</Reference>
<Reference Include="Interop.MSUtil, Version=1.0.0.0, Culture=neutral, processorArchitecture=MSIL">
<SpecificVersion>False</SpecificVersion>
<EmbedInteropTypes>False</EmbedInteropTypes>
<HintPath>lib\com-logparser\Interop.MSUtil.dll</HintPath>
</Reference>
<Reference Include="NLog">
<HintPath>..\packages\NLog.3.1.0.0\lib\net45\NLog.dll</HintPath>
</Reference>
<Reference Include="nunit.framework">
<HintPath>..\packages\NUnit.2.6.3\lib\nunit.framework.dll</HintPath>
<HintPath>..\packages\NLog.3.1.0.0\lib\net40\NLog.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
@@ -55,8 +55,13 @@
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="Class1.cs" />
<Compile Include="Configuration.cs" />
<Compile Include="Inputs\InputListener.cs" />
<Compile Include="Inputs\TcpInputListener.cs" />
<Compile Include="Inputs\TextLine.cs" />
<Compile Include="Manager.cs" />
<Compile Include="Outputs\OutputSender.cs" />
<Compile Include="Outputs\Redis.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>

View File

@@ -1,5 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="csredis" version="1.4.7.1" targetFramework="net40" />
<package id="NLog" version="3.1.0.0" targetFramework="net40" />
<package id="NUnit" version="2.6.3" targetFramework="net40" />
</packages>

Binary file not shown.

File diff suppressed because it is too large Load Diff

Binary file not shown.