~ / track B / clojure advanced

core.async channels

Advanced

core.async brings Communicating Sequential Processes (CSP) to Clojure. Concurrency becomes a story about channels — typed conduits that processes use to hand values to each other — rather than about threads, locks, or callbacks. Two key macros, go and <!/>!, let you write sequential-looking code that the compiler rewrites into a state machine.

Note: in the SCI REPL below core.async may not be available. The examples are written as you'd run them in a real Clojure project.

Minimal example: a channel

A channel is created with chan. Producers >! a value onto it; consumers <! a value off of it. Both block their go block, not their OS thread.

(require '[clojure.core.async :as a :refer [go chan <! >! <!! >!!]])

(let [c (chan)]
  (go (>! c :hello))
  (<!! c))   ;; => :hello

<!! is the blocking take — usable from outside a go block, e.g. at the REPL or in the main thread. Inside a go, always use the parking versions <! and >!.

Pipelines, not callbacks

Channels compose. A producer go, a transformer go, and a consumer go form a pipeline where each piece is sequential code:

(let [in  (chan)
      out (chan)]
  (go (doseq [x (range 5)] (>! in x)) (a/close! in))
  (go (loop []
        (when-let [x (<! in)]
          (>! out (* x x))
          (recur)))
      (a/close! out))
  (<!! (a/into [] out)))
;; => [0 1 4 9 16]

Three sequential blocks, three responsibilities, no callbacks — and each go suspends instead of blocking a thread, so you can have thousands of them.

Buffered channels and backpressure

chan takes an optional buffer that decouples producer and consumer up to a point:

(chan 10)                  ;; bounded buffer of size 10
(chan (a/sliding-buffer 3)) ;; keep newest 3, drop older on overflow
(chan (a/dropping-buffer 3));; keep oldest 3, drop new on overflow

An unbuffered channel forces a rendezvous: the producer doesn't proceed until the consumer takes. That's natural backpressure — slow consumers throttle fast producers automatically.

Transducers on channels

A channel can carry a transducer, applied as values flow through. Same recipe you'd use with into or transduce works here:

(def xf (comp (filter even?) (map inc)))
(let [c (chan 10 xf)]
  (a/onto-chan!! c (range 10))
  (<!! (a/into [] c)))
;; => [1 3 5 7 9]

alt!/alt!! — wait on multiple channels

When you need to take from whichever channel is ready first (or send to whichever is ready), alt! is the tool. It's how you build timeouts, cancellation, and fan-in:

(let [c       (chan)
      timeout (a/timeout 1000)]
  (go (Thread/sleep 500) (>! c :done))
  (a/alt!! c       ([v] {:result v})
           timeout ([_] {:result :timed-out})))

Check yourself

? quiz

What does `(<! c)` do inside a `go` block when no value is available on channel `c`?

Exercise

Sketch a tiny producer/consumer pipeline that:

  1. produces the integers 0..9 onto a channel,
  2. squares them with a transducer-bearing channel,
  3. sums the squares on the consumer side.

Expected result: 285.

(let [in  (chan)
      out (chan 10 (map (fn [x] (* x x))))]
  ;; producer
  ;; transformer: read from in, write to out
  ;; consumer: reduce + over out
  )
 status: new