Imagine you have built a system that uses concurrency to stay responsive.
Your main thread receives tasks from users and spawns a new thread to execute each one, so that the system keeps accepting new tasks while earlier ones are still running.
Now imagine your user base grows and you receive lots of tasks. If you create a separate thread for each task, you waste time creating and destroying a thread every time a task arrives or completes.
In that situation, you should switch to a thread pool architecture.
A thread pool avoids that overhead by creating a fixed set of threads once and reusing them for every task.
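For contrast, here is a minimal sketch of the thread-per-task approach described above: every task gets a freshly spawned OS thread that is torn down once the task finishes. The function names and the toy workload (doubling a number) are hypothetical, chosen only to make the pattern visible.

```rust
use std::thread;

// Hypothetical stand-in for real work.
fn handle_task(n: u64) -> u64 {
    n * 2
}

// Thread-per-task: spawns one new thread per task, then joins them all.
fn run_thread_per_task(task_count: u64) -> u64 {
    let handles: Vec<thread::JoinHandle<u64>> = (0..task_count)
        .map(|n| thread::spawn(move || handle_task(n)))
        .collect();

    // Each join waits for one thread; that thread is gone afterwards.
    handles
        .into_iter()
        .map(|h| h.join().expect("task thread panicked"))
        .sum()
}

fn main() {
    // 100 tasks means 100 thread creations and 100 teardowns.
    println!("total = {}", run_thread_per_task(100));
}
```

With 100 tasks this creates and destroys 100 threads; a pool would reuse a handful of threads for the same work.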
So now let’s explore how to build a thread pool that runs many tasks concurrently.
Before starting the actual coding part, let’s first understand what a thread pool is and how it works.
Thread Pool
A thread pool is a collection of worker threads that are created in advance to run multiple tasks simultaneously, and that wait for new tasks to arrive.
In other words, the threads are created up front and sit idle until there is work for them.
Whenever your system receives a task, it can hand it to one of those threads right away, saving the time it would otherwise spend creating and destroying threads over and over.
As you can see in the figure, a thread pool is a collection of threads waiting to receive tasks from the main thread.
In that figure, the main thread holds a total of 15 tasks, and all of them are forwarded to different worker threads to be executed concurrently.
Now that you have the idea of a thread pool, let’s look at how it works internally.
How Does a Thread Pool Work Under the Hood?
In a thread pool architecture, the main thread has only two jobs:
Receive all the tasks and store them somewhere.
Create multiple threads and keep assigning tasks to them.
Here is how the pool works, step by step.
The pool’s threads are created before any task arrives, and each one is stored with an ID so that we can tell them apart.
Every thread then waits to receive a task; as soon as it gets one, it starts working on it.
After finishing, it goes back to waiting for the next task.
While one thread is busy, the main thread hands further tasks to the other threads, so no thread sits idle while there is still work to do.
Once all tasks are complete, the main thread shuts down all the threads and closes the thread pool.
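The lifecycle above can be sketched with nothing but the standard library. This is a simplified model, not the pool built later in this post: all 15 tasks (matching the figure) are known up front, so a worker exits when the list runs out instead of waiting for more work, and `std::thread::scope` (Rust 1.63+) stands in for the explicit shutdown step. The function name `run_lifecycle` and the squaring workload are hypothetical.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::thread;

// Runs `tasks` on `worker_count` threads that are created once, up front.
// Each worker claims the next unclaimed task, runs it, then claims another,
// so no worker sits idle while tasks remain.
fn run_lifecycle(tasks: &[u32], worker_count: usize) -> Vec<(usize, u32)> {
    let next = AtomicUsize::new(0); // index of the next task to hand out
    let results = Mutex::new(Vec::new()); // (worker id, result) pairs

    thread::scope(|s| {
        for id in 0..worker_count {
            let (next, results) = (&next, &results);
            s.spawn(move || loop {
                let i = next.fetch_add(1, Ordering::SeqCst);
                match tasks.get(i) {
                    // "Execute" the task: square the number.
                    Some(&task) => results.lock().unwrap().push((id, task * task)),
                    // Nothing left to claim: this worker is done.
                    None => break,
                }
            });
        }
        // Leaving the scope joins every worker: the "close the pool" step.
    });

    results.into_inner().unwrap()
}

fn main() {
    // 15 tasks shared by 4 workers.
    let tasks: Vec<u32> = (1..=15).collect();
    for (id, result) in run_lifecycle(&tasks, 4) {
        println!("worker {} produced {}", id, result);
    }
}
```

Which worker ends up with which task depends on scheduling, but every task runs exactly once.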
Now you know how a thread pool operates. So, let’s do some programming and implement a thread pool architecture in Rust.
Implementing A Thread Pool in Rust
Let’s start with a rough overview of what we need to build a thread pool.
We need three things:
A way to create threads.
A place to store them.
A way to assign them different tasks.
Now let’s go through them one by one.
1. A way to create threads
We need a function that spawns a thread and returns its `JoinHandle`.
We also need some sort of ID, so that if something goes wrong we can log the error with the thread’s ID and know which thread failed.
Since we want to keep these two related pieces of data together, a `struct` is the natural choice. So, let’s create one.
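use std::thread::{self, JoinHandle};

struct Worker {
    id: usize,              // identifies the thread in logs and error messages
    thread: JoinHandle<()>, // handle used to join the thread later
}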
Now we have to implement a constructor function that can return a new `Worker`.
impl Worker {
    fn new(id: usize) -> Self {
        // Spawn the thread right away; for now it has nothing to do.
        let thread = thread::spawn(|| {});
        Self { id, thread }
    }
}
Now our constructor can spawn a thread and return it to the caller, wrapped in a `Worker`.
So, let’s create a thread pool that can hold all the threads and execute tasks on them.
2. A place to store them
We need a `struct` that holds the `JoinHandle`s of all the threads (through their `Worker`s), and we also want to control how many threads our thread pool has.
So the struct needs a constructor that takes the number of threads and calls the `Worker` constructor to create each of them.
struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    fn new(size: usize) -> Self {
        // A pool with zero threads could never run anything.
        assert!(size > 0, "Need at least 1 worker!");
        // Reserve space for exactly `size` workers up front.
        let mut workers = Vec::with_capacity(size);
        for i in 0..size {
            workers.push(Worker::new(i));
        }
        Self { workers }
    }
}
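A quick way to check the constructor is to build a pool, join each worker, and look at the IDs. The sketch below repeats the two structs from above so it compiles on its own; the `worker_ids` helper is hypothetical and exists only for this check.

```rust
use std::thread::{self, JoinHandle};

struct Worker {
    id: usize,
    thread: JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Self {
        // Empty body for now: the thread starts and finishes immediately.
        let thread = thread::spawn(|| {});
        Self { id, thread }
    }
}

struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    fn new(size: usize) -> Self {
        assert!(size > 0, "Need at least 1 worker!");
        let mut workers = Vec::with_capacity(size);
        for i in 0..size {
            workers.push(Worker::new(i));
        }
        Self { workers }
    }
}

// Hypothetical helper: build a pool, join every worker, return their IDs in order.
fn worker_ids(size: usize) -> Vec<usize> {
    let pool = ThreadPool::new(size);
    pool.workers
        .into_iter()
        .map(|worker| {
            worker.thread.join().expect("worker thread panicked");
            worker.id
        })
        .collect()
}

fn main() {
    println!("{:?}", worker_ids(4)); // [0, 1, 2, 3]
}
```

Calling `ThreadPool::new(0)` would trip the `assert!` and panic, which is the intended guard.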
We can now create threads and store them; next, we need a way to assign tasks to them.
3. A way to assign them different tasks
Our thread pool `struct` needs a method that hands tasks to the threads and makes them execute those tasks.
But we have a problem: how can we send tasks to threads so that they can execute them?
First, we need a `Task` type that represents the work to be done.
type Task = Box<dyn FnOnce() + Send + 'static>;
You do not need to know every keyword here, but for context: `Box<dyn ...>` is a heap-allocated trait object, so any closure, whatever its concrete type, can be stored as a `Task` as long as it implements the listed traits.
Do not worry, we will cover traits in more detail in another post; for now, just know that `FnOnce()` means our task is a function that can be called only once (calling it consumes it).
Then `Send`: since we are sending tasks from the `main` thread to worker threads, the task must be `Send` so that it can be safely transferred between threads.
Next is `'static`, which means the task must not borrow anything that might go away before it runs; in practice, the closure must own everything it captures.
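To see what the three bounds buy us, here is a small sketch that stores several closures as `Task` values and runs them on another thread. The function `run_tasks_elsewhere` and the shared `log` vector are hypothetical, used only to make the effect observable.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

type Task = Box<dyn FnOnce() + Send + 'static>;

// Builds tasks that each push one number into a shared log, then runs them
// on a different thread to show that Task values can cross thread boundaries.
fn run_tasks_elsewhere() -> Vec<i32> {
    let log = Arc::new(Mutex::new(Vec::new()));

    let mut tasks: Vec<Task> = Vec::new();
    for n in 1..=3 {
        // `move` gives the closure its own Arc clone and its own copy of `n`,
        // so it borrows nothing from this function: that is what 'static requires.
        let log = Arc::clone(&log);
        tasks.push(Box::new(move || log.lock().unwrap().push(n * 10)));
    }

    // Send: the boxed closures move to a new thread.
    thread::spawn(move || {
        for task in tasks {
            // FnOnce: calling `task()` consumes it, so it cannot run twice.
            task();
        }
    })
    .join()
    .expect("task thread panicked");

    let result = log.lock().unwrap().clone();
    result
}

fn main() {
    println!("{:?}", run_tasks_elsewhere()); // [10, 20, 30]
}
```

If a closure captured a plain reference to a local variable instead of an `Arc`, it would no longer be `'static` and the compiler would refuse to box it as a `Task`.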
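So, now it’s time to send the task to the worker threads.
To do so, we establish a channel (`std::sync::mpsc`) between the `main` thread and all the `worker` threads.
A channel has only one receiving end, but every worker needs to pull tasks from it, so we wrap the receiver in `Arc<Mutex<...>>`: `Arc` lets all the workers share ownership of it, and `Mutex` makes sure only one worker takes a task at a time.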
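Here is a minimal sketch of that idea on its own, outside the pool: one channel, several threads pulling from the same receiver. The function name `share_receiver` and the message counts are hypothetical. How the messages are split between threads depends on scheduling, but the total always equals the number sent.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

// Several threads share ONE receiver through Arc<Mutex<...>>.
// Returns how many messages each thread received.
fn share_receiver(thread_count: usize, message_count: usize) -> Vec<usize> {
    let (sender, receiver) = mpsc::channel::<usize>();
    // Arc: shared ownership across threads. Mutex: one receiver at a time.
    let receiver = Arc::new(Mutex::new(receiver));

    let handles: Vec<_> = (0..thread_count)
        .map(|_| {
            let receiver = Arc::clone(&receiver);
            thread::spawn(move || {
                let mut received: usize = 0;
                loop {
                    // The guard is dropped at the end of this statement,
                    // so the lock is not held while the message is handled.
                    let message = receiver.lock().unwrap().recv();
                    match message {
                        Ok(_) => received += 1,
                        // Err means every sender is gone: stop looping.
                        Err(_) => break,
                    }
                }
                received
            })
        })
        .collect();

    for i in 0..message_count {
        sender.send(i).expect("all receivers hung up");
    }
    // Dropping the only sender closes the channel so the loops can end.
    drop(sender);

    handles
        .into_iter()
        .map(|h| h.join().expect("thread panicked"))
        .collect()
}

fn main() {
    let counts = share_receiver(3, 20);
    println!("per-thread counts: {:?}", counts);
}
```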
Let’s update both constructor methods.
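use std::sync::{mpsc, Arc, Mutex};
use std::thread::{self, JoinHandle};

struct ThreadPool {
    workers: Vec<Worker>,
    // Wrapped in Option so that we can take() and drop it during shutdown.
    sender: Option<mpsc::Sender<Task>>,
}

impl ThreadPool {
    fn new(size: usize) -> Self {
        assert!(size > 0, "Need at least 1 worker!");
        // The pool keeps the sending end; the workers share the receiving end.
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let mut workers = Vec::with_capacity(size);
        for i in 0..size {
            // Each worker gets its own Arc pointing at the same receiver.
            workers.push(Worker::new(i, Arc::clone(&receiver)));
        }
        Self {
            workers,
            sender: Some(sender),
        }
    }
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Task>>>) -> Self {
        // Placeholder body: the closure takes ownership of this worker's receiver;
        // the receive loop is added in the next step.
        let thread = thread::spawn(move || {
            let _receiver = receiver;
        });
        Self { id, thread }
    }
}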
Here, in the `ThreadPool` constructor, we create a new channel, wrap the `receiver` in `Arc<Mutex<Receiver<Task>>>`, and give each worker its own `Arc` clone of it, so that the `main` thread can send tasks and the worker threads can receive them.
We also update the `ThreadPool` struct to include a `sender`, which the `main` thread uses to send tasks to the workers. It is wrapped in `Option` so that we can later take it out and drop it when shutting the pool down.
Now, let’s implement the logic to execute the task inside the worker threads.
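impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Task>>>) -> Self {
        let thread = thread::spawn(move || loop {
            // The MutexGuard is a temporary, so the lock is released at the end of
            // this statement, before the task runs; other workers can receive meanwhile.
            let message = receiver
                .lock()
                .expect("Failed to grab the lock!")
                .recv();
            match message {
                Ok(task) => {
                    println!("Thread {} got a task; executing.", id);
                    task();
                }
                // recv() only fails once every Sender has been dropped.
                Err(_) => {
                    println!("Thread {} disconnected; shutting down.", id);
                    break;
                }
            }
        });
        Self { id, thread }
    }
}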
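Here, we loop forever, and in each iteration we acquire the `lock` and call `recv()` on the guarded receiver to get the next task sent by the `main` thread.
Because the lock guard is a temporary in that `let` statement, the lock is released as soon as `recv()` returns, so other workers can pick up tasks while this one runs its task.
`recv()` returns a `Result`, so instead of unwrapping it we `match` on it and log whether the thread got a task.
`recv()` blocks while waiting, so an idle worker does not burn CPU cycles; it returns `Err` only once the channel is closed (every `Sender` has been dropped), and at that point we break out of the loop so the thread can finish.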
Now we need a method on `ThreadPool` that sends tasks to the workers.
So far we use `ThreadPool` to store the threads and `Worker` to execute tasks, but nothing actually sends a task yet.
Let’s implement it.
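impl ThreadPool {
    fn new(size: usize) -> Self {
        // snip
    }

    fn execute<F>(&self, job: F)
    where
        F: FnOnce() + Send + 'static,
    {
        // Box the closure so it can travel through the channel as a Task.
        let job: Task = Box::new(job);
        self.sender
            .as_ref()
            .expect("The pool has already been shut down!")
            .send(job)
            .expect("Failed to send the job to workers!");
    }
}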
Now that we have all the functionality we need, let’s run it and test our first thread pool.
Ohh! We have made a big mistake.
Can you tell me what it is?
You have 10 seconds to answer it.
1 2 3 4 …
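Okay, let me explain.
We spawn threads and execute tasks on them, but we never `join` them, so nothing makes the `main` thread wait for the workers.
When `main` returns, the whole process exits and any worker threads that are still running are killed, so queued tasks may never finish.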
Let’s fix it.
Since `ThreadPool` owns all our threads, it should shut them down when the pool itself goes away.
In Rust, that means implementing the `Drop` trait for `ThreadPool` ourselves: we close the channel and then join every thread.
So, let’s do it.
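// main.rs
// take() needs Options: ThreadPool::sender is an Option<mpsc::Sender<Task>>, and
// Worker::thread must become Option<JoinHandle<()>>, with Worker::new storing Some(thread).
impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Dropping the sender closes the channel, so each worker's recv() returns Err
        // once the remaining tasks are drained, and its loop ends.
        drop(self.sender.take());
        for worker in &mut self.workers {
            println!("Thread {} is shutting down.", worker.id);
            // Wait for this worker's thread to finish before moving on.
            if let Some(thread) = worker.thread.take() {
                thread
                    .join()
                    .unwrap_or_else(|_| panic!("Failed to join the thread {}", worker.id));
            }
        }
    }
}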
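Here we must also drop the `sender` first. If we do not, the channel stays open, every worker blocks in `recv()` forever, and `join()` never returns, so the program hangs.
Once the `sender` is dropped, the channel is closed: each worker finishes any remaining tasks, then `recv()` returns `Err`, the worker breaks out of its loop, and the `join()` calls complete so the program can exit.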
Let’s give it a run and see what happens.
Nice, we have successfully created a working thread pool.
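The post does not show the final program in one piece, so here is a sketch that assembles the pieces above into a single file you can run. Two assumptions go beyond the snippets: `Worker::thread` is an `Option<JoinHandle<()>>` (needed for `take()` in `Drop`), and `run_demo` with its shared counter is a hypothetical driver, not part of the pool itself. The interleaving of the printed lines depends on scheduling.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread::{self, JoinHandle};

type Task = Box<dyn FnOnce() + Send + 'static>;

struct Worker {
    id: usize,
    thread: Option<JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Task>>>) -> Self {
        let thread = thread::spawn(move || loop {
            // The lock is released at the end of this statement, before the task runs.
            let message = receiver.lock().expect("Failed to grab the lock!").recv();
            match message {
                Ok(task) => {
                    println!("Thread {} got a task; executing.", id);
                    task();
                }
                Err(_) => {
                    println!("Thread {} disconnected; shutting down.", id);
                    break;
                }
            }
        });
        Self { id, thread: Some(thread) }
    }
}

struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Task>>,
}

impl ThreadPool {
    fn new(size: usize) -> Self {
        assert!(size > 0, "Need at least 1 worker!");
        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..size)
            .map(|id| Worker::new(id, Arc::clone(&receiver)))
            .collect();
        Self { workers, sender: Some(sender) }
    }

    fn execute<F>(&self, job: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.sender
            .as_ref()
            .expect("The pool has already been shut down!")
            .send(Box::new(job))
            .expect("Failed to send the job to workers!");
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        // Close the channel first so every worker's recv() eventually returns Err.
        drop(self.sender.take());
        for worker in &mut self.workers {
            println!("Thread {} is shutting down.", worker.id);
            if let Some(thread) = worker.thread.take() {
                thread
                    .join()
                    .unwrap_or_else(|_| panic!("Failed to join the thread {}", worker.id));
            }
        }
    }
}

// Hypothetical driver: run `task_count` tasks on a pool of `size` threads
// and return how many of them completed.
fn run_demo(size: usize, task_count: usize) -> usize {
    let completed = Arc::new(AtomicUsize::new(0));
    {
        let pool = ThreadPool::new(size);
        for _ in 0..task_count {
            let completed = Arc::clone(&completed);
            pool.execute(move || {
                completed.fetch_add(1, Ordering::SeqCst);
            });
        }
        // `pool` is dropped here: the channel closes and all workers are joined.
    }
    completed.load(Ordering::SeqCst)
}

fn main() {
    println!("completed {} tasks", run_demo(4, 15));
}
```

Because messages already in the channel are still delivered after the sender is dropped, every submitted task runs before the workers shut down, so the count always matches the number of tasks.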
Conclusion
A thread pool is an efficient way to handle many tasks concurrently, because it reuses a fixed set of threads instead of creating a new one for every task.
Thread pools are also widely used in production systems to handle requests, though production-grade pools are more advanced than the one we built here.
I hope you found this helpful. If you have any questions, feel free to ask in the comments.
Have a great day.