What happens with Azure Service Bus when the triggering function is spun down?

We have a recurring problem every couple of days where messages we send to Service Bus suddenly and rapidly go to the dead-letter queue (DLQ). The queues are configured to try delivery 10 times, and messages across several queues in the same Function solution (.NET 6) are all dead-lettered with 10 delivery failures in quick succession.

I understand that messages are locked and eventually released if they are not processed, and I understand how to configure our functions' exponential retry policy on the Service Bus trigger, but in these cases what I see appears to be related to “cold start”.

Looking at when the messages went to the DLQ in each queue, I see that the central resource's available memory drops to zero for about 2-3 minutes before picking back up, which I believe indicates that the functions were spun down for inactivity. However, if I stop the functions myself, the messages just stay in the active queue. They don’t go straight to the DLQ.

So, if I’m right and this is related to inactivity shutdown and a typical cold start problem, why doesn’t it happen when I stop the functions myself and continue to send messages?

Is this maybe because the functions had locked the messages right before being shut down by Azure and, while being spun down, were in a state where Service Bus kept delivering because it wasn’t yet aware that the functions were down?

If this is the case, what are my options? I know I can control the speed of delivery on the function trigger, but if the function is shutting down, how can I control the speed at which Service Bus tries to deliver the message? I experimented with RetryExponential when creating the QueueClient on the send side, but that didn’t slow down the delivery attempts or control the number of times it tried.
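
For context, a send-side setup of the kind described above might look like the following sketch. It is not our actual code: the ItemSender helper, its parameters, and the backoff values are illustrative assumptions. It only uses the Microsoft.Azure.ServiceBus types already mentioned (QueueClient, RetryExponential, Message).

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

// Hypothetical helper, named for illustration only.
public static class ItemSender
{
    public static async Task SendAsync(string connectionString, string queueName, string json)
    {
        // RetryExponential(minimumBackoff, maximumBackoff, maximumRetryCount).
        // The values below are assumed, not taken from our configuration.
        // This policy is attached to the client, so it applies to the client's
        // own operations (here, SendAsync) when they hit transient errors.
        var retryPolicy = new RetryExponential(
            TimeSpan.FromSeconds(2),
            TimeSpan.FromSeconds(30),
            5);

        var client = new QueueClient(connectionString, queueName, ReceiveMode.PeekLock, retryPolicy);
        try
        {
            await client.SendAsync(new Message(Encoding.UTF8.GetBytes(json)));
        }
        finally
        {
            // Release the underlying connection once the message is sent.
            await client.CloseAsync();
        }
    }
}
```

As far as I can tell, a policy like this only governs retries of the sender's own calls. The delivery count that leads to dead-lettering is tracked by Service Bus against the queue's max delivery count, which would be consistent with the experiment making no difference.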

Edit: Here's a typical example of one that's DLQ'ing. (Names changed. IProcessor should be obvious enough from Processor.cs. IMyHttpClient is a class registered using the HttpClient pattern in startup: services.AddHttpClient(IMyHttpClient, HttpClient);)

public class ProcessItem_ServiceBus
{
    private readonly IProcessor _processor;
    private readonly TelemetryClient _telemetry;

    public ProcessItem_ServiceBus(TelemetryConfiguration configuration, IProcessor processor)
    {
        this._telemetry = new TelemetryClient(configuration);
        _processor = processor;
    }

    [FunctionName("ProcessItem_ServiceBus")]
    public async Task Run([ServiceBusTrigger(Constants.ITEM_QUEUE, Connection = "ServiceBusConnectionString")] Message item, ILogger log)
    {
        Item itemToProcess = null;
        try
        {
            itemToProcess = JsonConvert.DeserializeObject<Item>(Encoding.UTF8.GetString(item.Body), new JsonSerializerSettings() { ReferenceLoopHandling = ReferenceLoopHandling.Ignore });
        }
        catch (Exception ex)
        {
            log.LogError(ex, "Failed to process item");
            _telemetry.TrackEvent(Constants.ITEM_PROCESSING_FAILED);
            // Rethrow with "throw;" so the original stack trace is preserved.
            throw;
        }
        await _processor.ProcessItem(itemToProcess);
    }
}

public class Processor: IProcessor
{
    private readonly TelemetryClient _telemetry;
    private readonly ILogger<Processor> _logger;
    private readonly IMapper _autoMapper;
    private readonly IMyConfig _myConfig;
    private readonly IMyHttpClient _httpClient;
    private readonly IHttpClientConfig _httpClientConfig;

    public Processor(TelemetryConfiguration configuration,
        IMyHttpClient httpClient,
        IMyConfig myConfig, 
        IHttpClientConfig httpClientConfig, 
        IMapper autoMapper, 
        ILogger<Processor> logger
        )
    {
        _telemetry = new TelemetryClient(configuration);
        _httpClient = httpClient;
        _myConfig = myConfig;
        _httpClientConfig = httpClientConfig;
        _autoMapper = autoMapper;
        _logger = logger;
    }

    public async Task ProcessItem(Item itemToProcess)
    {
        _telemetry.TrackEvent(Constants.ITEM_RECEIVED);

        var httpClientRequest = new HttpClientRequest
        {
            Source = Constants.APPLICATION_NAME,
            Channel = _httpClientConfig.Channel
        };

        _autoMapper.Map<Item, HttpClientRequest>(itemToProcess, httpClientRequest);

        var response = await _httpClient.PostItem(httpClientRequest);
        _telemetry.TrackEvent(Constants.ITEM_POSTED);
    }
}

It's a pretty straightforward pattern. Most of the ones failing in this way use the HttpClient to post to different APIs. Perhaps it's something in there, but those are also very straightforward implementations.


