// cpp
// Expert Topics
// 10_Expert_Topics / cpp
// Module 10: Expert Topics
// Lesson: Modern Multithreading and Concurrency
// Build: g++ -std=c++17 -pthread 01_multithreading.cpp -o multithreading_demo
// Description: Demonstrates creating threads, protecting shared state, coordinating work, and composing asynchronous tasks.
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <future>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <numeric>
#include <queue>
#include <random>
#include <string>
#include <thread>
#include <vector>
using namespace std;
// Utility to print the banner for each part.
// Writes a blank line, then `title` framed by a run of '=' characters on both
// sides, and finishes with endl (newline plus flush of cout).
void printPart(const string &title)
{
    // Assemble the whole banner first so it reaches the stream in one insertion.
    const string banner = "\n========== " + title + " ==========";
    cout << banner << endl;
}
// PART 1: Basic thread creation and joining.
// Starts two worker threads, prints a message from the calling thread, and
// then joins both workers before reporting that they are done.
// Each worker prints a start line with its thread id, sleeps for
// 200 + id * 50 milliseconds, and prints a finish line.
void part1_basic_threads()
{
    printPart("PART 1: Creating and Joining Threads");

    // Shared worker body; `id` labels the output and lengthens the sleep.
    const auto runWorker = [](int id) {
        const auto nap = chrono::milliseconds(200 + id * 50);
        cout << "Worker " << id << " starting on thread " << this_thread::get_id() << endl;
        this_thread::sleep_for(nap);
        cout << "Worker " << id << " finished." << endl;
    };

    // Braced initialisation constructs (and therefore starts) the threads in order.
    thread workers[] = {thread(runWorker, 1), thread(runWorker, 2)};

    cout << "Main thread does other work..." << endl;

    // A thread must be joined (or detached) before its destructor runs.
    for (thread &w : workers)
    {
        w.join();
    }
    cout << "All workers joined.\n";
}
// PART 2: Shared state with mutex and lock_guard.
// Sums a 1'000-element vector of ones in chunks, one thread per chunk.
// Each worker adds up its own slice without holding the lock, then takes the
// mutex only to update the shared `processed` counter and print its report, so
// counter updates and report lines from different workers cannot interleave.
// All workers are joined before the function returns.
void part2_mutex_and_shared_state()
{
    printPart("PART 2: Protecting Shared State");

    constexpr size_t kChunkDivisor = 4; // chunk length is size / kChunkDivisor
    const vector<int> data(1'000, 1);
    mutex m;              // guards `processed` and the report printing below
    size_t processed = 0; // total number of elements reported so far

    // Sums data[start, end) and publishes the result under the lock.
    auto accumulateChunk = [&](size_t start, size_t end) {
        // Iterator arithmetic takes the container's signed difference type.
        using Offset = vector<int>::difference_type;
        const int local_sum = accumulate(data.begin() + static_cast<Offset>(start),
                                         data.begin() + static_cast<Offset>(end), 0);

        lock_guard<mutex> guard(m);
        processed += (end - start);
        cout << "Chunk [" << start << ", " << end << ") sum = " << local_sum
             << " | processed so far = " << processed << endl;
    };

    // Never let the step be zero: with fewer than kChunkDivisor elements the
    // integer division yields 0 and the loop below would never advance.
    const size_t chunk = max<size_t>(data.size() / kChunkDivisor, 1);

    vector<thread> workers;
    workers.reserve((data.size() + chunk - 1) / chunk);
    for (size_t i = 0; i < data.size(); i += chunk)
    {
        // The final chunk is clamped so it never runs past the end of the data.
        workers.emplace_back(accumulateChunk, i, min(i + chunk, data.size()));
    }
    for (auto &t : workers)
    {
        t.join();
    }
}
// PART 3: Producer-consumer with condition_variable.
// Blocking FIFO of ints for passing work between threads.
// The queue and the "finished" flag are only touched while the mutex is held;
// the condition variable is notified after the mutex has been released.
class SafeQueue
{
public:
    // Appends `value` and wakes one thread waiting in pop().
    void push(int value)
    {
        unique_lock<mutex> guard(mutex_);
        items_.push(value);
        guard.unlock(); // notify without holding the lock
        ready_.notify_one();
    }

    // Waits until an item is available or signal_done() has been called.
    // Returns true after moving the oldest item into `value`; returns false,
    // leaving `value` untouched, when the queue is empty after the wait.
    bool pop(int &value)
    {
        unique_lock<mutex> guard(mutex_);
        // Re-check the condition after every wake-up (covers spurious wake-ups).
        while (items_.empty() && !finished_)
        {
            ready_.wait(guard);
        }
        if (!items_.empty())
        {
            value = items_.front();
            items_.pop();
            return true;
        }
        return false;
    }

    // Marks the queue as finished and wakes every thread waiting in pop().
    // Items already queued can still be popped afterwards.
    void signal_done()
    {
        unique_lock<mutex> guard(mutex_);
        finished_ = true;
        guard.unlock(); // notify without holding the lock
        ready_.notify_all();
    }

private:
    queue<int> items_;         // pending values, oldest at the front
    mutex mutex_;              // protects items_ and finished_
    condition_variable ready_; // signalled on push and on signal_done
    bool finished_ = false;    // set once by signal_done()
};
void part3_producer_consumer()
{
printPart("PART 3: Producer-Consumer Pipeline");
SafeQueue q;
thread producer([&] {
random_device rd;
mt19937 gen(rd());
uniform_int_distribution<int> dist(1, 100);
for (int i = 0; i < 5; ++i)
{
int value = dist(gen);
cout << "Producing " << value << endl;
q.push(value);
this_thread::sleep_for(chrono::milliseconds(150));
}
q.signal_done();
});
thread consumer([&] {
int value;
while (q.pop(value))
{
cout << " Consuming " << value << endl;
this_thread::sleep_for(chrono::milliseconds(250));
}
cout << "No more work." << endl;
});
producer.join();
consumer.join();
}
// PART 4: Futures, promises, and async tasks.
// Returns 1*1 + 2*2 + ... + n*n (0 when n < 1). Before returning, the calling
// thread sleeps for 200 ms -- presumably to make the call slow for the demo.
long long heavyComputation(int n)
{
    long long total = 0;
    for (int k = n; k >= 1; --k)
    {
        const long long term = k; // widen before multiplying
        total += term * term;
    }
    this_thread::sleep_for(chrono::milliseconds(200));
    return total;
}
// Shows two ways of obtaining a value from another thread:
//  - std::async with launch::async runs heavyComputation(10'000) and hands
//    back its result through a future;
//  - a std::promise is fulfilled by a separate thread after a 300 ms sleep.
// Both results are printed, then the notifier thread is joined.
void part4_async_tasks()
{
    printPart("PART 4: std::async and std::promise");

    auto squaresTask = async(launch::async, heavyComputation, 10'000);

    promise<string> reportPromise;
    auto reportFuture = reportPromise.get_future();

    // The promise is captured by reference; it stays alive because the thread
    // is joined before this function returns.
    const auto deliverReport = [&reportPromise] {
        this_thread::sleep_for(chrono::milliseconds(300));
        reportPromise.set_value("Report ready!");
    };
    thread notifier(deliverReport);

    // get() blocks until the corresponding result is available.
    cout << "Heavy computation result = " << squaresTask.get() << endl;
    cout << "Notification: " << reportFuture.get() << endl;

    notifier.join();
}
// Runs the four demo parts in order, prints the list of follow-up exercises,
// and returns 0.
int main()
{
    using Demo = void (*)();
    const Demo demos[] = {
        part1_basic_threads,
        part2_mutex_and_shared_state,
        part3_producer_consumer,
        part4_async_tasks,
    };
    for (const Demo runDemo : demos)
    {
        runDemo();
    }

    cout << "\nExercises:\n";
    cout << "1. Extend PART 2 to use std::scoped_lock with multiple mutexes.\n";
    cout << "2. Modify SafeQueue to be bounded (fixed capacity).\n";
    cout << "3. Create a thread pool that reuses worker threads for a job queue.\n";
    cout << "4. Benchmark when std::async launches lazily vs eagerly using launch policies.\n";
    return 0;
}