I'm looking for an alternative of stoppingToken.Register(ShutDown) where I'm able to call async methods? Basically, _restClient.Spot.UserStream.StopUserStreamAsync should be awaited. Right now it isn't.
Any other suggestions are appreciated.
using Binance.Net;
using Binance.Net.Enums;
using Binance.Net.Interfaces;
using Binance.Net.Objects;
using CryptoExchange.Net.Authentication;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Sockets;
namespace QSGEngine.Server.Services
{
public class PortfolioService : BackgroundService
{
private const string ApiKey = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX";
private const string SecretKey = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX";
private readonly ILogger<PortfolioService> _logger;
private readonly IBinanceClient _restClient;
private readonly IBinanceSocketClient _socketClient;
public PortfolioService(ILogger<PortfolioService> logger)
{
_logger = logger;
_restClient = new BinanceClient(new BinanceClientOptions
{
ApiCredentials = new ApiCredentials(ApiKey, SecretKey),
AutoTimestamp = true,
AutoTimestampRecalculationInterval = TimeSpan.FromMinutes(30),
TradeRulesBehaviour = TradeRulesBehaviour.AutoComply,
#if DEBUG
LogLevel = LogLevel.Debug,
LogWriters = new List<ILogger> { _logger } // TODO: FIX
#endif
});
_socketClient = new BinanceSocketClient(new BinanceSocketClientOptions
{
ApiCredentials = new ApiCredentials(ApiKey, SecretKey),
AutoReconnect = true,
ReconnectInterval = TimeSpan.FromSeconds(15),
#if DEBUG
LogLevel = LogLevel.Debug,
LogWriters = new List<ILogger> { _logger } // TODO: FIX
#endif
});
}
private string? _listenKey;
private CallResult<UpdateSubscription>? _userDataUpdateSubscription;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
stoppingToken.Register(ShutDown);
// Create listen key
var listenKeyResult = await _restClient.Spot.UserStream.StartUserStreamAsync(stoppingToken);
if (!listenKeyResult.Success)
{
return;
}
_listenKey = listenKeyResult.Data;
// Subscribe to web socket
_userDataUpdateSubscription = await _socketClient.Spot.SubscribeToUserDataUpdatesAsync(_listenKey,
null,
null,
data =>
{
_logger.LogInformation("ASD {Balances}", data.Data.Balances);
},
data =>
{
_logger.LogInformation("BALANCE DELTA {BalanceDelta}", data.Data.BalanceDelta);
}).ConfigureAwait(false);
if (!_userDataUpdateSubscription.Success)
{
return;
}
_userDataUpdateSubscription.Data.Exception = PortfolioService_Exception;
// Keep the listen key alive
using var keepAlive = Task.Run(async () =>
{
while (true)
{
// Listen key will be alive for 60 minutes
await _restClient.Spot.UserStream.KeepAliveUserStreamAsync(_listenKey, stoppingToken).ConfigureAwait(false);
await Task.Delay(TimeSpan.FromMinutes(50), stoppingToken).ConfigureAwait(false);
}
}, stoppingToken);
}
private void ShutDown()
{
// Unsubscribe from Exception event
if (_userDataUpdateSubscription != null)
{
_userDataUpdateSubscription.Data.Exception -= PortfolioService_Exception;
_userDataUpdateSubscription = null;
}
// Stop listen key
if (_listenKey != null)
{
_restClient.Spot.UserStream.StopUserStreamAsync(_listenKey).GetAwaiter().GetResult();
_listenKey = null;
}
}
private void PortfolioService_Exception(Exception ex)
{
_logger.LogInformation("Exception: {StackTrace}", ex.StackTrace);
}
}
}
CodePudding user response:
- Wrap your shutdown logic in an
IAsyncDisposable. You can either write your own implementation or use an anonymous disposable likeAsyncDisposablefrom myNito.Disposableslibrary. - Use an
await usingdeclaration to asynchronously wait for the disposal.
When the background service is shut down, it will raise an OperationCanceledException. The await using will ensure the disposal asynchronously completes before propagating that exception.
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// Create listen key
var listenKeyResult = await _restClient.Spot.UserStream.StartUserStreamAsync(stoppingToken);
if (!listenKeyResult.Success)
return;
_listenKey = listenKeyResult.Data;
await using var disposeListenKey = new AsyncDisposable(async () => await _restClient.Spot.UserStream.StopUserStreamAsync(_listenKey));
// Subscribe to web socket
_userDataUpdateSubscription = await _socketClient.Spot.SubscribeToUserDataUpdatesAsync(_listenKey,
null,
null,
data =>
{
_logger.LogInformation("ASD {Balances}", data.Data.Balances);
},
data =>
{
_logger.LogInformation("BALANCE DELTA {BalanceDelta}", data.Data.BalanceDelta);
}).ConfigureAwait(false);
if (!_userDataUpdateSubscription.Success)
return;
_userDataUpdateSubscription.Data.Exception = PortfolioService_Exception;
using var disposeExceptionHandler = new Disposable(() =>
_userDataUpdateSubscription.Data.Exception -= PortfolioService_Exception);
// Keep the listen key alive
using var keepAlive = Task.Run(async () =>
{
while (true)
{
// Listen key will be alive for 60 minutes
await _restClient.Spot.UserStream.KeepAliveUserStreamAsync(_listenKey, stoppingToken).ConfigureAwait(false);
await Task.Delay(TimeSpan.FromMinutes(50), stoppingToken).ConfigureAwait(false);
}
}, stoppingToken);
}
