AWS Checking StateMachines/StepFunctions concurrent runs

Time:01-24

I am having a lot of trouble handling concurrent runs of a StateMachine (Step Function) that has a GlueJob task in it.

The state machine is started by a Lambda that gets triggered by a FIFO SQS queue.

The Lambda gets the message, checks how many state machine instances are running, and if this number is below the GlueJob concurrent runs threshold, it starts the state machine.

The problem I am having is that this check fails most of the time. The state machine starts although there is not enough concurrency available for my GlueJob. Obviously, the message the SQS queue passes to the Lambda gets processed, so if the state machine fails for this reason, that message is gone forever (unless I catch the exception and send a new message back to the queue).

I believe this behavior is due to the speed at which messages get processed by my Lambda (although it's a FIFO queue, so 1 message at a time), and the fact that my checker cannot keep up.

I have added some time.sleep() calls here and there to see if things get better, but saw no substantial improvement.

I would like to ask you if you have ever had issues like this one and how you got them programmatically solved.

Thanks in advance!

This is my checker:

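def get_running_workflows(sf_client, sfn_arn, cnt=0, next_token=None):
    # sf_client is a boto3 Step Functions client, e.g. boto3.client('stepfunctions')
    if next_token:
        response = sf_client.list_executions(stateMachineArn=sfn_arn, statusFilter='RUNNING', nextToken=next_token)
    else:
        response = sf_client.list_executions(stateMachineArn=sfn_arn, statusFilter='RUNNING')

    # Add this page to the running total instead of overwriting it
    cnt += len(response['executions'])

    # Follow the pagination token, passing every argument through to the recursive call
    if 'nextToken' in response:
        return get_running_workflows(sf_client, sfn_arn, cnt, response['nextToken'])

    return cnt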

CodePudding user response:

You are going to run into problems with this approach because the call to start a new flow may not immediately cause the list_executions() to show a new number. There may be some seconds between requesting that a new workflow start, and the workflow actually starting. As far as I'm aware there are no strong consistency guarantees for the list_executions() API call.

You need something that is strongly consistent, and DynamoDB atomic counters are a great solution for this problem. Amazon published a blog post detailing the use of DynamoDB for this exact scenario. The gist is that you attempt to increment an atomic counter in DynamoDB, with a condition expression that causes the increment to fail if it would push the counter above a certain value. Catching that failure/exception is how your Lambda function knows to send the message back to the queue. Then at the end of the workflow you call another Lambda function to decrement the counter.
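
A minimal sketch of that counter, assuming a boto3 DynamoDB client is passed in as `ddb` (for example `boto3.client('dynamodb')`). The table name, key, and attribute name are hypothetical placeholders, not taken from the blog post, and the limit you pass should match your GlueJob concurrency setting:

```python
# Hypothetical names for illustration; replace with your own table and key.
TABLE_NAME = "workflow-concurrency"
COUNTER_KEY = {"pk": {"S": "glue-job-slots"}}
NAMES = {"#n": "running_count"}  # alias avoids clashes with reserved words


def try_acquire_slot(ddb, max_concurrent):
    """Atomically increment the counter; return False if the limit is reached."""
    try:
        ddb.update_item(
            TableName=TABLE_NAME,
            Key=COUNTER_KEY,
            UpdateExpression="ADD #n :one",
            # The increment only succeeds while the counter is below the limit.
            ConditionExpression="attribute_not_exists(#n) OR #n < :limit",
            ExpressionAttributeNames=NAMES,
            ExpressionAttributeValues={
                ":one": {"N": "1"},
                ":limit": {"N": str(max_concurrent)},
            },
        )
        return True
    except ddb.exceptions.ConditionalCheckFailedException:
        # No slot available: the caller should requeue the message.
        return False


def release_slot(ddb):
    """Decrement the counter at the end of the workflow (never below zero)."""
    ddb.update_item(
        TableName=TABLE_NAME,
        Key=COUNTER_KEY,
        UpdateExpression="ADD #n :minus_one",
        ConditionExpression="#n > :zero",
        ExpressionAttributeNames=NAMES,
        ExpressionAttributeValues={
            ":minus_one": {"N": "-1"},
            ":zero": {"N": "0"},
        },
    )
```

In the triggering Lambda you would call `try_acquire_slot` before `start_execution` and only start the state machine when it returns True. The state machine's final step (including its failure path) would call `release_slot`, otherwise slots leak when a run fails.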
