core.async channels
core.async brings Communicating Sequential Processes (CSP) to Clojure.
Concurrency becomes a story about channels (conduits that processes use
to hand values to each other) rather than about threads, locks, or
callbacks. The go macro, together with the parking operations <! and >!,
lets you write sequential-looking code that the macro rewrites into a
state machine.
Note: in the SCI REPL below, core.async may not be available. The examples are written as you'd run them in a real Clojure project.
Minimal example: a channel
A channel is created with chan. Producers put a value onto it with >!;
consumers take a value off it with <!. Both operations park their go
block rather than blocking an OS thread.
(require '[clojure.core.async :as a :refer [go chan <! >! <!! >!!]])

(let [c (chan)]
  (go (>! c :hello))
  (<!! c)) ;; => :hello
<!! is the blocking take — usable from outside a go block, e.g. at the
REPL or in the main thread. Inside a go, always use the parking
versions <! and >!.
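The split between blocking and parking operations can be sketched side by side. This is a minimal illustration, not part of the original lesson; the var names blocking-result and parking-result are made up for the example.

```clojure
(require '[clojure.core.async :as a :refer [chan go <! >! <!! >!!]])

;; Blocking style: a/thread runs its body on a real thread,
;; so the blocking put >!! is appropriate there.
(def blocking-result
  (let [c (chan)]
    (a/thread (>!! c :from-thread))
    (<!! c)))

;; Parking style: inside go blocks, use >! and <!.
;; A go block returns a channel carrying its body's value,
;; which the calling thread reads with the blocking take.
(def parking-result
  (let [c (chan)]
    (go (>! c :from-go))
    (<!! (go (<! c)))))

;; blocking-result => :from-thread
;; parking-result  => :from-go
```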
Pipelines, not callbacks
Channels compose. A producer go, a transformer go, and a consumer go form a pipeline where each piece is sequential code:
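(let [in  (chan)
      out (chan)]
  ;; producer: emit 0..4, then close in
  (go (doseq [x (range 5)] (>! in x))
      (a/close! in))
  ;; transformer: square each value until in is closed, then close out
  (go (loop []
        (when-let [x (<! in)]
          (>! out (* x x))
          (recur)))
      (a/close! out))
  ;; consumer: collect everything from out into a vector
  (<!! (a/into [] out)))
;; => [0 1 4 9 16]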
Three sequential blocks, three responsibilities, no callbacks — and each go suspends instead of blocking a thread, so you can have thousands of them.
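To make the "thousands of them" claim concrete, here is a small sketch that starts 1000 go blocks which all put onto one channel. The name go-block-total is illustrative only, and n is deliberately kept below core.async's limit of 1024 pending puts on a single channel.

```clojure
(require '[clojure.core.async :as a :refer [go chan >! <!!]])

(def go-block-total
  (let [n 1000
        c (chan)]
    ;; Start n go blocks; each parks on its put until the consumer takes.
    ;; None of them holds an OS thread while parked.
    (dotimes [i n]
      (go (>! c i)))
    ;; a/take yields a channel that closes after n values,
    ;; which lets a/reduce finish and deliver the sum.
    (<!! (a/reduce + 0 (a/take n c)))))

;; go-block-total => 499500
```

The values arrive in no particular order, which is why the sketch sums them instead of collecting them into a vector.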
Buffered channels and backpressure
chan takes an optional buffer that decouples producer and consumer up to
a point:
(chan 10)                     ;; bounded buffer of size 10
(chan (a/sliding-buffer 3))   ;; keep newest 3, drop older on overflow
(chan (a/dropping-buffer 3))  ;; keep oldest 3, drop new on overflow
An unbuffered channel forces a rendezvous: the producer doesn't proceed until the consumer takes. That's natural backpressure — slow consumers throttle fast producers automatically.
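The difference between the two lossy buffers is easiest to see by overfilling each one. The helper fill-and-drain below is hypothetical, written only for this sketch. Note that sliding and dropping buffers never make a producer wait, so they trade backpressure for data loss.

```clojure
(require '[clojure.core.async :as a :refer [chan >!! <!!]])

(defn fill-and-drain
  "Hypothetical helper: puts 0..4 onto a channel backed by buf,
  closes it, and returns whatever the buffer kept."
  [buf]
  (let [c (chan buf)]
    ;; These puts never block: both buffer types discard on overflow.
    (doseq [x (range 5)] (>!! c x))
    (a/close! c)
    ;; Values already buffered remain readable after close!.
    (<!! (a/into [] c))))

(def kept-by-sliding  (fill-and-drain (a/sliding-buffer 3)))
(def kept-by-dropping (fill-and-drain (a/dropping-buffer 3)))

;; kept-by-sliding  => [2 3 4]
;; kept-by-dropping => [0 1 2]
```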
Transducers on channels
A channel can carry a transducer, applied to values as they flow through.
The channel must be buffered for this. The same recipe you'd use with
into or transduce works here:
(def xf (comp (filter even?) (map inc)))

(let [c (chan 10 xf)]
  (a/onto-chan!! c (range 10))
  (<!! (a/into [] c)))
;; => [1 3 5 7 9]
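For comparison, the same transducer can be run over a plain collection with no channel involved. The names via-into and via-transduce are illustrative only.

```clojure
(def xf (comp (filter even?) (map inc)))

;; Same recipe, different contexts: the transducer does not know
;; whether its input comes from a collection or a channel.
(def via-into      (into [] xf (range 10)))
(def via-transduce (transduce xf + 0 (range 10)))

;; via-into      => [1 3 5 7 9]
;; via-transduce => 25
```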
alt!/alt!! — wait on multiple channels
When you need to take from whichever channel is ready first (or send to
whichever is ready), alt! is the tool. It's how you build timeouts,
cancellation, and fan-in:
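(let [c       (chan)
      timeout (a/timeout 1000)]
  ;; Park for 500 ms on a timeout channel; Thread/sleep would
  ;; block one of the go pool's threads instead of parking.
  (go (<! (a/timeout 500))
      (>! c :done))
  (a/alt!! c       ([v] {:result v})
           timeout ([_] {:result :timed-out})))
;; => {:result :done}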
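Fan-in can be sketched in the same spirit. The version below is a hand-written illustration that uses alts!, the function counterpart of alt!, because the set of open channels changes at runtime. The names fan-in and merged-sum are hypothetical; core.async also ships a/merge for this job, so treat this only as a view of the mechanism.

```clojure
(require '[clojure.core.async :as a :refer [go-loop chan >! <!!]])

(defn fan-in
  "Hypothetical helper: forwards values from c1 and c2 onto one channel,
  closing it once both inputs are closed."
  [c1 c2]
  (let [out (chan)]
    (go-loop [open #{c1 c2}]
      (if (empty? open)
        (a/close! out)
        ;; alts! parks until any of the ports is ready and
        ;; returns the value together with the port it came from.
        (let [[v port] (a/alts! (vec open))]
          (if (nil? v)
            (recur (disj open port))      ; that input is closed, stop watching it
            (do (>! out v)
                (recur open))))))
    out))

(def merged-sum
  (let [c1 (a/to-chan! [1 2 3])
        c2 (a/to-chan! [10 20])]
    (<!! (a/reduce + 0 (fan-in c1 c2)))))

;; merged-sum => 36
```

The interleaving of the two inputs is not deterministic, so the usage sums the merged values instead of asserting an order.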
Check yourself
What does `(<! c)` do inside a `go` block when no value is available on channel `c`?
Exercise
Sketch a tiny producer/consumer pipeline that:
- produces the integers 0..9 onto a channel,
- squares them with a transducer-bearing channel,
- sums the squares on the consumer side.
Expected result: 285.
(let [in  (chan)
      out (chan 10 (map (fn [x] (* x x))))]
  ;; producer
  ;; transformer: read from in, write to out
  ;; consumer: reduce + over out
  )
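One possible way to fill in the skeleton is sketched here; try the exercise first. It assumes a/pipe as the transformer, since the squaring already happens in the transducer on out, and it wraps the result in an illustrative var named sum-of-squares.

```clojure
(require '[clojure.core.async :as a :refer [go chan >! <!!]])

(def sum-of-squares
  (let [in  (chan)
        out (chan 10 (map (fn [x] (* x x))))]
    ;; producer: emit 0..9, then close in
    (go (doseq [x (range 10)] (>! in x))
        (a/close! in))
    ;; transformer: a/pipe copies in to out and closes out when in closes;
    ;; the transducer on out squares each value as it is put
    (a/pipe in out)
    ;; consumer: a/reduce returns a channel holding the final sum
    (<!! (a/reduce + 0 out))))

;; sum-of-squares => 285
```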