Fan-in pattern: a concurrency pattern in which inputs from several sources are converged (multiplexed) into a single stream. Put simply, it can be thought of as a producer-consumer architecture.

This example shows two ways to merge the channels: 1) with sync.WaitGroup, 2) with an atomic counter (atomic.AddInt32).
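As a minimal, self-contained sketch of the idea, several producer goroutines can send on one shared channel that a single consumer drains. The names `produce` and `fanIn` and the message format are made up for illustration; they are not part of the CSV example in this document.

```go
package main

import (
	"fmt"
	"sync"
)

// produce sends n labelled messages on out, then reports completion via wg.
func produce(name string, n int, out chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; i < n; i++ {
		out <- fmt.Sprintf("%s-%d", name, i)
	}
}

// fanIn starts one producer per name and returns the single shared stream.
// The channel is closed once every producer has finished.
func fanIn(names []string, n int) <-chan string {
	out := make(chan string)
	var wg sync.WaitGroup
	wg.Add(len(names))
	for _, name := range names {
		go produce(name, n, out, &wg)
	}
	// Wait in a separate goroutine so that out can be returned immediately.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	count := 0
	// The single consumer: the order of messages varies between runs.
	for msg := range fanIn([]string{"a", "b"}, 3) {
		fmt.Println(msg)
		count++
	}
	fmt.Println("received:", count)
}
```

The consumer only sees one channel and does not need to know how many producers exist.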
|
Ref |
|
package main

import (
	"encoding/csv"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"
)
|
// Read 2 CSV files concurrently.
func main() {
	p1, _ := filepath.Abs("./assets/file1.csv")
	ch1, err := read(p1)
	if err != nil {
		panic(fmt.Errorf("could not read file %v", err))
	}

	p2, _ := filepath.Abs("./assets/file2.csv")
	ch2, err := read(p2)
	if err != nil {
		panic(fmt.Errorf("could not read file %v", err))
	}

	exit := make(chan struct{})

	// merge 2 channels (enable exactly one of the two implementations)
	// chM := mergeAtomic(ch1, ch2)
	chM := mergeWait(ch1, ch2)

	println("[LOG] receive the merged channel.")
	go func() {
		for l := range chM {
			fmt.Printf("[CSV Result] %s \n", l)
		}
		close(exit)
	}()

	println("[LOG] Wait for the exit signal.")
	<-exit
	fmt.Println("All completed, Exit.")
}
|
// Opt 1) merge channels with sync.WaitGroup
func mergeWait(cs ...<-chan []string) <-chan []string {
	out := make(chan []string)
	var wg sync.WaitGroup

	// 1) wait for all channels
	wg.Add(len(cs))

	// 2) mark done after receiving all values from the source channel
	send := func(ch <-chan []string) {
		defer wg.Done()
		for v := range ch {
			out <- v
		}
	}
	for _, ch := range cs {
		go send(ch)
	}

	// XXX: fatal error: all goroutines are asleep - deadlock!
	// Calling wg.Wait() here would block before out is returned, so nothing
	// could ever receive from it. Return out first and wait in a goroutine [4].
	//// wg.Wait()
	//// close(out)

	// 3) wait and close the channel in a separate goroutine
	go func() {
		wg.Wait()
		// close the channel so the consumer's range loop ends (avoids deadlock)
		close(out)
		println("[LOG] merged channel closed.")
	}()
	return out // return out first
}
|
// Opt 2) merge channels with an atomic counter
func mergeAtomic(cs ...<-chan []string) <-chan []string {
	out := make(chan []string)
	var counter int32
	atomic.StoreInt32(&counter, int32(len(cs))) // works like a wait group
	send := func(c <-chan []string) {
		for v := range c {
			out <- v
		}
		// the last sender to finish closes the merged channel
		if atomic.AddInt32(&counter, -1) == 0 {
			close(out)
		}
	}
	for _, c := range cs {
		go send(c)
	}
	return out
}

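// utils: read file and return a channel of file lines
func read(relPath string) (chan []string, error) {
	absPath, _ := filepath.Abs(relPath)
	f, err := os.Open(absPath)
	if err != nil {
		return nil, fmt.Errorf("opening file %v", err)
	}

	ch := make(chan []string)
	cr := csv.NewReader(f)

	go func() {
		// release the file and close the channel when reading stops
		defer f.Close()
		defer close(ch)
		for {
			record, err := cr.Read()
			if err == io.EOF {
				return
			}
			// stop on any other error instead of sending a nil record
			if err != nil {
				println("[LOG] csv read error:", err.Error())
				return
			}
			ch <- record
		}
	}()
	return ch, nil
}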
$ go run pattern-fan-in.go

[LOG] receive the merged channel.
[LOG] Wait for the exit signal.
[CSV Result] [file2 make large research]
[CSV Result] [file2 stand skill solve]
[CSV Result] [file1 rhythm proper secret]
[CSV Result] [file1 scene screen bright]
[CSV Result] [file2 you sometime worth]
[CSV Result] [file2 end uncle long]
[CSV Result] [file2 ate adjective protection]
[CSV Result] [file2 nothing there bright]
[CSV Result] [file1 universe official primitive]
[CSV Result] [file1 might worried acres]
[CSV Result] [file1 flow slope eye]
[CSV Result] [file1 across again threw]
[CSV Result] [file1 rich won women]
[LOG] merged channel closed.
All completed, Exit.
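The run above needs the two files under ./assets, which are not shown here. As a rough sketch for trying the merge without them, the version below feeds `csv.NewReader` from in-memory strings. The helpers `readCSV`, `merge`, and `countRecords` are assumptions introduced for this sketch, and the sample rows are only loosely modelled on the output above, not the real file contents. Records from the two sources interleave differently on each run, so the sketch checks only how many records arrive.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"io"
	"strings"
	"sync"
)

// readCSV streams records from r and closes the channel at io.EOF
// or at the first read error.
func readCSV(r io.Reader) <-chan []string {
	ch := make(chan []string)
	cr := csv.NewReader(r)
	go func() {
		defer close(ch)
		for {
			record, err := cr.Read()
			if err != nil {
				return
			}
			ch <- record
		}
	}()
	return ch
}

// merge fans the input channels into one, using the WaitGroup approach.
func merge(cs ...<-chan []string) <-chan []string {
	out := make(chan []string)
	var wg sync.WaitGroup
	wg.Add(len(cs))
	for _, c := range cs {
		go func(c <-chan []string) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(c)
	}
	// Close out only after every source has been drained.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// countRecords merges one reader per input string and counts the records.
func countRecords(inputs ...string) int {
	chans := make([]<-chan []string, 0, len(inputs))
	for _, in := range inputs {
		chans = append(chans, readCSV(strings.NewReader(in)))
	}
	n := 0
	for range merge(chans...) {
		n++
	}
	return n
}

func main() {
	// Hypothetical sample data standing in for the asset files.
	file1 := "file1,rhythm,proper,secret\nfile1,scene,screen,bright\n"
	file2 := "file2,make,large,research\nfile2,stand,skill,solve\n"

	for rec := range merge(readCSV(strings.NewReader(file1)), readCSV(strings.NewReader(file2))) {
		fmt.Printf("[CSV Result] %s\n", rec)
	}
	fmt.Println("records:", countRecords(file1, file2))
}
```

Because the order is not deterministic, a consumer that needs to know where a record came from has to carry that information in the record itself, as the first column does here.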