Added async support"

This commit is contained in:
amiry
2015-04-14 10:35:00 +03:00
parent 4a28888c1f
commit ffd790b4ba
11 changed files with 493 additions and 366 deletions

View File

@@ -1,5 +1,8 @@
# statsd-csharp-client Changelog
## v1.4.0.0
* Added async support
## v1.3.0.0
* Added support for Calendargrams

42
StatsdClient/AsyncLock.cs Normal file
View File

@@ -0,0 +1,42 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace StatsdClient
{
internal sealed class AsyncLock
{
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private readonly Task<IDisposable> _releaser;
public AsyncLock()
{
_releaser = Task.FromResult((IDisposable)new Releaser(this));
}
public Task<IDisposable> LockAsync()
{
var wait = _semaphore.WaitAsync();
return wait.IsCompleted ?
_releaser :
wait.ContinueWith((_, state) => (IDisposable)state,
_releaser.Result, CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
private sealed class Releaser : IDisposable
{
private readonly AsyncLock _toRelease;
internal Releaser(AsyncLock toRelease)
{
_toRelease = toRelease;
}
public void Dispose()
{
_toRelease._semaphore.Release();
}
}
}
}

View File

@@ -1,18 +1,15 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace StatsdClient
{
/// <summary>
/// Contract for sending raw statds lines to the server
/// </summary>
public interface IOutputChannel
{
/// <summary>
/// Sends a line of stats data to the server.
/// Contract for sending raw statds lines to the server
/// </summary>
void Send(string line);
}
}
public interface IOutputChannel
{
/// <summary>
/// Sends a line of stats data to the server asynchronously.
/// </summary>
Task SendAsync(string line);
}
}

View File

@@ -1,4 +1,5 @@
using System;
using System.Threading.Tasks;
namespace StatsdClient
{
/// <summary>
@@ -9,45 +10,47 @@ namespace StatsdClient
/// <summary>
/// Log a count for a metric
/// </summary>
void LogCount(string name, int count = 1);
Task LogCountAsync(string name, long count = 1);
/// <summary>
/// Log a gauge value
/// </summary>
void LogGauge(string name, int value);
Task LogGaugeAsync(string name, long value);
/// <summary>
/// Log a latency / Timing
/// </summary>
void LogTiming(string name, int milliseconds);
/// <summary>
/// Log a latency / Timing
/// </summary>
void LogTiming(string name, long milliseconds);
Task LogTimingAsync(string name, long milliseconds);
/// <summary>
/// Log the number of unique occurrances of something
/// </summary>
/// <param name="name"></param>
/// <param name="value"></param>
void LogSet(string name, int value);
Task LogSetAsync(string name, long value);
/// <summary>
/// Log a calendargram metric
/// Log a calendargram metric
/// </summary>
/// <param name="name">The metric namespace</param>
/// <param name="value">The unique value to be counted in the time period</param>
/// <param name="period">The time period, can be one of h,d,dow,w,m</param>
void LogCalendargram(string name, string value, string period);
Task LogCalendargramAsync(string name, string value, string period);
/// <summary>
/// Log a calendargram metric
/// Log a calendargram metric
/// </summary>
/// <param name="name">The metric namespace</param>
/// <param name="value">The unique value to be counted in the time period</param>
/// <param name="period">The time period, can be one of h,d,dow,w,m</param>
void LogCalendargram(string name, int value, string period);
Task LogCalendargramAsync(string name, long value, string period);
/// <summary>
/// Log a raw metric that will not get aggregated on the server.
/// </summary>
/// <param name="name">The metric name.</param>
/// <param name="value">The metric value.</param>
/// <param name="epoch">(optional) The epoch timestamp. Leave this blank to have the server assign an epoch for you.</param>
void LogRaw(string name, int value, long? epoch = null);
Task LogRawAsync(string name, long value, long? epoch = null);
}
}
}

View File

@@ -1,15 +1,11 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace StatsdClient
{
internal sealed class NullOutputChannel : IOutputChannel
{
public void Send(string line)
internal sealed class NullOutputChannel : IOutputChannel
{
// noop
public async Task SendAsync(string line)
{
}
}
}
}
}

View File

@@ -0,0 +1,10 @@
namespace StatsdClient
{
public static class OutputChannelExtensions
{
public static void Send(this IOutputChannel outputChannel, string line)
{
outputChannel.SendAsync(line).Wait();
}
}
}

View File

@@ -1,226 +1,216 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace StatsdClient
{
/// <summary>
/// The statsd client library.
/// </summary>
public class Statsd : IStatsd
{
private string _prefix;
private IOutputChannel _outputChannel;
/// <summary>
/// Creates a new instance of the Statsd client.
/// The statsd client library.
/// </summary>
/// <param name="host">The statsd or statsd.net server.</param>
/// <param name="port"></param>
public Statsd(string host, int port)
public class Statsd : IStatsd
{
if ( String.IsNullOrEmpty( host ) )
{
Trace.TraceWarning( "Statsd client initialised with empty host address. Dropping back to NullOutputChannel." );
InitialiseInternal( () => new NullOutputChannel(), "", false );
}
else
{
InitialiseInternal( () => new UdpOutputChannel( host, port ), "", false );
}
}
private string _prefix;
private IOutputChannel _outputChannel;
/// <summary>
/// Creates a new instance of the Statsd client.
/// </summary>
/// <param name="host">The statsd or statsd.net server.</param>
/// <param name="port"></param>
/// <param name="prefix">A string prefix to prepend to every metric.</param>
/// <param name="rethrowOnError">If True, rethrows any exceptions caught due to bad configuration.</param>
/// <param name="connectionType">Choose between a UDP (recommended) or TCP connection.</param>
/// <param name="retryOnDisconnect">Retry the connection if it fails (TCP only).</param>
/// <param name="retryAttempts">Number of times to retry before giving up (TCP only).</param>
public Statsd(string host,
int port,
ConnectionType connectionType = ConnectionType.Udp,
string prefix = null,
bool rethrowOnError = false,
bool retryOnDisconnect = true,
int retryAttempts = 3)
{
InitialiseInternal(() =>
/// <summary>
/// Creates a new instance of the Statsd client.
/// </summary>
/// <param name="host">The statsd or statsd.net server.</param>
/// <param name="port"></param>
public Statsd(string host, int port)
{
return connectionType == ConnectionType.Tcp
? (IOutputChannel)new TcpOutputChannel(host, port, retryOnDisconnect, retryAttempts)
: (IOutputChannel)new UdpOutputChannel(host, port);
},
prefix,
rethrowOnError);
}
/// <summary>
/// Creates a new instance of the Statsd client.
/// </summary>
/// <param name="host">The statsd or statsd.net server.</param>
/// <param name="port"></param>
/// <param name="prefix">A string prefix to prepend to every metric.</param>
/// <param name="rethrowOnError">If True, rethrows any exceptions caught due to bad configuration.</param>
/// <param name="outputChannel">Optional output channel (useful for mocking / testing).</param>
public Statsd(string host, int port, string prefix = null, bool rethrowOnError = false, IOutputChannel outputChannel = null)
{
if (outputChannel == null)
{
InitialiseInternal(() => new UdpOutputChannel(host, port), prefix, rethrowOnError);
}
else
{
InitialiseInternal(() => outputChannel, prefix, rethrowOnError);
}
}
private void InitialiseInternal(Func<IOutputChannel> createOutputChannel, string prefix, bool rethrowOnError)
{
_prefix = prefix;
if (_prefix != null && _prefix.EndsWith("."))
{
_prefix = _prefix.Substring(0, _prefix.Length - 1);
}
try
{
_outputChannel = createOutputChannel();
}
catch (Exception ex)
{
if (rethrowOnError)
{
throw;
if (String.IsNullOrEmpty(host))
{
Trace.TraceWarning("Statsd client initialised with empty host address. Dropping back to NullOutputChannel.");
InitialiseInternal(() => new NullOutputChannel(), "", false);
}
else
{
InitialiseInternal(() => new UdpOutputChannel(host, port), "", false);
}
}
Trace.TraceError("Could not initialise the Statsd client: {0} - falling back to NullOutputChannel.", ex.Message);
_outputChannel = new NullOutputChannel();
}
}
/// <summary>
/// Log a counter.
/// </summary>
/// <param name="name">The metric name.</param>
/// <param name="count">The counter value (defaults to 1).</param>
public void LogCount(string name, int count = 1)
{
SendMetric(MetricType.COUNT, name, _prefix, count);
}
/// <summary>
/// Creates a new instance of the Statsd client.
/// </summary>
/// <param name="host">The statsd or statsd.net server.</param>
/// <param name="port"></param>
/// <param name="prefix">A string prefix to prepend to every metric.</param>
/// <param name="rethrowOnError">If True, rethrows any exceptions caught due to bad configuration.</param>
/// <param name="connectionType">Choose between a UDP (recommended) or TCP connection.</param>
/// <param name="retryOnDisconnect">Retry the connection if it fails (TCP only).</param>
/// <param name="retryAttempts">Number of times to retry before giving up (TCP only).</param>
public Statsd(string host,
int port,
ConnectionType connectionType = ConnectionType.Udp,
string prefix = null,
bool rethrowOnError = false,
bool retryOnDisconnect = true,
int retryAttempts = 3)
{
InitialiseInternal(() =>
{
return connectionType == ConnectionType.Tcp
? (IOutputChannel)new TcpOutputChannel(host, port, retryOnDisconnect, retryAttempts)
: (IOutputChannel)new UdpOutputChannel(host, port);
},
prefix,
rethrowOnError);
}
/// <summary>
/// Log a timing / latency
/// </summary>
/// <param name="name">The metric name.</param>
/// <param name="milliseconds">The duration, in milliseconds, for this metric.</param>
public void LogTiming(string name, int milliseconds)
{
SendMetric(MetricType.TIMING, name, _prefix, milliseconds);
}
/// <summary>
/// Creates a new instance of the Statsd client.
/// </summary>
/// <param name="host">The statsd or statsd.net server.</param>
/// <param name="port"></param>
/// <param name="prefix">A string prefix to prepend to every metric.</param>
/// <param name="rethrowOnError">If True, rethrows any exceptions caught due to bad configuration.</param>
/// <param name="outputChannel">Optional output channel (useful for mocking / testing).</param>
public Statsd(string host, int port, string prefix = null, bool rethrowOnError = false, IOutputChannel outputChannel = null)
{
if (outputChannel == null)
{
InitialiseInternal(() => new UdpOutputChannel(host, port), prefix, rethrowOnError);
}
else
{
InitialiseInternal(() => outputChannel, prefix, rethrowOnError);
}
}
/// <summary>
/// Log a timing / latency
/// </summary>
/// <param name="name">The metric name.</param>
/// <param name="milliseconds">The duration, in milliseconds, for this metric.</param>
public void LogTiming(string name, long milliseconds)
{
LogTiming(name, (int)milliseconds);
}
private void InitialiseInternal(Func<IOutputChannel> createOutputChannel, string prefix, bool rethrowOnError)
{
_prefix = prefix;
if (_prefix != null && _prefix.EndsWith("."))
{
_prefix = _prefix.Substring(0, _prefix.Length - 1);
}
try
{
_outputChannel = createOutputChannel();
}
catch (Exception ex)
{
if (rethrowOnError)
{
throw;
}
Trace.TraceError("Could not initialise the Statsd client: {0} - falling back to NullOutputChannel.", ex.Message);
_outputChannel = new NullOutputChannel();
}
}
/// <summary>
/// Log a gauge.
/// </summary>
/// <param name="name">The metric name</param>
/// <param name="value">The value for this gauge</param>
public void LogGauge(string name, int value)
{
SendMetric(MetricType.GAUGE, name, _prefix, value);
}
/// <summary>
/// Log a counter.
/// </summary>
/// <param name="name">The metric name.</param>
/// <param name="count">The counter value (defaults to 1).</param>
public async Task LogCountAsync(string name, long count = 1)
{
await SendMetricAsync(MetricType.COUNT, name, _prefix, count);
}
/// <summary>
/// Log to a set
/// </summary>
/// <param name="name">The metric name.</param>
/// <param name="value">The value to log.</param>
/// <remarks>Logging to a set is about counting the number
/// of occurrences of each event.</remarks>
public void LogSet(string name, int value)
{
SendMetric(MetricType.SET, name, _prefix, value);
}
/// <summary>
/// Log a timing / latency
/// </summary>
/// <param name="name">The metric name.</param>
/// <param name="milliseconds">The duration, in milliseconds, for this metric.</param>
public async Task LogTimingAsync(string name, long milliseconds)
{
await SendMetricAsync(MetricType.TIMING, name, _prefix, milliseconds);
}
/// <summary>
/// Log a calendargram metric
/// </summary>
/// <param name="name">The metric namespace</param>
/// <param name="value">The unique value to be counted in the time period</param>
/// <param name="period">The time period, can be one of h,d,dow,w,m</param>
public void LogCalendargram(string name, string value, string period)
{
SendMetric(MetricType.CALENDARGRAM, name, _prefix, value, period);
}
/// <summary>
/// Log a gauge.
/// </summary>
/// <param name="name">The metric name</param>
/// <param name="value">The value for this gauge</param>
public async Task LogGaugeAsync(string name, long value)
{
await SendMetricAsync(MetricType.GAUGE, name, _prefix, value);
}
/// <summary>
/// Log a calendargram metric
/// </summary>
/// <param name="name">The metric namespace</param>
/// <param name="value">The unique value to be counted in the time period</param>
/// <param name="period">The time period, can be one of h,d,dow,w,m</param>
public void LogCalendargram(string name, int value, string period)
{
SendMetric(MetricType.CALENDARGRAM, name, _prefix, value, period);
}
/// <summary>
/// Log to a set
/// </summary>
/// <param name="name">The metric name.</param>
/// <param name="value">The value to log.</param>
/// <remarks>
/// Logging to a set is about counting the number
/// of occurrences of each event.
/// </remarks>
public async Task LogSetAsync(string name, long value)
{
await SendMetricAsync(MetricType.SET, name, _prefix, value);
}
/// <summary>
/// Log a raw metric that will not get aggregated on the server.
/// </summary>
/// <param name="name">The metric name.</param>
/// <param name="value">The metric value.</param>
/// <param name="epoch">(optional) The epoch timestamp. Leave this blank to have the server assign an epoch for you.</param>
public void LogRaw(string name, int value, long? epoch = null)
{
SendMetric(MetricType.RAW, name, String.Empty, value, epoch.HasValue ? epoch.ToString() : (string)null);
}
/// <summary>
/// Log a raw metric that will not get aggregated on the server.
/// </summary>
/// <param name="name">The metric name.</param>
/// <param name="value">The metric value.</param>
/// <param name="epoch">(optional) The epoch timestamp. Leave this blank to have the server assign an epoch for you.</param>
public async Task LogRawAsync(string name, long value, long? epoch = null)
{
await SendMetricAsync(MetricType.RAW, name, String.Empty, value, epoch.HasValue ? epoch.ToString() : null);
}
private void SendMetric(string metricType, string name, string prefix, int value, string postFix = null)
{
if (value < 0)
{
Trace.TraceWarning(String.Format("Metric value for {0} was less than zero: {1}. Not sending.", name, value));
return;
}
SendMetric(metricType, name, prefix, value.ToString(), postFix);
}
/// <summary>
/// Log a calendargram metric
/// </summary>
/// <param name="name">The metric namespace</param>
/// <param name="value">The unique value to be counted in the time period</param>
/// <param name="period">The time period, can be one of h,d,dow,w,m</param>
public async Task LogCalendargramAsync(string name, string value, string period)
{
await SendMetricAsync(MetricType.CALENDARGRAM, name, _prefix, value, period);
}
private void SendMetric(string metricType, string name, string prefix, string value, string postFix = null)
{
if (String.IsNullOrEmpty(name))
{
throw new ArgumentNullException("name");
}
_outputChannel.Send(PrepareMetric(metricType, name, prefix, value, postFix));
}
/// <summary>
/// Log a calendargram metric
/// </summary>
/// <param name="name">The metric namespace</param>
/// <param name="value">The unique value to be counted in the time period</param>
/// <param name="period">The time period, can be one of h,d,dow,w,m</param>
public async Task LogCalendargramAsync(string name, long value, string period)
{
await SendMetricAsync(MetricType.CALENDARGRAM, name, _prefix, value, period);
}
/// <summary>
/// Prepare a metric prior to sending it off ot the Graphite server.
/// </summary>
/// <param name="metricType"></param>
/// <param name="name"></param>
/// <param name="prefix"></param>
/// <param name="value"></param>
/// <param name="postFix">A value to append to the end of the line.</param>
/// <returns>The formatted metric</returns>
protected virtual string PrepareMetric(string metricType, string name, string prefix, string value, string postFix = null)
{
return (String.IsNullOrEmpty(prefix) ? name : (prefix + "." + name))
+ ":" + value
+ "|" + metricType
+ (postFix == null ? String.Empty : "|" + postFix);
private async Task SendMetricAsync(string metricType, string name, string prefix, long value, string postFix = null)
{
if (value < 0)
{
Trace.TraceWarning("Metric value for {0} was less than zero: {1}. Not sending.", name, value);
return;
}
await SendMetricAsync(metricType, name, prefix, value.ToString(), postFix);
}
private async Task SendMetricAsync(string metricType, string name, string prefix, string value, string postFix = null)
{
if (String.IsNullOrEmpty(name))
{
throw new ArgumentNullException("name");
}
await _outputChannel.SendAsync(PrepareMetric(metricType, name, prefix, value, postFix));
}
/// <summary>
/// Prepare a metric prior to sending it off ot the Graphite server.
/// </summary>
/// <param name="metricType"></param>
/// <param name="name"></param>
/// <param name="prefix"></param>
/// <param name="value"></param>
/// <param name="postFix">A value to append to the end of the line.</param>
/// <returns>The formatted metric</returns>
protected virtual string PrepareMetric(string metricType, string name, string prefix, string value, string postFix = null)
{
return (String.IsNullOrEmpty(prefix) ? name : (prefix + "." + name))
+ ":" + value
+ "|" + metricType
+ (postFix == null ? String.Empty : "|" + postFix);
}
}
}
}
}

View File

@@ -40,12 +40,14 @@
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="AsyncLock.cs" />
<Compile Include="CalendargramRetentionPeriod.cs" />
<Compile Include="ConnectionType.cs" />
<Compile Include="IOutputChannel.cs" />
<Compile Include="IStatsd.cs" />
<Compile Include="MetricType.cs" />
<Compile Include="NullOutputChannel.cs" />
<Compile Include="OutputChannelExtensions.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="StatsdExtensions.cs" />
<Compile Include="Statsd.cs" />

View File

@@ -1,35 +1,111 @@
using StatsdClient;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System;
/// <summary>
/// A set of extensions for use with the StatsdClient library.
/// </summary>
public static class StatsdClientExtensions
namespace StatsdClient
{
/// <summary>
/// Log a timing metric
/// </summary>
/// <param name="client">The statsd client instance.</param>
/// <param name="name">The namespace of the timing metric.</param>
/// <param name="duration">The duration to log (will be converted into milliseconds)</param>
public static void LogTiming(this IStatsd client, string name, TimeSpan duration)
{
client.LogTiming(name, (int)duration.TotalMilliseconds);
}
/// <summary>
/// A set of extensions for use with the StatsdClient library.
/// </summary>
public static class StatsdClientExtensions
{
/// <summary>
/// Log a counter.
/// </summary>
/// <param name="name">The metric name.</param>
/// <param name="count">The counter value (defaults to 1).</param>
public static void LogCount(this IStatsd client, string name, int count = 1)
{
client.LogCountAsync(name, count).Wait();
}
/// <summary>
/// Starts a timing metric that will be logged when the TimingToken is disposed.
/// </summary>
/// <param name="client">The statsd clien instance.</param>
/// <param name="name">The namespace of the timing metric.</param>
/// <returns>A timing token that has been initialised with a start datetime.</returns>
/// <remarks>Wrap the code you want to measure in a using() {} block. The
/// TimingToken instance will log the duration when it is disposed.</remarks>
public static TimingToken LogTiming(this IStatsd client, string name)
{
return new TimingToken(client, name);
}
}
/// <summary>
/// Log a timing / latency
/// </summary>
/// <param name="name">The metric name.</param>
/// <param name="milliseconds">The duration, in milliseconds, for this metric.</param>
public static void LogTiming(this IStatsd client, string name, long milliseconds)
{
client.LogTimingAsync(name, milliseconds).Wait();
}
/// <summary>
/// Log a gauge.
/// </summary>
/// <param name="name">The metric name</param>
/// <param name="value">The value for this gauge</param>
public static void LogGauge(this IStatsd client, string name, int value)
{
client.LogGaugeAsync(name, value).Wait();
}
/// <summary>
/// Log to a set
/// </summary>
/// <param name="name">The metric name.</param>
/// <param name="value">The value to log.</param>
/// <remarks>
/// Logging to a set is about counting the number of occurrences of each event.
/// </remarks>
public static void LogSet(this IStatsd client, string name, int value)
{
client.LogSetAsync(name, value).Wait();
}
/// <summary>
/// Log a raw metric that will not get aggregated on the server.
/// </summary>
/// <param name="name">The metric name.</param>
/// <param name="value">The metric value.</param>
/// <param name="epoch">(optional) The epoch timestamp. Leave this blank to have the server assign an epoch for you.</param>
public static void LogRaw(this IStatsd client, string name, int value, long? epoch = null)
{
client.LogRawAsync(name, value, epoch).Wait();
}
/// <summary>
/// Log a calendargram metric
/// </summary>
/// <param name="name">The metric namespace</param>
/// <param name="value">The unique value to be counted in the time period</param>
/// <param name="period">The time period, can be one of h,d,dow,w,m</param>
public static void LogCalendargram(this IStatsd client, string name, string value, string period)
{
client.LogCalendargramAsync(name, value, period).Wait();
}
/// <summary>
/// Log a calendargram metric
/// </summary>
/// <param name="name">The metric namespace</param>
/// <param name="value">The unique value to be counted in the time period</param>
/// <param name="period">The time period, can be one of h,d,dow,w,m</param>
public static void LogCalendargram(this IStatsd client, string name, long value, string period)
{
client.LogCalendargramAsync(name, value, period).Wait();
}
/// <summary>
/// Log a timing metric
/// </summary>
/// <param name="client">The statsd client instance.</param>
/// <param name="name">The namespace of the timing metric.</param>
/// <param name="duration">The duration to log (will be converted into milliseconds)</param>
public static void LogTiming(this IStatsd client, string name, TimeSpan duration)
{
client.LogTiming(name, (long)duration.TotalMilliseconds);
}
/// <summary>
/// Starts a timing metric that will be logged when the TimingToken is disposed.
/// </summary>
/// <param name="client">The statsd clien instance.</param>
/// <param name="name">The namespace of the timing metric.</param>
/// <returns>A timing token that has been initialised with a start datetime.</returns>
/// <remarks>
/// Wrap the code you want to measure in a using() {} block. The TimingToken instance will log the duration when it is disposed.
/// </remarks>
public static TimingToken LogTiming(this IStatsd client, string name)
{
return new TimingToken(client, name);
}
}
}

View File

@@ -1,85 +1,87 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
namespace StatsdClient
{
internal sealed class TcpOutputChannel : IOutputChannel
{
private TcpClient _tcpClient;
private NetworkStream _stream;
private object _reconnectLock;
private string _host;
private int _port;
private bool _reconnectEnabled;
private int _retryAttempts;
public TcpOutputChannel(string host, int port, bool reconnectEnabled = true, int retryAttempts = 3)
internal sealed class TcpOutputChannel : IOutputChannel
{
_host = host;
_port = port;
_reconnectEnabled = reconnectEnabled;
_retryAttempts = retryAttempts;
_tcpClient = new TcpClient();
_reconnectLock = new object();
}
private readonly TcpClient _tcpClient;
private NetworkStream _stream;
private readonly string _host;
private readonly int _port;
private readonly bool _reconnectEnabled;
private readonly int _retryAttempts;
private readonly AsyncLock _asyncLock;
public void Send(string line)
{
SendWithRetry(line, _reconnectEnabled ? _retryAttempts - 1 : 0);
}
public TcpOutputChannel(string host, int port, bool reconnectEnabled = true, int retryAttempts = 3)
{
_host = host;
_port = port;
_reconnectEnabled = reconnectEnabled;
_retryAttempts = retryAttempts;
_tcpClient = new TcpClient();
_asyncLock = new AsyncLock();
}
private void SendWithRetry(string line, int attemptsLeft)
{
try
{
if ( !_tcpClient.Connected )
public async Task SendAsync(string line)
{
RestoreConnection();
await SendWithRetryAsync(line, _reconnectEnabled ? _retryAttempts - 1 : 0);
}
var bytesToSend = Encoding.UTF8.GetBytes( line + Environment.NewLine );
_stream.Write( bytesToSend, 0, bytesToSend.Length );
}
catch ( IOException ex )
{
if ( attemptsLeft > 0 )
{
SendWithRetry( line, --attemptsLeft );
}
else
{
// No more attempts left, so log it and continue
Trace.TraceWarning( "Sending metrics via TCP failed with an IOException: {0}", ex.Message );
}
}
catch ( SocketException ex )
{
if ( attemptsLeft > 0 )
{
SendWithRetry( line, --attemptsLeft );
}
else
{
// No more attempts left, so log it and continue
Trace.TraceWarning( "Sending metrics via TCP failed with a SocketException: {0}, code: {1}", ex.Message, ex.SocketErrorCode.ToString() );
}
}
}
private void RestoreConnection()
{
lock (_reconnectLock)
{
if (!_tcpClient.Connected)
private async Task SendWithRetryAsync(string line, int attemptsLeft)
{
_tcpClient.Connect(_host, _port);
_stream = _tcpClient.GetStream();
string errorMessage = null;
try
{
if (!_tcpClient.Connected)
{
await RestoreConnectionAsync();
}
var bytesToSend = Encoding.UTF8.GetBytes(line + Environment.NewLine);
await _stream.WriteAsync(bytesToSend, 0, bytesToSend.Length);
}
catch (IOException ex)
{
errorMessage = string.Format("Sending metrics via TCP failed with an IOException: {0}", ex.Message);
}
catch (SocketException ex)
{
// No more attempts left, so log it and continue
errorMessage = string.Format("Sending metrics via TCP failed with a SocketException: {0}, code: {1}", ex.Message, ex.SocketErrorCode);
}
if (errorMessage != null)
{
if (attemptsLeft > 0)
{
await SendWithRetryAsync(line, --attemptsLeft);
}
else
{
// No more attempts left, so log it and continue
Trace.TraceWarning(errorMessage);
}
}
}
private async Task RestoreConnectionAsync()
{
if (!_tcpClient.Connected)
{
using (await _asyncLock.LockAsync())
{
if (!_tcpClient.Connected)
{
await _tcpClient.ConnectAsync(_host, _port);
_stream = _tcpClient.GetStream();
}
}
}
}
}
}
}
}
}

View File

@@ -1,34 +1,40 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
namespace StatsdClient
{
internal sealed class UdpOutputChannel : IOutputChannel
{
private UdpClient _udpClient;
public Socket ClientSocket { get { return _udpClient.Client; } }
public UdpOutputChannel(string hostOrIPAddress, int port)
internal sealed class UdpOutputChannel : IOutputChannel
{
IPAddress ipAddress;
// Is this an IP address already?
if (!IPAddress.TryParse(hostOrIPAddress, out ipAddress))
{
// Convert to ipv4 address
ipAddress = Dns.GetHostAddresses(hostOrIPAddress).First(p => p.AddressFamily == AddressFamily.InterNetwork);
}
_udpClient = new UdpClient();
_udpClient.Connect(ipAddress, port);
}
private readonly UdpClient _udpClient;
public void Send(string line)
{
byte[] payload = Encoding.UTF8.GetBytes(line);
_udpClient.Send(payload, payload.Length);
public Socket ClientSocket
{
get
{
return _udpClient.Client;
}
}
public UdpOutputChannel(string hostOrIPAddress, int port)
{
IPAddress ipAddress;
// Is this an IP address already?
if (!IPAddress.TryParse(hostOrIPAddress, out ipAddress))
{
// Convert to ipv4 address
ipAddress = Dns.GetHostAddresses(hostOrIPAddress).First(p => p.AddressFamily == AddressFamily.InterNetwork);
}
_udpClient = new UdpClient();
_udpClient.Connect(ipAddress, port);
}
public async Task SendAsync(string line)
{
var payload = Encoding.UTF8.GetBytes(line);
await _udpClient.SendAsync(payload, payload.Length);
}
}
}
}
}