### A Pluto.jl notebook ###
# v0.20.13

using Markdown
using InteractiveUtils

# ╔═╡ ad923771-2962-4dcc-a2ee-a229cda1aa27
md"
# Multithreading & Asynchronous Programming in Julia
### Plan
1. Setup and Context
* What is multithreading and parallel computing
* Julia Tasks
2. Multithreading
* Basic multithreaded loop
* Scheduling strategies
* Data race conditions
* Locks (Mutex)
* Atomic operations
3. Asynchronous Programming
* async/sync
* Channel & Tasks
"

# ╔═╡ 7629cf62-7343-11f0-127c-db493aec2b55
md"
### Setup and Context
#### What is Multithreading?
Multithreading allows a program to run **multiple computations at the same time** on separate CPU cores. This is especially useful for **CPU-bound tasks** where raw computing power is the bottleneck.
In Julia, multithreading is enabled with the environment variable `JULIA_NUM_THREADS`, and used via macros like `Threads.@threads` and `Threads.@spawn`. The goal is to split work into smaller, independent parts that can be safely computed in parallel.
We'll also talk about asynchronous programming, which runs coroutines concurrently on a single thread, letting them yield control to each other while they wait for something.
"

# ╔═╡ 328fd9ba-5d35-462b-a4b8-e5f5e19fdbf9
Threads.nthreads()

# ╔═╡ b6c596bf-2124-40a4-8b24-8d512ac7ef7c
md"
#### Julia Tasks
I will refer to Tasks many times during this workshop because they're the unit of computational work when it comes to dispatching work across threads or processing it asynchronously.
From the Julia documentation:
> You can think of a Task as a handle to a unit of computational work to be performed. It has a create-start-run-finish lifecycle.
Tasks are created by calling the Task constructor on a 0-argument function to run, or using the @task macro:
"

# ╔═╡ 69851843-19a3-4e15-a274-f52493baab90
t = @task begin; sleep(5); println("done"); end

# ╔═╡ 0820f571-3177-40bd-a331-8551012f167a
md"
A task is defined, but not executed by default.
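As a quick sketch of the full lifecycle (with a hypothetical task named `t2`, separate from the one above):

```julia
t2 = @task sum(1:100)   # create: a unit of work, not yet running
istaskstarted(t2)       # false, nothing has run yet
schedule(t2)            # start: hand the task to the scheduler
fetch(t2)               # finish: wait for the result, here 5050
```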
It needs to be scheduled to be executed (see scheduling strategies in the multithreading section).
"

# ╔═╡ 3a081977-c057-4d5c-83ac-7bdd234b16f0
schedule(t); wait(t)

# ╔═╡ 350d6628-9724-47b9-b48f-f4064138f170
md"This executes the task on whatever thread scheduled it. But since our CPU has many threads, we can send any task to be handled by another thread!"

# ╔═╡ 3cb2f4d4-6760-406d-a6c3-b045338605f9
md"
### Multithreading
#### Basic multithreaded loop
"

# ╔═╡ 2384466b-c0b1-4c46-98ba-fa9183053ced
md"
Threads shine on repetitive CPU-bound work, which is why we generally dispatch the independent iterations of a loop across threads.
"

# ╔═╡ 2d47f735-dda8-43f5-a997-90383eb38883
begin
    max_val = 10_000_000
    output = zeros(Int, max_val)
    # Single thread
    @time for i in 1:max_val
        output[i] = i^2
    end
    println(output[1:10])

    output = zeros(Int, max_val)
    # Dispatching threads with :dynamic scheduling
    @time Threads.@threads for i in 1:max_val
        output[i] = i^2
    end
    println(output[1:10])
end

# ╔═╡ f83c2c79-70f5-4579-a6cf-fd3fb6875ec3
md"
* Tasks that can be done independently from each other can easily be multithreaded.
* There's an overhead cost to doing things with multiple threads, meaning that if your tasks are not very long or you don't have a lot of them, you might slow the process down with multiple threads.
- Try with a max\_val of 10\_000\_000, then 100.
"

# ╔═╡ 9febe54a-f120-417b-8f3c-33eb14b61b66
md"
### Scheduling Strategies
Julia supports three scheduling strategies since 1.11. These dictate how your tasks are assigned to threads in the threadpool.
#### :static
The iterator is divided upfront into equal chunks of tasks, with each thread assigned a chunk once. This assumes that every task takes the exact same computing resources. This leads to very low overhead costs, but will leave threads idle if the tasks are not uniform in required computing time. Requires the iterator to have a known length.
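A minimal way to see the :static chunking (a sketch; the exact assignment depends on how many threads Julia was started with):

```julia
ids = zeros(Int, 8)
Threads.@threads :static for i in 1:8
    ids[i] = Threads.threadid()   # record which thread handled iteration i
end
ids  # with 4 threads: contiguous chunks such as [1, 1, 2, 2, 3, 3, 4, 4]
```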
#### :greedy
Every thread in the threadpool is assigned the \"next available\" small chunk of tasks (typically a single task), meaning threads take the next iteration as soon as they're spawned or done working. This reassigns threads quickly, which handles uneven workloads better but leads to higher scheduling overhead. Does not require the iterator to have a known length. Good if the workload is not uniform.
#### :dynamic (default)
Threads pull fixed-size chunks of work dynamically from a shared pool. Each thread takes a chunk, processes it, then takes another when finished. This strikes a balance between :static and :greedy: better thread reassignment to handle uneven workloads, with less overhead than :greedy. Requires the iterator to have a known length.
"

# ╔═╡ d6167683-ee02-421c-80d6-3f82b7111b8a
function busy(i; verbose=true)
    sleep(0.001 * rand())
    verbose && println("i=$i done by thread $(Threads.threadid())")
end

# ╔═╡ 0cd33170-7cc9-4bda-8a01-2ab91c36f42f
begin
    @time Threads.@threads :static for i in 1:32
        busy(i, verbose=true)
    end
end

# ╔═╡ edb15b13-5721-4624-bce8-c8ddd30f7167
md"
With :static, each thread was assigned its iterations at the very start (with 16 threads, thread 1 did iterations 1 & 2, thread 2 did 3 & 4, etc.).
"

# ╔═╡ 2af7c2b1-ffc5-4a90-9726-10c1b2411f2b
begin
    @time Threads.@threads :greedy for i in 1:32
        busy(i, verbose=true)
    end
end

# ╔═╡ 8deab6bd-4bf8-4bbf-a2c8-8f3160a4c38a
begin
    @time Threads.@threads :dynamic for i in 1:32
        busy(i, verbose=true)
    end
end

# ╔═╡ 5861467d-95e0-4b9f-83da-923129cdcbb5
md"
With :greedy or :dynamic, there's not much pattern to which thread does what. The threads are either greedily grabbing the next iteration as soon as they can, or the chunks are interleaved among threads as they become available. Try with 10_000 iterations to see a difference.
"

# ╔═╡ eebea6ae-e373-4afb-933d-e09c1e4ec742
md"
### Data Race Conditions
The bane of multithreading is the data race.
The core rule of multithreading and parallel computing is: **never let multiple threads change the same thing at the same time**.
"

# ╔═╡ 618c160c-efdf-4ddc-8ab0-18f21b2921a2
md"*Single thread incrementing a variable (expected behaviour)*"

# ╔═╡ 5ebb5489-38f6-4f42-ae36-94d68c29196e
function singlethread_count()
    total_single = 0
    for i in 1:1_000_000
        total_single += 1
    end
    println("Total (single thread): $total_single")
end

# ╔═╡ a06d0d8f-1949-4ba0-afca-a588619adcf8
md"*Multiple threads incrementing a variable (data race)*"

# ╔═╡ 06d82d8f-0a92-4536-b7d5-1ca6318df548
function multithread_count()
    total_threads = 0
    Threads.@threads for i in 1:1_000_000
        total_threads += 1
    end
    println("Total (multithreaded): $total_threads")
end

# ╔═╡ 4c4d530e-7e0a-4664-958d-2f84bcf9ad36
begin
    @time singlethread_count()
    @time multithread_count()
end

# ╔═╡ da5661b1-2aeb-4cd7-93d9-a20b40403f03
md"
#### Solving Data Race Conditions
There are three ways of solving a data race:
* Adapting the algorithm in such a way that the data race disappears (best solution, think parallel merging of sorted tables)
* Using locks to prevent threads from writing in the same space at the same time
* Using atomic operations when possible
##### Using Locks
"

# ╔═╡ ce331178-a08d-4125-9e87-1f2febed4e62
begin
    function locked_multithread_count()
        total_threads = 0
        lk = Threads.SpinLock()  # named lk so it doesn't shadow the lock function
        Threads.@threads for i in 1:1_000_000
            lock(lk) do
                total_threads += 1
            end
        end
        println("Total (multithreaded with lock): $total_threads")
    end
    @time locked_multithread_count()
end

# ╔═╡ 304890f9-e1ab-4e3a-833a-b6343f53d9da
md"
*Each thread can only execute whatever is in the locked block if another thread isn't already doing it.
In this case this is absolutely horrendous: the only operation of the loop is then done one thread at a time, which means it's equivalent to using 1 thread while paying the overhead of multithreading and of keeping track of a lock.*
"

# ╔═╡ f0a4724f-3a77-4dff-8f86-ad2840be6170
md"
##### Using Atomics
"

# ╔═╡ 5aa06357-ddd8-4967-bc21-034a4e8272e6
begin
    function atomic_multithread_count()
        total = Threads.Atomic{Int}(0)
        Threads.@threads for i in 1:1_000_000
            Threads.atomic_add!(total, 1)
        end
        println("Total (multithreaded with atomic): $(total[])")
    end
    @time atomic_multithread_count()
end

# ╔═╡ 01efde9a-ad6f-426e-8ee7-96806d241d7b
md"
*Atomic operations are honestly kind of magical. Whenever what you're doing is a short series of simple CPU instructions (like the \"add\" here), you can use atomic operations to ensure it's indivisible and not interruptible, which guarantees that no thread can corrupt the result. This is generally much faster than locking if you only have simple instructions, though here it's still slower than a single thread because there's no benefit to multithreading in this example.*
"

# ╔═╡ a0392b17-913d-4033-b739-48ee822086f5
md"### Asynchronous Programming
Unlike multithreading, async programming is usually described as \"I/O bound\". It's a way to make code that awaits a stream of input and runs asynchronously from whatever code launched it. Functions that can run asynchronously are called coroutines.
Note that asynchronous coroutines usually still run on a single thread (though you can mix multithreading and asynchronous programming), but coroutines can yield control while they wait for something (typically I/O), allowing other coroutines to work in the meantime.
The `@async` macro allows you to create Tasks that will be immediately scheduled, independently from the main thread.
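As a minimal sketch (hypothetical variable name), a coroutine that pretends to wait on I/O:

```julia
t_io = @async begin
    sleep(0.2)   # stand-in for waiting on I/O; the coroutine yields control here
    42
end
fetch(t_io)      # blocks the caller until the task finishes, then returns 42
```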
" # ╔═╡ ec465b96-dc39-4a46-a800-0ba9b5a5110e begin function async_subprocess_demo() println("Starting async I/O from subprocesses...") # Start two processes that echo lines slowly p1 = open(`julia -e 'for i in 1:5; println("A $i"); sleep(0.5); end'`, "r") p2 = open(`julia -e 'for i in 1:5; println("B $i"); sleep(0.3); end'`, "r") # This coroutine reads from p1 and is schedule immediately t1 = @async for line in eachline(p1) println("From t1: line $line") end # This coroutine reads from p2 and is also scheduled immediately t2 = @async for line in eachline(p2) println("From t2: line $line") end println("Main thread is free to do other things...") # Wait allows you to stop main thread until a Task is done wait(t1) wait(t2) println("All subprocess output received.") end async_subprocess_demo() end # ╔═╡ 4386d149-297b-439b-9454-e3cd98166231 md" Channels are queues that are synchronized and accessible accross your coroutines. You can then organise code in a \"producer/consumer\" way. A producer Task puts items in a queue, a consumer tasks takes items from the queue whenever one is available (exploiting the I/O bound nature of coroutines). " # ╔═╡ 01d4906c-47a6-40b4-ae69-cde258836d69 begin function async_pipeline_basic() jobs = Channel{Int64}(10) # This channel has a maximum of 10 items # Producer task prod = @async try for i in 1:10 println("Producing: Task $i") put!(jobs, i) sleep(0.5) # simulate spacing out jobs end close(jobs) # signal no more jobs catch e @error "producer crashed..." exception=(e, catch_backtrace()) end # Consumer task cons = @async try for task in jobs println("Processing: Task $task") sleep(1.0) # simulate longer work end println("All jobs processed.") catch e @error "consumer crashed..." exception=(e, catch_backtrace()) end println("Pipeline running... 
(main thread)")
        wait(cons)  # Wait for the consumer to finish
    end
    @time async_pipeline_basic()
end

# ╔═╡ d914f466-22d5-4e81-bf64-59939b44ae60
md"
Note the following:
* Channels are buffered, meaning they have a limited capacity. If a task tries to put an item in a full queue, it will be paused until space is freed.
* Asynchronous code crashes *silently* unless you use a try block. If it seems your code has been doing nothing for a long time, check that your coroutines are still alive.
"

# ╔═╡ a7987e02-9800-47c3-888d-a8c025f6a1a3
md"
If you produce more than you consume, you can even create **multiple** consumers."

# ╔═╡ a2f51d4d-8cbf-4f04-ac66-852cebe03faf
begin
    function async_pipeline_multiconsume()
        jobs = Channel{Int64}(10)  # This channel holds a maximum of 10 items

        # Producer task
        prod = @async try
            for i in 1:10
                println("Producing: Task $i")
                put!(jobs, i)
                sleep(0.5)  # simulate spacing out jobs
            end
            close(jobs)  # signal no more jobs
        catch e
            @error "producer crashed..." exception=(e, catch_backtrace())
        end

        # We want to store all consumers
        consumers = Vector{Task}()
        for cid in 1:3  # Making 3 consumers
            cons = @async try
                for task in jobs
                    println("Processing: Task $task")
                    sleep(1.0)  # simulate longer work
                end
                println("All jobs processed.")
            catch e
                @error "consumer crashed..." exception=(e, catch_backtrace())
            end
            push!(consumers, cons)
        end

        println("Pipeline running... (main thread)")
        wait.(consumers)  # Wait for the consumers to finish
    end
    @time async_pipeline_multiconsume()
end

# ╔═╡ fdbd2f77-4dbe-4b94-a68d-2ed2bd5842d0
md"
Note that this is already faster, but it is not even the scenario that benefits most, which is when the queue eventually fills up. With a much smaller queue, or a situation where the queue fills fast, having multiple consumers can prevent the producer from idling, saving a lot of time.
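As a sketch of that situation (hypothetical function name, and a buffer size of 1 so the producer blocks on every `put!`):

```julia
function tiny_buffer_demo(nconsumers)
    jobs = Channel{Int}(1)    # buffer of one item: the producer blocks often
    done = Threads.Atomic{Int}(0)
    @async begin
        for i in 1:6
            put!(jobs, i)     # blocks while the buffer is full
        end
        close(jobs)           # signal no more jobs
    end
    consumers = map(1:nconsumers) do _
        @async for j in jobs
            sleep(0.1)        # simulate work on job j
            Threads.atomic_add!(done, 1)
        end
    end
    wait.(consumers)
    done[]                    # number of jobs consumed, here 6
end
tiny_buffer_demo(3)
```

With three consumers the producer rarely waits long on the full buffer; with a single consumer it idles between each `put!`.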
" # ╔═╡ 00000000-0000-0000-0000-000000000001 PLUTO_PROJECT_TOML_CONTENTS = """ [deps] """ # ╔═╡ 00000000-0000-0000-0000-000000000002 PLUTO_MANIFEST_TOML_CONTENTS = """ # This file is machine-generated - editing it directly is not advised julia_version = "1.11.6" manifest_format = "2.0" project_hash = "da39a3ee5e6b4b0d3255bfef95601890afd80709" [deps] """ # ╔═╡ Cell order: # ╟─ad923771-2962-4dcc-a2ee-a229cda1aa27 # ╟─7629cf62-7343-11f0-127c-db493aec2b55 # ╠═328fd9ba-5d35-462b-a4b8-e5f5e19fdbf9 # ╟─b6c596bf-2124-40a4-8b24-8d512ac7ef7c # ╠═69851843-19a3-4e15-a274-f52493baab90 # ╟─0820f571-3177-40bd-a331-8551012f167a # ╠═3a081977-c057-4d5c-83ac-7bdd234b16f0 # ╟─350d6628-9724-47b9-b48f-f4064138f170 # ╟─3cb2f4d4-6760-406d-a6c3-b045338605f9 # ╟─2384466b-c0b1-4c46-98ba-fa9183053ced # ╠═2d47f735-dda8-43f5-a997-90383eb38883 # ╟─f83c2c79-70f5-4579-a6cf-fd3fb6875ec3 # ╟─9febe54a-f120-417b-8f3c-33eb14b61b66 # ╠═d6167683-ee02-421c-80d6-3f82b7111b8a # ╠═0cd33170-7cc9-4bda-8a01-2ab91c36f42f # ╟─edb15b13-5721-4624-bce8-c8ddd30f7167 # ╠═2af7c2b1-ffc5-4a90-9726-10c1b2411f2b # ╠═8deab6bd-4bf8-4bbf-a2c8-8f3160a4c38a # ╟─5861467d-95e0-4b9f-83da-923129cdcbb5 # ╟─eebea6ae-e373-4afb-933d-e09c1e4ec742 # ╟─618c160c-efdf-4ddc-8ab0-18f21b2921a2 # ╠═5ebb5489-38f6-4f42-ae36-94d68c29196e # ╟─a06d0d8f-1949-4ba0-afca-a588619adcf8 # ╠═06d82d8f-0a92-4536-b7d5-1ca6318df548 # ╠═4c4d530e-7e0a-4664-958d-2f84bcf9ad36 # ╟─da5661b1-2aeb-4cd7-93d9-a20b40403f03 # ╠═ce331178-a08d-4125-9e87-1f2febed4e62 # ╟─304890f9-e1ab-4e3a-833a-b6343f53d9da # ╟─f0a4724f-3a77-4dff-8f86-ad2840be6170 # ╠═5aa06357-ddd8-4967-bc21-034a4e8272e6 # ╟─01efde9a-ad6f-426e-8ee7-96806d241d7b # ╟─a0392b17-913d-4033-b739-48ee822086f5 # ╠═ec465b96-dc39-4a46-a800-0ba9b5a5110e # ╟─4386d149-297b-439b-9454-e3cd98166231 # ╠═01d4906c-47a6-40b4-ae69-cde258836d69 # ╟─d914f466-22d5-4e81-bf64-59939b44ae60 # ╟─a7987e02-9800-47c3-888d-a8c025f6a1a3 # ╠═a2f51d4d-8cbf-4f04-ac66-852cebe03faf # ╟─fdbd2f77-4dbe-4b94-a68d-2ed2bd5842d0 # 
╟─00000000-0000-0000-0000-000000000001 # ╟─00000000-0000-0000-0000-000000000002