C++11 – Bellek modeli ve atomik türler

C++ programlama dili yeni gelen standartlarla beraber belirsiz durum (undefined behaviour) diyebileceğimiz durumları en aza indirmeyi amaçladığını daha önceki yazılarda az çok değinmiştim. C++11’ le gelen en büyük yeniliklerden biri de multi-thread kod yazıldığında memory ordering veya cpu ordering gibi durumlardan kaynaklabilecek belirsiz durumları en aza indirgemeyi amaçlayan standart bir bellek modeli sunmasıdır.

Performans kaygısıyla derleyici veya işlemciniz sizin adınıza bazı değişiklikler yapabilir. Yani aslında tam olarak sizin yazdığınız kodun assembly karşılığını elde edemeyebilirsiniz. Örneğin,

int count = 5;
while(count)
   cout << "Beast is Back!" << endl;

 

Yukarıdaki koda baktığımızda öncelikle bellekte bir değişken yaratılması ve onun register’a yüklenip işlemci tarafından okunmasını beklemekteyiz. Ama aslında olan derleyicinin bu koda bakıp döngüyü while(true) şekline dönüştürerek çalışma zamanında performans kazanımını sağlar. Tabii ki derleyicinin en iyileme (optimization) seçeneklerinin açık olduğunu farzediyorum. Yukarıdaki kodu gcc 7.3 ile derlendiğinde iki farklı durumda ortaya çıkan işlemci komutları aşağıdaki gibidir.

/// varsayılan şekilde derlendiğinde
https://godbolt.org/g/LnsuZ7

/// -O3 ile derlendiğinde
https://godbolt.org/g/RiCUkU

Threadler arasında sağladığı bölünmezlik garantisi ile bellekte tutarlılık (consistency) sağlanmasına yardımcı olan std::atomic şablon sınıfıdır. Yani diyelim ki bir threadA int türünden bir a değişkenini okurken o sırada threadB tam bu okuma işlemi sırasında a değişkeninin bellekteki değerini değiştirebilir. Bu da tam da yarış durumudur (race condition). Eğer değişkenimiz atomic olarak tanımlansaydı okuma veya yazma işlemi bitmeden bölünmemesi garanti altında olacaktı.

   std::atomic <int> a = 0;  // ekrana 200000 olarak yazılması garanti altındadır.
    // int a = 0;  ekrana ne yazılacağı meçhul, belirsizlik durumu (undefined bahviour)

    std::thread writeThread( [&] () {
                                  for(int i = 0; i < 100000; i++) {
                                        a++;
                                  }
                              }
    );

    std::thread readThread( [&] () {
                                 for(int i = 0; i < 100000; i++) {
                                        a++;
                                 }
                             }
    );

    writeThread.join();
    readThread.join();

    cout << a << endl;

 

atomic şablon sınıfı türünden değişkenlerin erişimi(load) ve yazımı(store) sırasında bellek erişim biçimiyle beraber threadler arasında bellek paylaşımlarını yönetebileceğimiz özellikler getirdi modern C++. Bu özellikler ve getirdiği standartlarla beraber mutex, condition variable gibi özelliklerle elde ettiğimiz threadler arası senkronizasyonu elde etme imkanı sağlıyor hem de daha az bir maliyetle.

memory_order_seq_cst

Inter-thread değişkenlerin tamamı eğer ‘memory_order_seq_cst’ parametresiyle sıralanmışsa tüm threadlerin üzerinde anlaştığı global bir sıralama vardır. Tüm load ve store operasyonları da kendi aralarında da sıralıdır.
Örneğin,

atomic x, y;

Thread 1                             Thread 2
x.store(10);                      cout << y.load() << endl;
y.store(20);                      cout << x.load() << endl;

Yukarıdaki sıralamaya göre y değişkenini load ettiğimizde x değişkenininde store edilmesi garanti altındadır. Kısaca ekrana 20 ve 10 yazılabilir. Ne yazılamaz dersek 20, 0 yazılma ihtimali yoktur çünkü 'memory_order_seq_cst' sıralı tutarlılığı(sequential concsistency) garanti eder. Ayrıca her iki threadde de store ve load operasyonları sıralıdır. Yani bellekte x store edilmeden y store edilmez aynı şekilde load işleci için de geçerlidir. Unutmadan ekleyelim varsayılan bellek sıralama seçeneğidir.

memory_order_relaxed:

Atomic olmayan bir değişkenle yapılan sıralamalar gibidir. Herhangi bir bellek bariyeri (memory barrier) söz konusu değildir. Relax çalışır :slight_smile:. Yalnız hiçbir sıralama garantisi yoktur dersek de doğru olmaz tıpkı single thread uygulamalardaki gibi aynı thread içerisindeki load ve store operasyonlarında happens-before sıralaması vardır. Fakat load operasyonları store operasyonlarını rassal bir sıralama da görür. Yukarıdaki örneğe dönersek, 20-10, 20-0, 0-0, 0-10 gibi tüm ihtimaller olasıdır.

memory_order_acquire – memory_order_release:

memory_order_seq_cst daki gibi üzerinde anlaşılan global bir sıralama yoktur ama memory_order_relaxed sıralamasındaki gibi de değildir. Aynı thread içerisinde yine happens-before ilişkisiyle beraber acquire-release operasyonları arasında da synchronize-with ilişkisi vardır. Aslında bir anlamda memory_order_seq_cst in daha hafif halidir.
atomic x, y;

Thread 1 Thread2
x.store(10, memory_order_release); cout<<y.load(memory_order_acquire) << endl;
y.store(20, memory_order_release); cout <<x.load(memory_order_acquire) <<endl;

Burda release ve acquire operasyonları ikili (pairwise) olarak senkronize olarak çalışır. Eğer herhangi bir şekilde bu sekronizasyon kurulmadıysa relaxed olarak çalışır.

İhtiyacım olan single producer-single consumer lock-free queueyu şuradaki örneği alarak ihtiyaçlarım doğrultusunda geliştirdim. Fena da olmadı.

Yaptığım testlerde mutex ile implemente edilen versiyonu ile lock-free versiyonu arasında ciddi performans farkı var. Tabi bu fark ringbuffer kaç kez tur attığıyla orantılı olarak büyüyor. Kasıtlı olarak rakam vermiyorum ki herkes kendi bilgisayarında çalıştırıp farkı görsün.

Locksız versiyon:

template 
class GenericRingBuffer
{
public :

    GenericRingBuffer()
    {
        m_write.store(0, std::memory_order::memory_order_relaxed);
        m_read.store(0, std::memory_order::memory_order_relaxed);
    }

    void push(T val)
    {
        while(!try_push(val));
    }

    void pop(T *val)
    {
        while(!try_pop(val));
    }

    void pop_bulk(T *dst, size_t len)
    {
        while(!try_pop_bulk(dst, len));

    }

    void push_bulk(T *src, size_t len)
    {
        while(!try_push_bulk(src, len));
    }

private:

    bool try_push(T val)
    {
        const auto currentWHead = m_write.load(std::memory_order_relaxed);
        const auto nextWHead = currentWHead + 1;
        if (currentWHead - m_read.load(std::memory_order_acquire) = currentRHead) {
            if (currentWHead -  currentRHead < len)
                return false;
        }

        else if (currentWHead < currentRHead){
            if ((m_size - currentRHead) + currentWHead < len)
                return false;
        }

        size_t nextRHead = (currentRHead + len) % m_size;
        if (nextRHead  currentRHead) {
            if ((m_size - currentWHead) + currentRHead < len)
                return false;
        }

        else if (currentWHead < currentRHead) {
            if (currentRHead -  currentWHead < len)
                return false;
        }

        const auto nextWHead = (currentWHead + len) % m_size;
        if (nextWHead < currentWHead) {
            memcpy(m_buffer + currentWHead, src, (m_size - currentWHead) * sizeof(T));

            memcpy(m_buffer, src + (m_size - currentWHead),
                   nextWHead);
        }

        else
            memcpy(m_buffer + currentWHead, src, len);

        m_write.store(nextWHead, std::memory_order_release);
        return true;
    }

private :
    char cachePad_1[64];
    std::atomic<size_t> m_write;
    char cachePad_2[64];
    std::atomic<size_t> m_read;
    T m_buffer[m_size];
};

 

Mutexli versiyon ise;

 


template 
class GenericRingBuffer
{
public :

    GenericRingBuffer() 
    {
        m_write = 0;
        m_read = 0;
    }

    void clear()
    {
        std::lock_guard lock(mtx);
        m_read = 0;
        m_write = 0;

    }

    bool push(T val)
    {
        std::lock_guard lock(mtx);
        const auto nextWHead = m_write + 1;
        if (m_write - m_read <= m_size - 1) {
            m_buffer[m_write % m_size] = val;
            m_write = nextWHead;
            return true;
        }

        return false;
    }

    bool pop(T *val)
    {
        std::lock_guard lock(mtx);
        const auto nextRHead = m_read + 1;

        if (m_read == m_write) {
            return false;
        }

        *val = m_buffer[m_read % m_size];
        m_read = nextRHead;

        return true;
    }

    bool pop_bulk(T *dst, size_t len)
    {
        std::lock_guard lock(mtx);

        if (m_write >= m_read) {
            if (m_write -  m_read < len)
                return false;
        }

        else if (m_write < m_read){
            if ((m_size - m_read) + m_write < len)
                return false;
        }

        size_t nextRHead = (m_read + len) % m_size;
        if (nextRHead < m_read) {
            memcpy(dst, m_buffer + m_read, (m_size - m_read) * sizeof(T));
            memcpy(dst + (m_size - m_read), m_buffer, nextRHead * sizeof(T));

        }

        else {
            memcpy(dst, m_buffer + m_read, len);
        }

        m_read = nextRHead;

        return true;

    }

    bool push_bulk(T *src, size_t len)
    {
        std::lock_guard lock(mtx);

        if (m_write > m_read) {
            if ((m_size - m_write) + m_read < len)
                return false;
        }

        else if (m_write < m_read) {
            if (m_read -  m_write < len)
                return false;
        }

        const auto nextWHead = (m_write + len) % m_size;

        if (nextWHead < m_write) {
            memcpy(m_buffer + m_write, src, (m_size - m_write) * sizeof(T));

            memcpy(m_buffer, src + (m_size - m_write),
                   nextWHead);
        }

        else
            memcpy(m_buffer + m_write, src, len);

        m_write = nextWHead;
        return true;

    }


private :
    size_t m_write;
    size_t m_read;
    T m_buffer[m_size];
    std::mutex mtx;
};

int main (int argc, char** argv)
{
    auto start = chrono::steady_clock::now();

    char a[10] = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'k'};

    GenericRingBuffer grb;

    std::thread write_thread( [&] () {
                                  for(int i = 0; i < 2000; i++) {
                                      grb.push_bulk(a, 10);
                                  }
                              }
    );

    std::thread read_thread( [&] () {
                                 for(int i = 0; i < 2000; i++) {
                                     char b[10];
                                     grb.pop_bulk(b, 10);
                                 }
                             }
    );

    write_thread.join();
    read_thread.join();


    auto end = chrono::steady_clock::now();
    cout << chrono::duration  (end - start).count() << " ms " << endl;

    return 0;
}

Şunu da şuraya bırakayım:
https://github.com/rigtorp/awesome-lockfree

Bu arada bu wordpress uzun zamandır canımı sıkıyor sentaks highlighting konusunda bu yazının daha yakışıklı versiyonuna burdan ulaşabilirsiniz.

Not: Burada verilen kodlar eğitim amaçlıdır hiçbir şekilde ticari bir garanti verilmemektedir. Kullananın kendi sorumluluğundadır.
#The End

Advertisements

7 thoughts on “C++11 – Bellek modeli ve atomik türler

  1. Merhaba,

    Bilgiler için teşekkürler. Makale altında verdiğiniz linkte kodlar tam olarak çıkmış fakat burada verilen kodlarda eksiklikler var. Template tanımlama kısmında ve header başlık dosyalarında eksiklikler var.

    *Template Kısmı:*
    template <class T, size_t m_size>

    *Baslik dosyalari:*

    #include <iostream>
    #include <atomic>
    #include <cstring>
    #include <chrono>
    #include <thread>

    *Namespace:*
    using namespace std;

    1. wordpressin editoru siliyor. Yakin bir zamanda baska bir yazilima gecmeyi dusunuyorum. Belki daha interaktif bir ortam saglayan bir yazilima.

  2. Merhaba çok güzel bir yazı olmuş. Benim aklıma takılan bir soru var. Örnek kodları incelediğimde “condition variable” için ayrı bir scope alanı açılıyor bunun sebebi nedir ?

    Teşekkürler

      1. int main()
        {
        std::thread worker(worker_thread);

        data = “Example data”;
        // send data to the worker thread (Burası örneğin burada neden tekrar “{}” kullanıldı ? )
        {
        std::lock_guard lk(m);
        ready = true;
        std::cout << "main() signals data ready for processing\n";
        }
        cv.notify_one();

        // wait for the worker (Burasıda aynı şekilde )
        {
        std::unique_lock lk(m);
        cv.wait(lk, []{return processed;});
        }
        std::cout << "Back in main(), data = " << data << '\n';

        worker.join();
        }

        Biraz karışık oldu kusura bakmayın.

  3. ben burda condition variable kullanmadim ama baska bir yerde gorup sordugunuzu varsayarak cevaplamaya calisayim. {} yeni bir scope yaratiyor aslinda hersey std::lock_guard sinifina gecilen mutex nesnesinin, std::lock_guard scope sonunda destruct olurken bu mutex nesnesinin unlock edilmesi ile alakali. Ayni durum unique_lock icin de gecerli. Su yazinin okunmasinda fayda var. Yani durumun aslinda condition variable ile bir alakasi yok.

    https://cppturkiye.wordpress.com/2017/05/31/c11-stdmutex/

    1. Şimdi anladım. “std::lock_gurad” RAII kullandığı için destruct olması için scope dışına çıkması gerekiyor.
      “{}” açarak istediğimiz yerde destruct olmsını sağlıyoruz.

      Teşekkürler 🙂

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s