Concurrency in Golang - A Lesson Worth 12k USD from TikTok
Here are the main ideas of concurrency in Golang.
1. Goroutine
- A goroutine is a lightweight virtual thread managed by the Go runtime.
2. Channel
- A channel is the place where multiple goroutines communicate with each other.
- Use case: batching database writes, or coordinating concurrent API calls in a server (see the sketch below).
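For illustration, a minimal batching sketch: producers send rows into a channel and a single consumer flushes them in batches. record and batchInsert are made-up names standing in for a real row type and bulk database write.
package main

import "fmt"

// record and batchInsert are illustrative stand-ins for a real row type and bulk write.
type record struct{ ID int }

func batchInsert(batch []record) {
    fmt.Printf("writing batch of %d records\n", len(batch))
}

func main() {
    ch := make(chan record)

    // Producer: sends records, then closes the channel to signal completion.
    go func() {
        for i := 1; i <= 10; i++ {
            ch <- record{ID: i}
        }
        close(ch)
    }()

    // Consumer: accumulates records and writes them in batches of 4.
    batch := make([]record, 0, 4)
    for r := range ch {
        batch = append(batch, r)
        if len(batch) == cap(batch) {
            batchInsert(batch)
            batch = batch[:0]
        }
    }
    if len(batch) > 0 {
        batchInsert(batch) // flush the remainder
    }
}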
3. Basic Goroutine Usage
package main

import (
    "fmt"
    "time"
)

func hello(name string) {
    for i := 0; i < 5; i++ {
        fmt.Println("Hello", name)
        time.Sleep(time.Millisecond * 500)
    }
}

func main() {
    go hello("John")
    go hello("Peter")
    time.Sleep(time.Second * 3) // keep main alive; otherwise it exits before the goroutines print
}
Hello John
Hello Peter
Hello Peter
Hello John
Hello Peter
Hello John
Hello John
Hello Peter
Hello Peter
Hello John
4. A Sample Deadlock with Goroutines
package main
import (
"fmt"
"time"
)
func main() {
c := make(chan int)
go process(c)
for {
fmt.Println("Received:", <-c)
}
}
func process(c chan int) {
for i := 1; i <= 3; i++ {
c <- i
time.Sleep(time.Millisecond * 300)
}
}
Received: 1
Received: 2
Received: 3
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
main.main()
/tmp/sandbox708494458/prog.go:13 +0x7d
Reason:
- The goroutine sends 1, 2, 3 to the channel and then returns; nothing will ever be sent again.
- The main goroutine keeps waiting for another value from the channel ⇒ nothing else is ever sent ⇒ deadlock.
Solution:
- Consume messages safely with the comma-ok form, and have the producer close the channel when it is done:
for {
    i, open := <-c
    if !open {
        return
    }
    fmt.Println("Received:", i)
}
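Putting it together, a fixed version of the full program might look like this: the producer closes the channel when it finishes, and the consumer stops when the channel reports closed.
package main

import (
    "fmt"
    "time"
)

func main() {
    c := make(chan int)
    go process(c)
    for {
        i, open := <-c
        if !open {
            return // channel closed: no more values will ever arrive
        }
        fmt.Println("Received:", i)
    }
}

func process(c chan int) {
    for i := 1; i <= 3; i++ {
        c <- i
        time.Sleep(time.Millisecond * 300)
    }
    close(c) // tell the receiver we are done
}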
5. Buffered Channel and Unbuffered Channel
package main
import "fmt"
func main() {
ch := make(chan int)
ch <- 1
fmt.Println("Received:", <-ch)
}
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
/tmp/sandbox980511399/prog.go:7 +0x59
Reason:
- An unbuffered channel is synchronous: a send blocks the current goroutine (here the main goroutine) until someone receives. There is no receiver ⇒ deadlock.
Solution:
Unbuffered Channel
ch := make(chan int) // unbuffered
go func() {
ch <- 1 // BLOCKS here until main receives
}()
v := <-ch // main receives, unblocks the goroutine
fmt.Println(v)
Buffered Channel
ch := make(chan int, 1) // capacity = 1
go func() {
ch <- 1 // DOES NOT block (space available)
}()
time.Sleep(time.Second) // receiver isn't even ready yet
v := <-ch
fmt.Println(v)
Example: a buffered channel still blocks once it reaches its maximum capacity
ch := make(chan int, 1)
ch <- 1 // OK
ch <- 2 // BLOCKS until someone reads from the channel
| Step | Action | Channel State | Sender Status |
|---|---|---|---|
| 1 | ch := make(chan int, 1) | empty | — |
| 2 | ch <- 1 | [1] (full) | — |
| 3 | ch <- 2 | blocks (waiting) | 🟥 blocked |
| 4 | <-ch reads 1 | buffer empty | unblocks send |
| 5 | (automatically) | [2] now in channel | 🟢 send completes |
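A runnable sketch of the steps in the table above: the second send blocks inside the goroutine until main receives and frees a slot in the buffer (the Sleep and print statements are only there to make the ordering visible).
package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan int, 1)

    go func() {
        ch <- 1 // fills the buffer
        fmt.Println("sent 1")
        ch <- 2 // buffer full: blocks here until main receives
        fmt.Println("sent 2 (unblocked by the receive)")
    }()

    time.Sleep(time.Second)        // let the goroutine reach the blocked send
    fmt.Println("received:", <-ch) // frees a slot; the blocked send completes
    fmt.Println("received:", <-ch)
}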
6. Select
package main
import (
"fmt"
"time"
)
func main() {
c1 := make(chan string)
c2 := make(chan string)
go sendAndSleep(c1, "Sleep 1s", time.Second*1)
go sendAndSleep(c2, "Sleep 5s", time.Second*5)
for {
fmt.Println("Received:", <-c1)
fmt.Println("Received:", <-c2)
}
}
func sendAndSleep(c chan string, value string, duration time.Duration) {
for {
c <- value
time.Sleep(duration)
}
}
Reason:
- It works, but the receive from c2 always has to wait for a message on c1 first, because the loop reads <-c1 before <-c2.
Solution:
- Use select to consume from whichever channel is ready first:
for {
    select {
    case v1 := <-c1:
        fmt.Println("Received:", v1)
    case v2 := <-c2:
        fmt.Println("Received:", v2)
    }
}
7. Does Go run goroutines in parallel?
- Yes, when GOMAXPROCS is greater than 1: runtime.GOMAXPROCS(n) sets how many OS threads may execute Go code simultaneously, e.g. runtime.GOMAXPROCS(2).
- By default, Go sets GOMAXPROCS to the number of CPU cores.
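A quick way to inspect and change this at runtime (a small sketch using runtime.NumCPU and runtime.GOMAXPROCS):
package main

import (
    "fmt"
    "runtime"
)

func main() {
    fmt.Println("CPUs:", runtime.NumCPU())
    // GOMAXPROCS(0) reads the current value without changing it.
    fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0))

    // Limit Go to 2 OS threads executing Go code simultaneously.
    runtime.GOMAXPROCS(2)
    fmt.Println("GOMAXPROCS now:", runtime.GOMAXPROCS(0))
}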
8. sync package
| Go (sync) | Java Equivalent | Purpose |
|---|---|---|
| sync.Mutex | synchronized, ReentrantLock | Mutual exclusion lock |
| sync.RWMutex | ReentrantReadWriteLock | Read-write lock |
| sync.WaitGroup | CountDownLatch, CyclicBarrier, sometimes Join() | Wait for goroutines to finish |
| sync.Once | initialize-once, AtomicReference, Lazy, volatile + if check | Run initialization exactly once |
| sync.Cond | Object.wait()/notify()/notifyAll() or Condition | Wait until a condition is met |
| sync.Pool | ThreadLocal, Object Pool (common pattern) | Reuse objects (reduce allocations) |
| sync.Map | ConcurrentHashMap | Thread-safe map |
| context.Context (not sync but Go core) | Future.cancel(), CompletableFuture, Thread.interrupt() | Cancellation & timeout |
| Goroutine | Thread, Runnable, ExecutorService | Lightweight concurrent task |
8.1. sync.Mutex
- Used to protect shared data from concurrent access.
var mu sync.Mutex
var count int

func worker() {
    mu.Lock()
    count++ // safely modify shared data
    mu.Unlock()
}
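For completeness, a self-contained variant of the snippet above: 1000 goroutines increment the shared counter, and the mutex keeps the final value deterministic (the WaitGroup from section 8.3 is used only to wait for them).
package main

import (
    "fmt"
    "sync"
)

var (
    mu    sync.Mutex
    count int
)

func worker(wg *sync.WaitGroup) {
    defer wg.Done()
    mu.Lock()
    count++ // safely modify shared data
    mu.Unlock()
}

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    fmt.Println("count:", count) // always 1000 thanks to the mutex
}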
8.2. sync.RWMutex
- Separates read locking from write locking, which helps in read-heavy workloads.
var mu sync.RWMutex
var count int

func Read() int {
    mu.RLock() // read lock: multiple readers can hold it at once
    defer mu.RUnlock()
    return count
}

func Write(v int) {
    mu.Lock() // write lock: exclusive
    count = v
    mu.Unlock()
}
8.3. sync.WaitGroup
- Used to wait for a group of goroutines (e.g. parallel API calls) to finish.
- A WaitGroup only waits; it does not protect a shared resource from concurrent modification.
func Handler(w http.ResponseWriter, r *http.Request) {
    var wg sync.WaitGroup
    for _, user := range users {
        wg.Add(1)
        go func(u User) {
            defer wg.Done()
            process(u)
        }(user)
    }
    wg.Wait() // 👈 ensure all users are processed
    w.Write([]byte("done"))
}
8.4. Semaphore
- Use a semaphore to limit the number of API calls running at the same time.
- Use a WaitGroup to wait for all goroutines to finish.
package main
import (
"context"
"fmt"
"runtime"
"sync"
"time"
"golang.org/x/sync/semaphore"
)
func main() {
ctx := context.Background()
maxWorkers := runtime.GOMAXPROCS(0) // e.g., number of CPU cores
sem := semaphore.NewWeighted(int64(maxWorkers))
var wg sync.WaitGroup
tasks := 10
for i := 0; i < tasks; i++ {
wg.Add(1)
// Acquire slot before launching goroutine
if err := sem.Acquire(ctx, 1); err != nil {
// if context canceled, handle error
fmt.Println("Acquire failed:", err)
wg.Done()
continue
}
go func(taskID int) {
defer wg.Done()
defer sem.Release(1)
fmt.Printf("Task %d starting\n", taskID)
time.Sleep(2 * time.Second) // simulate work
fmt.Printf("Task %d done\n", taskID)
}(i)
}
// Wait for all tasks to finish
wg.Wait()
fmt.Println("All tasks completed")
}
Task 7 starting
Task 5 starting
Task 6 starting
Task 2 starting
Task 4 starting
Task 0 starting
Task 3 starting
Task 1 starting
Task 6 done
Task 1 done
Task 0 done
Task 5 done
Task 3 done
Task 4 done
Task 2 done
Task 7 done
-----
Task 8 starting
Task 9 starting
Task 9 done
Task 8 done
8.5. errgroup
- Used to return the FIRST error from multiple goroutine calls.
- When an error happens ⇒ the shared context is cancelled immediately.
package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "golang.org/x/sync/errgroup"
)

func fetchData(id int) error {
    if id == 2 {
        return errors.New("service failed for ID=2")
    }
    time.Sleep(1 * time.Second)
    fmt.Println("Successfully processed ID:", id)
    return nil
}

func main() {
    g, ctx := errgroup.WithContext(context.Background())
    ids := []int{1, 2, 3}
    for _, id := range ids {
        id := id // avoid closure capture issue
        g.Go(func() error {
            // Check if already canceled
            select {
            case <-ctx.Done():
                return ctx.Err()
            default:
                return fetchData(id)
            }
        })
    }
    if err := g.Wait(); err != nil {
        fmt.Println("❌ Error occurred:", err)
        return
    }
    fmt.Println("🎉 All requests completed successfully!")
}
8.6. sync.Once
- Runs a piece of initialization exactly once, even when many goroutines call it at the same time.
package main
import (
"fmt"
"sync"
)
var once sync.Once
func initConfig() {
fmt.Println("Initializing configuration...")
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
once.Do(initConfig)
}()
}
wg.Wait() // Wait for all goroutines to finish
}
Initializing configuration...
8.7. sync.Pool
- Reuses temporary objects (e.g. buffers) instead of allocating a new one each time.
package main

import (
    "bytes"
    "fmt"
    "sync"
)

var bufPool = sync.Pool{
    New: func() interface{} {
        fmt.Println("Creating new buffer")
        return new(bytes.Buffer)
    },
}

func main() {
    // Get a buffer from the pool
    buf := bufPool.Get().(*bytes.Buffer)
    buf.Reset()
    buf.WriteString("Hello Sync Pool")
    fmt.Println(buf.String())

    // Put the buffer back into the pool
    bufPool.Put(buf)

    // Get a buffer again (reused)
    buf2 := bufPool.Get().(*bytes.Buffer)
    fmt.Println("Reused buffer contains:", buf2.String()) // contains the old value!
    buf2.Reset()                                          // always reset before reuse
}
8.8. sync.Map
- A concurrency-safe map: reads are mostly lock-free, while writes are protected by internal locking (see the table below).
package main

import (
    "fmt"
    "sync"
)

func main() {
    var m sync.Map

    // Store values
    m.Store("name", "An")
    m.Store("age", 23)

    // Load values
    if v, ok := m.Load("name"); ok {
        fmt.Println("Name:", v)
    }

    // Load with a default value
    v, _ := m.LoadOrStore("country", "Vietnam")
    fmt.Println("Country:", v)

    // Iterate through the map
    m.Range(func(k, v interface{}) bool {
        fmt.Println(k, v)
        return true
    })
}
| Operation | Lock behavior |
|---|---|
| Read (Load, Range) | Lock-free (most of the time) |
| Write (Store, LoadOrStore, Delete) | Uses internal locking |
8.9. sync.Cond
- The same mechanism as Java's wait, notify, and notifyAll.
- In Golang, the methods are Wait(), Signal(), and Broadcast().
type Queue struct {
    items []int
    cond  *sync.Cond
    size  int
}

func (q *Queue) Enqueue(v int) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    for len(q.items) == q.size {
        q.cond.Wait() // wait for space
    }
    q.items = append(q.items, v)
    q.cond.Signal() // someone can consume
}

func (q *Queue) Dequeue() int {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    for len(q.items) == 0 {
        q.cond.Wait() // wait for items
    }
    v := q.items[0]
    q.items = q.items[1:]
    q.cond.Signal() // someone can produce again
    return v
}
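One detail the snippet above leaves out is how cond gets created: it must be built with sync.NewCond around a Locker. A minimal sketch of wiring it up and exercising the queue; NewQueue is an assumed helper, and the Queue type from the snippet above is assumed to be in the same file.
package main

import (
    "fmt"
    "sync"
)

// NewQueue is an assumed constructor: the Cond wraps a Mutex as its Locker.
func NewQueue(size int) *Queue {
    return &Queue{
        items: make([]int, 0, size),
        cond:  sync.NewCond(&sync.Mutex{}),
        size:  size,
    }
}

func main() {
    q := NewQueue(2)

    // Producer: blocks inside Enqueue whenever the queue is full.
    go func() {
        for i := 1; i <= 5; i++ {
            q.Enqueue(i)
        }
    }()

    // Consumer: blocks inside Dequeue whenever the queue is empty.
    for i := 0; i < 5; i++ {
        fmt.Println("dequeued", q.Dequeue())
    }
}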
November 23, 2025