Leaky bucket algorithm with concurrency

Time:01-24

I am trying to mimic a scenario where multiple threads create traffic to fill the bucket and one thread leaks the bucket at a specified rate. However, the code is running into a deadlock. Could you please review this code? Let me know if you see any errors and what modifications I should make.

Code

#include <iostream>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <atomic>
#include <chrono>

using namespace std;

class LeakyBucket {
public:
    LeakyBucket(int size, int rate) : maxCapacity(size), leakRate(rate), filled(0)  {}
    void add(int newDataSize) {
        unique_lock<mutex> lk(_mtx);
        _cond.wait(lk, [this](){
           return  filled<=maxCapacity;
        });


        filled = (filled + newDataSize) > maxCapacity ? maxCapacity:(filled + newDataSize);
        cout<<"\n Filled bucket with : "<<newDataSize;
        cout<<"\n Filled: "<<filled<<"\n ----------";
        _cond.notify_one();
    }

    void leak() {
        while(1) {
            {
                unique_lock<mutex> lk(_mtx);
            _cond.wait(lk, [this]() {
                return filled > 0 || _done;
            });
            if(_done)
                break;

            filled = (filled-leakRate<0) ? 0 : (filled-leakRate);
            cout << "\n Leaked bucket with leakRate";
            cout << "\n BucketFilledRemain: " << filled << "\n ----------";
            _cond.notify_one();
            }
            _sleep:
            this_thread::sleep_for(chrono::seconds(1));
        }
    }

    bool _done = false;
private:
    atomic<int> filled;
    int maxCapacity;
    int leakRate; // Per second
    mutex _mtx;
    condition_variable _cond;

};

void runLeakyBucketAlgorithm() {
    LeakyBucket *lb = new LeakyBucket(30, 20);

    thread t1(&LeakyBucket::leak, lb);
    thread t2([&](){
       for(int i=0; i<10; i++) {
           cout<<"\n launching thread: "<<i;
           lb->add(rand()%40);
       }
       this_thread::sleep_for(chrono::seconds(5));
       lb->_done = true;
    });
    if(t2.joinable()) {
       t2.join();
    }

    t1.join();
}

Output:

 launching thread: 0
 Filled bucket with : 7
 Filled: 7
 ----------
 launching thread: 1
 Filled bucket with : 9
 Filled: 16
 ----------
 launching thread: 2
 Leaked bucket with leakRate
 BucketFilledRemain: 0
 ----------
 Filled bucket with : 33
 Filled: 30
 ----------
 launching thread: 3
 Filled bucket with : 18
 Filled: 30
 ----------
 launching thread: 4
 Filled bucket with : 10
 Filled: 30
 ----------
 launching thread: 5
 Filled bucket with : 32
 Filled: 30
 ----------
 launching thread: 6
 Filled bucket with : 24
 Filled: 30
 ----------
 launching thread: 7
 Filled bucket with : 38
 Filled: 30
 ----------
 launching thread: 8
 Filled bucket with : 3
 Filled: 30
 ----------
 launching thread: 9
 Filled bucket with : 29
 Filled: 30
 ----------
 Leaked bucket with leakRate
 BucketFilledRemain: 10
 ----------
 Leaked bucket with leakRate
 BucketFilledRemain: 0

CodePudding user response:

There are multiple fundamental bugs in the shown code.

thread t1(&LeakyBucket::leak, lb);

leak() will wait until the bucket holds more than zero drops, then subtract the leak rate from it. Then it will be done. That's it. It will be no more. The leaking thread will cease to exist. It will become an ex-thread. It will be pining for the fjords, forever. Once the bucket has leaked once, its leaking hole gets plugged, and it becomes a completely leak-proof bucket.

 new LeakyBucket(30, 20);

The bucket's capacity is 30, and its leak rate is 20.

lb->add(rand()%40);

This gets called ten times, to add anywhere between 0 and 39 drops of water.

So, let's say the first time we drop 20 drops of water into the bucket. The leak thread will wake up, take those 20 drops of water out, and earn its well-deserved retirement.

But wait, we have nine more additions of water coming!

The second call to add() will drop 25 drops of water. The third attempt adds 30 drops of water. The bucket is now over capacity. The fourth call to add() will now block forever because, as we've just seen, the bucket is completely leak-proof now.

That's the first bug: the bucket leaks once, then it does not leak any more.
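One way to keep the bucket leaking is to run the leak step in a loop that repeats once per interval until a stop flag is set. The following is only a minimal sketch under assumptions, not the asker's class: RepeatingLeaker, start(), stop(), level(), leaks() and the millisecond interval are hypothetical names chosen for illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical sketch: the leak thread drains the bucket once per interval
// and keeps doing so until stop() is called.
class RepeatingLeaker {
public:
    RepeatingLeaker(int initialLevel, int leakRate, std::chrono::milliseconds interval)
        : level_(initialLevel), leakRate_(leakRate), interval_(interval) {
        assert(initialLevel >= 0 && leakRate > 0);
    }

    ~RepeatingLeaker() { stop(); }

    // Launch the leak thread.
    void start() {
        worker_ = std::thread([this] { run(); });
    }

    // Ask the leak thread to finish and wait for it.
    void stop() {
        {
            std::lock_guard<std::mutex> lk(mtx_);
            stop_ = true;            // written under the mutex the waiter uses
        }
        cv_.notify_all();            // wake the leak thread so it sees the flag
        if (worker_.joinable()) worker_.join();
    }

    int level() { std::lock_guard<std::mutex> lk(mtx_); return level_; }
    int leaks() { std::lock_guard<std::mutex> lk(mtx_); return leaks_; }

private:
    void run() {
        std::unique_lock<std::mutex> lk(mtx_);
        while (!stop_) {
            level_ = std::max(0, level_ - leakRate_);  // one leak step
            ++leaks_;
            // Sleep for one interval, but return early if stop() is called.
            cv_.wait_for(lk, interval_, [this] { return stop_; });
        }
    }

    int level_;
    int leakRate_;
    std::chrono::milliseconds interval_;
    int leaks_ = 0;
    bool stop_ = false;
    std::mutex mtx_;
    std::condition_variable cv_;
    std::thread worker_;
};
```

The stop flag is set while holding the mutex and followed by a notification. A flag that is flipped without notifying can leave the leak thread asleep on the condition variable, so the final join never returns.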

            _cond.wait(lk, [this]() {
                return filled > 0;
            });

            filled -= leakRate;

The leak in the bucket will wait until there's at least 1 drop of water in the bucket, then leak 20 drops of water. So, if five drops of water were already in the bucket, the bucket will end up with negative fifteen drops of water. This obviously makes no sense, so this is the second bug that needs to be fixed before the code works correctly.
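A leak step that cannot go below empty might look like the sketch below. leakOnce is a hypothetical helper name, not part of the posted code, and it assumes both arguments are non-negative.

```cpp
#include <algorithm>
#include <cassert>

// Hypothetical helper: remove up to leakRate drops, never going below empty.
int leakOnce(int filled, int leakRate) {
    assert(filled >= 0 && leakRate >= 0);
    return std::max(0, filled - leakRate);
}
```

With five drops in the bucket and a leak rate of 20, this returns 0 rather than -15.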

There's probably a third bug here, too. The bucket is defined as having a certain capacity. However, in one of my examples above, the bucket ends up holding more drops of water than its stated capacity. That does not add up either.
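One possible way to respect the capacity is to make add() wait until the whole amount fits, instead of checking only whether the bucket is currently within capacity. This is a sketch under stated assumptions: BoundedBucket and its members are hypothetical names, and an amount larger than the capacity is treated as a precondition violation because it could never fit. Dropping the overflow instead of blocking would be an equally valid policy.

```cpp
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical sketch: add() blocks until the whole amount fits,
// so the fill level never exceeds the capacity.
class BoundedBucket {
public:
    explicit BoundedBucket(int capacity) : capacity_(capacity) {
        assert(capacity > 0);
    }

    // Waits for enough free space before adding.
    void add(int amount) {
        assert(amount >= 0 && amount <= capacity_);  // larger amounts could never fit
        std::unique_lock<std::mutex> lk(mtx_);
        hasRoom_.wait(lk, [&] { return filled_ + amount <= capacity_; });
        filled_ += amount;
    }

    // Removes up to leakRate drops and wakes any producers waiting for space.
    void leak(int leakRate) {
        {
            std::lock_guard<std::mutex> lk(mtx_);
            filled_ = std::max(0, filled_ - leakRate);
        }
        hasRoom_.notify_all();
    }

    int filled() {
        std::lock_guard<std::mutex> lk(mtx_);
        return filled_;
    }

private:
    int capacity_;
    int filled_ = 0;
    std::mutex mtx_;
    std::condition_variable hasRoom_;
};
```

A producer thread calling add() on a full bucket stays blocked until some other thread calls leak(), so this design only makes progress when a leak thread keeps running.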
