Go Learning Path - Module 8: Concurrency

Go Learning Path - Module 8: Concurrency with Goroutines and Channels

Go Tutorial Contents

Module 1: Hello World & Basic Concepts

Module 2: Variables, Data Types, and Constants

Module 3: Functions

Module 4: Control Structures (if/else, loops)

Module 5: Arrays, Slices, and Maps

Module 6: Structs and Interfaces

Module 7: Pointers and Memory Management

Module 8: Concurrency with Goroutines and Channels

Module 9: Error Handling and Defer/Panic/Recover

Module 10: Advanced Topics - Testing and Standard Library

Concurrency is one of Go's greatest strengths. Go provides lightweight threads called goroutines and channels for communication between goroutines, making concurrent programming both powerful and easy.

Goroutines

A goroutine is a lightweight thread managed by the Go runtime. To create a goroutine, prefix a function call with the go keyword:

  
package main
  

  
import (
  
    "fmt"
  
    "time"
  
)
  

  
// printNumbers prints "Number: N" for N = 1 through 5, pausing 200ms after
// each line to simulate work. It is intended to be launched with `go`.
func printNumbers() {
    const pause = 200 * time.Millisecond

    for n := 1; n < 6; n++ {
        fmt.Printf("Number: %d\n", n)
        time.Sleep(pause)
    }
}
  

  
// printLetters prints "Letter: X" for X = 'A' through 'E', pausing 300ms
// after each line to simulate work. It is intended to be launched with `go`.
func printLetters() {
    const pause = 300 * time.Millisecond

    // Ranging over a string yields its runes, which is what %c expects.
    for _, letter := range "ABCDE" {
        fmt.Printf("Letter: %c\n", letter)
        time.Sleep(pause)
    }
}
  

  
// main launches printNumbers and printLetters as goroutines, then sleeps for
// two seconds so they have time to run before the program exits.
func main() {
    fmt.Println("Starting goroutines...")

    // Both statements return immediately; the functions run concurrently
    // with main and with each other.
    go printNumbers()
    go printLetters()

    // A program exits when main returns, whether or not other goroutines
    // are still running, so pause here to let them print.
    const grace = 2 * time.Second
    time.Sleep(grace)

    fmt.Println("Main function ending...")
}
  

Anonymous Functions as Goroutines

  
// main starts one goroutine per message, each printing its index and text,
// then sleeps for one second so the goroutines get a chance to run.
func main() {
    messages := []string{
        "Hello from goroutine 1",
        "Hello from goroutine 2",
        "Hello from goroutine 3",
    }

    // The index and text are passed as call arguments, so every goroutine
    // works on its own copies instead of sharing the loop variables.
    report := func(index int, message string) {
        fmt.Printf("Goroutine %d: %s\n", index, message)
    }
    for idx := range messages {
        go report(idx, messages[idx])
    }

    // Give the goroutines time to print before main returns.
    time.Sleep(1 * time.Second)
}
  

WaitGroup for Goroutine Synchronization

Use sync.WaitGroup to wait for all goroutines to complete:

  
package main
  

  
import (
  
    "fmt"
  
    "sync"
  
    "time"
  
)
  

  
// worker announces that it is starting, sleeps for id seconds to simulate
// work, announces that it is done, and finally marks itself finished on wg.
func worker(id int, wg *sync.WaitGroup) {
    // Deferred so the WaitGroup counter is decremented on every return path.
    defer wg.Done()

    fmt.Printf("Worker %d starting\n", id)

    // Simulated workload: worker N is busy for N seconds.
    busy := time.Duration(id) * time.Second
    time.Sleep(busy)

    fmt.Printf("Worker %d done\n", id)
}
  

  
// main starts three workers and blocks on a WaitGroup until all of them
// have finished.
func main() {
    var wg sync.WaitGroup

    const workerCount = 3
    for id := 1; id <= workerCount; id++ {
        // Count the worker before launching it so that Wait can never see
        // a zero counter while workers are still being started.
        wg.Add(1)
        go worker(id, &wg)
    }

    fmt.Println("All workers started, waiting for completion...")

    // Blocks until every worker has called wg.Done.
    wg.Wait()

    fmt.Println("All workers completed!")
}
  

Channels

Channels are typed conduits through which you can send and receive values with the channel operator <-:

  
package main
  

  
import "fmt"
  

  
// main demonstrates an unbuffered channel (a value handed from a goroutine
// to main) followed by a buffered channel (two values queued and then read
// back in order).
func main() {
    // Unbuffered channel: a send blocks until a receiver is ready.
    numbers := make(chan int)

    // The send runs in its own goroutine so main is free to receive.
    go func() { numbers <- 42 }()

    // Blocks until the goroutine above delivers its value.
    fmt.Printf("Received: %d\n", <-numbers)

    // Buffered channel with room for two strings: both sends complete
    // without any receiver being ready.
    words := make(chan string, 2)
    for _, word := range []string{"Hello", "World"} {
        words <- word
    }

    // Both values are already buffered, so these receives return at once,
    // in the order the values were sent.
    for i := 0; i < cap(words); i++ {
        fmt.Println(<-words)
    }
}
  

Directional Channels

You can specify if a channel is for sending, receiving, or both:

  
package main
  

  
import "fmt"
  

  
// sendOnly sends msg on ch. The chan<- parameter type makes the channel
// send-only inside this function, so an attempt to receive from it here
// would not compile. The send blocks until a receiver is ready or, for a
// buffered channel, until there is free buffer space.
func sendOnly(ch chan<- string, msg string) {
    ch <- msg
}
  

  
// receiveOnly waits for the next value on ch and returns it. The <-chan
// parameter type makes the channel receive-only inside this function, so an
// attempt to send on it here would not compile.
func receiveOnly(ch <-chan string) string {
    msg := <-ch
    return msg
}
  

  
// main passes one message from a sending goroutine to main through a
// bidirectional channel that each helper sees as send-only or receive-only.
func main() {
    pipe := make(chan string)

    // The sender needs its own goroutine: on an unbuffered channel the send
    // cannot complete until main reaches the receive below.
    go sendOnly(pipe, "Hello from goroutine")

    fmt.Println(receiveOnly(pipe)) // "Hello from goroutine"
}
  

Channel Operations

Closing and Range Over Channels

  
package main
  

  
import (
  
    "fmt"
  
    "time"
  
)
  

  
// sendSequential sends the integers 1 through 5 on ch, sleeping 100ms after
// each send to simulate a delay, and then closes ch so that a receiver
// ranging over it knows no more values are coming.
func sendSequential(ch chan int) {
    const gap = 100 * time.Millisecond

    for _, n := range [...]int{1, 2, 3, 4, 5} {
        ch <- n
        time.Sleep(gap)
    }

    // Closing is the sender's responsibility; it ends the receiver's range loop.
    close(ch)
}
  

  
// main prints every value produced by sendSequential and reports when the
// channel has been closed.
func main() {
    values := make(chan int)
    go sendSequential(values)

    // range keeps receiving until the sender closes the channel, then the
    // loop ends on its own.
    for v := range values {
        fmt.Printf("Received: %d\n", v)
    }

    fmt.Println("Channel closed, all values received")
}
  

Non-blocking Channel Operations

  
package main
  

  
import "fmt"
  

  
// main demonstrates non-blocking channel operations: a select that has a
// default branch never waits. Nothing else uses these channels, so every
// select below takes its default branch.
func main() {
    messages := make(chan string)

    // Non-blocking receive: no sender exists, so default runs immediately.
    select {
    case received := <-messages:
        fmt.Println("Received message:", received)
    default:
        fmt.Println("No message received")
    }

    // Non-blocking send: the channel is unbuffered and no receiver is
    // waiting, so the send cannot proceed and default runs instead.
    const outgoing = "hi"
    select {
    case messages <- outgoing:
        fmt.Println("Sent message:", outgoing)
    default:
        fmt.Println("No message sent")
    }

    // Multi-way select: whichever channel is ready is handled; with neither
    // ready, default runs.
    signals := make(chan bool)
    select {
    case received := <-messages:
        fmt.Println("Received message:", received)
    case flag := <-signals:
        fmt.Println("Received signal:", flag)
    default:
        fmt.Println("No activity")
    }
}
  

Advanced Channel Patterns

Fan-in Pattern

  
package main
  

  
import (
  
    "fmt"
  
    "sync"
  
    "time"
  
)
  

  
// producer announces and sends the values 10, 20 and 30 on ch, sleeping
// 200ms after each send. On return it closes ch and then marks itself
// finished on wg.
func producer(name string, ch chan<- int, wg *sync.WaitGroup) {
    // Deferred calls run last-in, first-out: ch is closed first, then the
    // WaitGroup counter is decremented.
    defer wg.Done()
    defer close(ch)

    for _, value := range [...]int{10, 20, 30} {
        fmt.Printf("%s producing: %d\n", name, value)
        ch <- value
        time.Sleep(200 * time.Millisecond)
    }
}
  

  
// fanIn merges any number of input channels into a single output channel.
// Every value received from any input is forwarded to the returned channel,
// which is closed once all inputs have been closed and drained. Values from
// different inputs may interleave in any order.
func fanIn(channels ...<-chan int) <-chan int {
    merged := make(chan int)

    var pending sync.WaitGroup

    // forward copies everything from src into merged until src is closed.
    forward := func(src <-chan int) {
        defer pending.Done()
        for v := range src {
            merged <- v
        }
    }

    pending.Add(len(channels))
    for _, src := range channels {
        go forward(src)
    }

    // merged may only be closed after every forwarder has finished; closing
    // it earlier would make a late forwarder panic on its send.
    go func() {
        pending.Wait()
        close(merged)
    }()

    return merged
}
  

  
// main runs two producers on separate channels, merges their output with
// fanIn, and prints every merged value.
func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)

    var wg sync.WaitGroup
    wg.Add(2)

    go producer("Producer A", ch1, &wg)
    go producer("Producer B", ch2, &wg)

    // No start-up delay is needed here: the channels are unbuffered, so each
    // producer simply blocks on its first send until fanIn starts reading.
    merged := fanIn(ch1, ch2)

    // The loop ends when fanIn closes merged, which happens after both
    // producers have closed their channels.
    for value := range merged {
        fmt.Printf("Merged value: %d\n", value)
    }

    // A producer closes its channel just before calling wg.Done, so wait
    // for that final step as well before declaring completion.
    wg.Wait()
    fmt.Println("All done!")
}
  

Timeout Pattern

  
package main
  

  
import (
  
    "fmt"
  
    "time"
  
)
  

  
// doWork simulates a task: it sleeps for duration and then reports
// "<name> completed" on ch. The send blocks until the message is received
// or, for a buffered channel, until it fits in the buffer.
func doWork(name string, ch chan<- string, duration time.Duration) {
    time.Sleep(duration)

    report := fmt.Sprintf("%s completed", name)
    ch <- report
}
  

  
// main runs two simulated tasks, racing each against a one-second timeout.
// The first task takes two seconds and times out; the second takes 800ms
// and finishes in time.
func main() {
    // Capacity 1 lets doWork deliver its result even after main has timed
    // out and stopped listening. With an unbuffered channel the worker
    // goroutine would block on its send forever once the timeout fires.
    result := make(chan string, 1)

    // Start work in a goroutine.
    go doWork("Important Task", result, 2*time.Second)

    // Whichever happens first wins: the result arrives or the timer fires.
    select {
    case res := <-result:
        fmt.Println(res)
    case <-time.After(1 * time.Second): // Timeout after 1 second
        fmt.Println("Timeout! Work took too long.")
    }

    // Second attempt: the task is shorter than the timeout. The channel is
    // buffered for the same reason as above.
    result2 := make(chan string, 1)
    go doWork("Another Task", result2, 800*time.Millisecond)

    select {
    case res := <-result2:
        fmt.Println(res)
    case <-time.After(1 * time.Second):
        fmt.Println("Timeout! Work took too long.")
    }
}
  

Practical Examples of Concurrency

Web Request with Multiple APIs

  
package main
  

  
import (
  
    "fmt"
  
    "time"
  
)
  

  
// fetchFromAPI simulates a call to the named service: it sleeps for that
// service's latency and then sends "Data from <service> service" on ch.
// Services other than "users", "products" and "orders" have no configured
// latency and reply without delay.
func fetchFromAPI(service string, ch chan<- string) {
    var latency time.Duration // stays zero for unlisted services
    switch service {
    case "users":
        latency = 300 * time.Millisecond
    case "products":
        latency = 500 * time.Millisecond
    case "orders":
        latency = 700 * time.Millisecond
    }

    time.Sleep(latency)
    ch <- fmt.Sprintf("Data from %s service", service)
}
  

  
// main issues three simulated API calls concurrently, prints each reply as
// it arrives, and reports the total elapsed time.
func main() {
    began := time.Now()

    // One reply channel per service.
    usersCh := make(chan string)
    productsCh := make(chan string)
    ordersCh := make(chan string)

    // All three calls start now and run concurrently.
    go fetchFromAPI("users", usersCh)
    go fetchFromAPI("products", productsCh)
    go fetchFromAPI("orders", ordersCh)

    // Exactly three replies are expected. Each select takes whichever
    // channel is ready, so replies print in completion order.
    for received := 0; received < 3; received++ {
        var reply string
        select {
        case reply = <-usersCh:
        case reply = <-productsCh:
        case reply = <-ordersCh:
        }
        fmt.Println(reply)
    }

    fmt.Printf("Total time: %v\n", time.Since(began))
}
  

Worker Pool Pattern

  
package main
  

  
import (
  
    "fmt"
  
    "sync"
  
    "time"
  
)
  

  
// Job is a unit of work handed to a worker through the jobs channel.
type Job struct {
    ID      int    // identifier carried over into the matching Result
    Payload string // text the worker processes
}
  

  
// Result is what a worker reports after processing one Job.
type Result struct {
    JobID  int    // ID of the Job this result belongs to
    Output string // human-readable description of the processing
    Worker int    // id of the worker that handled the job
}
  

  
// worker processes jobs until the jobs channel is closed and drained. For
// each job it sleeps 200ms to simulate processing and then sends a Result
// on results. On return it marks itself finished on wg.
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()

    const processingTime = 200 * time.Millisecond

    for job := range jobs {
        time.Sleep(processingTime)

        output := fmt.Sprintf("Processed: %s by worker %d", job.Payload, id)
        results <- Result{JobID: job.ID, Output: output, Worker: id}
    }
}
  

  
// main runs a pool of three workers over ten jobs and prints every result
// as it arrives.
func main() {
    const (
        numJobs    = 10
        numWorkers = 3
    )

    // Both channels can hold every job/result, so neither the feeder nor
    // the workers ever block on a full buffer.
    jobs := make(chan Job, numJobs)
    results := make(chan Result, numJobs)

    var wg sync.WaitGroup

    // Start the workers; each one is counted before it is launched.
    for id := 1; id <= numWorkers; id++ {
        wg.Add(1)
        go worker(id, jobs, results, &wg)
    }

    // Feed the jobs, then close the channel so the workers' range loops end.
    go func() {
        for n := 1; n <= numJobs; n++ {
            jobs <- Job{ID: n, Payload: fmt.Sprintf("task-%d", n)}
        }
        close(jobs)
    }()

    // Close results only after every worker has finished, which in turn
    // ends the collection loop below.
    go func() {
        wg.Wait()
        close(results)
    }()

    // Collect results until the channel is closed.
    for r := range results {
        fmt.Printf("Job %d processed by worker %d: %s\n", r.JobID, r.Worker, r.Output)
    }
}
  

Common Concurrency Patterns

Generator Pattern

  
package main
  

  
import "fmt"
  

  
// intGenerator returns a receive-only channel that yields every integer
// from start to end inclusive, in ascending order, and is then closed.
// When start is greater than end the channel is closed without yielding
// anything.
func intGenerator(start, end int) <-chan int {
    out := make(chan int)

    // emit runs in its own goroutine; each send blocks until the consumer
    // takes the value.
    emit := func() {
        defer close(out)
        for n := start; n <= end; n++ {
            out <- n
        }
    }
    go emit()

    return out
}
  

  
// main prints the integers 1 through 5 as they come out of intGenerator.
func main() {
    // The loop ends when the generator closes its channel.
    for n := range intGenerator(1, 5) {
        fmt.Printf("Generated: %d\n", n)
    }
}
  

Pipeline Pattern

  
package main
  

  
import "fmt"
  

  
// double is a pipeline stage: it returns a channel that yields twice each
// value received from in, in the same order, and is closed once in has
// been closed and drained.
func double(in <-chan int) <-chan int {
    doubled := make(chan int)

    stage := func() {
        defer close(doubled)
        for v := range in {
            doubled <- 2 * v
        }
    }
    go stage()

    return doubled
}
  

  
// square is a pipeline stage: it returns a channel that yields the square
// of each value received from in, in the same order, and is closed once in
// has been closed and drained.
func square(in <-chan int) <-chan int {
    squared := make(chan int)

    stage := func() {
        defer close(squared)
        for v := range in {
            product := v * v
            squared <- product
        }
    }
    go stage()

    return squared
}
  

  
func main() {
  
    // Create initial values
  
    nums := make(chan int)
  
    go func() {
  
        defer close(nums)
  
        for i := 1; i <= 5; i++ {
  
            nums <- i
  
        }
  
    }()
  
    
  
    // Create pipeline: nums -> double -> square
  
    doubled := double(nums)
  
    squared := square(doubled)
  
    
  
    // Collect results
  
    for result := range squared {
  
        fmt.Printf("Result: %d\n", result)
  
        // This gives us: (1*2)^2=4, (2*2)^2=16, (3*2)^2=36, etc.
  
    }
  
}
  

Best Practices for Concurrency

  1. Use goroutines for independent tasks that can run concurrently

  2. Use channels for communication between goroutines instead of shared memory

  3. Close channels from the sending side once no more values will be sent, so receivers can finish instead of leaking goroutines

  4. Use WaitGroup when you need to wait for multiple goroutines to finish

  5. Be careful with shared state - use mutexes when necessary

  6. Avoid race conditions by properly synchronizing access to shared data

Exercises

  1. Write a program that calculates Fibonacci numbers concurrently using goroutines and channels.

  2. Create a web scraper that fetches multiple URLs concurrently and collects the results.

  3. Implement a producer-consumer pattern with multiple producers and consumers.

  4. Write a program that implements a simple concurrent counter with multiple goroutines incrementing it.

  5. Create a pipeline that processes text data: split into words → count word lengths → sum total characters.


Next: Module 9: Error Handling and Defer/Panic/Recover