Transducers are composable algorithmic transformations. They are independent from the context of their input and output sources and specify only the essence of the transformation in terms of an individual element. Because transducers are decoupled from input or output sources, they can be used in many different processes - collections, streams, channels, observables, etc. Transducers compose directly, without awareness of input or creation of intermediate aggregates.
Also see the introductory blog post, this video, and this section of the FAQ about good use cases for transducers.
A reducing function is the kind of function you’d pass to reduce - it is a function that takes an accumulated result and a new input and returns a new accumulated result:
;; reducing function signature
whatever, input -> whatever
A transducer (sometimes referred to as xform or xf) is a transformation from one reducing function to another:
;; transducer signature
(whatever, input -> whatever) -> (whatever, input -> whatever)
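As a rough illustration of these two signatures, the sketch below hand-writes a transformation from one reducing function to another. The names sum-rf and inc-each are hypothetical and not part of Clojure, and only the step arity is shown; a complete transducer also has init and completion arities.

```clojure
;; A reducing function: (accumulated result, input) -> accumulated result
(defn sum-rf [acc x]
  (+ acc x))

;; A step-only transformation: takes a reducing function and returns a new
;; reducing function that increments each input before passing it on.
(defn inc-each [rf]
  (fn [acc x]
    (rf acc (inc x))))

(reduce sum-rf 0 [1 2 3])            ;; => 6
(reduce (inc-each sum-rf) 0 [1 2 3]) ;; => 9
(reduce (inc-each conj) [] [1 2 3])  ;; => [2 3 4]
```

The same inc-each works with both + style accumulation and conj, which is the sense in which the transformation is independent of how results are accumulated.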
Most sequence functions included in Clojure have an arity that produces a transducer. This arity omits the input collection; the inputs will be supplied by the process applying the transducer. Note: this reduced arity is not currying or partial application.
For example:
(filter odd?) ;; returns a transducer that keeps only odd values
(map inc)     ;; returns a mapping transducer that increments each value
(take 5)      ;; returns a transducer that takes the first 5 values
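To see that this arity returns a function of a reducing function (rather than a partially applied map), one can apply the result to conj by hand. This is only an exploratory sketch; the name rf is chosen here for illustration, and in normal use a process such as transduce or into performs this step.

```clojure
;; (map inc) is a function that expects a reducing function.
(fn? (map inc))           ;; => true

;; Applying it to conj yields a new reducing function.
(def rf ((map inc) conj))

(rf [] 1)                 ;; => [2]    step arity: increments, then conj
(rf [1 2])                ;; => [1 2]  completion arity: delegates to (conj [1 2])
```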
Transducers compose with ordinary function composition. A transducer performs its operation before deciding whether and how many times to call the transducer it wraps. The recommended way to compose transducers is with the existing comp function:
(def xf
  (comp
    (filter odd?)
    (map inc)
    (take 5)))
The transducer xf is a transformation stack that will be applied by a process to a series of input elements. Each function in the stack is performed before the operation it wraps. Composition of the transducers runs right-to-left but builds a transformation stack that runs left-to-right (filtering happens before mapping in this example).
As a mnemonic, remember that the ordering of transducer functions in comp is the same order as sequence transformations in ->>. The transformation above is equivalent to the sequence transformation:
(->> coll
     (filter odd?)
     (map inc)
     (take 5))
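A quick way to check the mnemonic is to run both forms over the same input. The input nums and the name odd-inc-5 below are assumptions made for the example.

```clojure
(def nums (range 20))

(def odd-inc-5
  (comp
    (filter odd?)
    (map inc)
    (take 5)))

;; Transducer version: filter, then inc, then take.
(into [] odd-inc-5 nums)
;; => [2 4 6 8 10]

;; Equivalent sequence pipeline, written in the same order.
(->> nums
     (filter odd?)
     (map inc)
     (take 5))
;; => (2 4 6 8 10)
```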
The following functions produce a transducer when the input collection is omitted: map, cat, mapcat, filter, remove, take, take-while, take-nth, drop, drop-while, replace, partition-by, partition-all, keep, keep-indexed, map-indexed, distinct, interpose, dedupe, random-sample
One of the most common ways to apply transducers is with the transduce function, which is analogous to the standard reduce function:
(transduce xform f coll)
(transduce xform f init coll)
transduce will immediately (not lazily) reduce over coll with the transducer xform applied to the reducing function f, using init as the initial value if supplied or (f) otherwise. f supplies the knowledge of how to accumulate the result, which occurs in the (potentially stateful) context of the reduce.
(def xf (comp (filter odd?) (map inc)))
(transduce xf + (range 5))
;; => 6
(transduce xf + 100 (range 5))
;; => 106
The composed xf transducer will be invoked left-to-right with a final call to the reducing function f. In the last example, input values will be filtered, then incremented, and finally summed.
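The role of (f) as the default initial value can be seen by swapping in other reducing functions. The sketch below reuses the same composition under the hypothetical name odd-inc so that it stands alone.

```clojure
(def odd-inc (comp (filter odd?) (map inc)))

;; The zero-argument call of each reducing function supplies the default init.
(+)     ;; => 0
(*)     ;; => 1
(conj)  ;; => []

;; (range 5) filtered to odd values is 1 3; incremented, that is 2 4.
(transduce odd-inc + (range 5))     ;; => 6
(transduce odd-inc * (range 5))     ;; => 8
(transduce odd-inc conj (range 5))  ;; => [2 4]
```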
To capture the process of applying a transducer to a coll, use the eduction function. It takes any number of xforms and a final coll and returns a reducible/iterable application of the transducer to the items in coll. These applications will be performed each time reduce/iterator is called.
(def iter (eduction xf (range 5)))
(reduce + 0 iter)
;; => 6
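One way to observe that the transformation is re-applied on every reduce is to count calls with a side effect. This is a sketch for demonstration only; the atom calls and the name counted are assumptions, and side effects inside a mapping function are used here purely to make the work visible.

```clojure
(def calls (atom 0))

;; Each time an element is transformed, the counter is bumped.
(def counted
  (eduction (map (fn [x] (swap! calls inc) (inc x)))
            (range 3)))

@calls                ;; => 0  nothing has run yet
(reduce + 0 counted)  ;; => 6
@calls                ;; => 3
(reduce + 0 counted)  ;; => 6  the transformation runs again
@calls                ;; => 6
```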
To apply a transducer to an input collection and construct a new output collection, use into (which efficiently uses reduce and transients if possible):
(into [] xf (range 1000))
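The target collection passed to into determines the shape of the output, while the transducer stays the same kind of thing. A few small cases, with inputs chosen only for illustration:

```clojure
;; Into a vector.
(into [] (comp (filter odd?) (map inc)) (range 10))
;; => [2 4 6 8 10]

;; Into a set: duplicates produced by the transformation collapse.
(into #{} (map #(mod % 3)) (range 10))
;; => #{0 1 2}

;; Into a map: each output must be a key/value pair.
(into {} (map (fn [[k v]] [k (inc v)])) {:a 1 :b 2})
;; => {:a 2, :b 3}
```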
To create a sequence from the application of a transducer to an input collection, use sequence:
(sequence xf (range 1000))
The resulting sequence elements are incrementally computed. These sequences will consume input incrementally as needed and fully realize intermediate operations. This behavior differs from the equivalent operations on lazy sequences.
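Because input is consumed incrementally, sequence can be applied to an unbounded input as long as only a finite prefix of the result is consumed. A small sketch, assuming the infinite (range) as input:

```clojure
;; Only as much of the infinite input as needed is consumed.
(take 3 (sequence (comp (filter odd?) (map inc)) (range)))
;; => (2 4 6)
```

Exactly how far ahead the input is read is an implementation detail, so code with side effects should not rely on it.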
Transducers have the following shape (custom code in "..."):
(fn [rf]
  (fn ([] ...)
      ([result] ...)
      ([result input] ...)))
Many of the core sequence functions (like map, filter, etc) take operation-specific arguments (a predicate, function, count, etc) and return a transducer of this shape closing over those arguments. In some cases, like cat, the core function is itself a transducer: it takes no operation-specific arguments and is passed the rf directly.
The inner function is defined with 3 arities used for different purposes:
Init (arity 0) - should call the init arity on the nested transform rf, which will eventually call out to the transducing process.
Step (arity 2) - this is a standard reduction function but it is expected to call the rf step arity 0 or more times as appropriate in the transducer. For example, filter will choose (based on the predicate) whether to call rf or not. map will always call it exactly once. cat may call it many times depending on the inputs.
Completion (arity 1) - some processes will not end, but for those that do (like transduce), the completion arity is used to produce a final value and/or flush state. This arity must call the rf completion arity exactly once.
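The three arities can be sketched with a small stateless transducer. The name keep-if is hypothetical (it behaves like the core filter transducer) and is used only to show where each arity goes.

```clojure
(defn keep-if [pred]
  (fn [rf]
    (fn
      ;; Init: delegate to the nested reducing function.
      ([] (rf))
      ;; Completion: nothing to flush, so call the nested completion once.
      ([result] (rf result))
      ;; Step: call the nested step zero or one times per input.
      ([result input]
       (if (pred input)
         (rf result input)
         result)))))

(into [] (keep-if even?) (range 6))
;; => [0 2 4]

(transduce (comp (keep-if even?) (map inc)) + (range 6))
;; => 9
```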
An example use of completion is partition-all, which must flush any remaining elements at the end of the input. The completing function can be used to convert a reducing function to a transducing function by adding a default completion arity.
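A short sketch of completing follows. The reducing function square-into-map is hypothetical and has only a two-argument arity, so it is wrapped with completing before being handed to transduce; the second call passes an explicit completion function.

```clojure
;; A plain two-argument reducing function with no completion arity.
(def square-into-map
  (fn [acc x] (assoc acc x (* x x))))

;; completing adds a completion arity that returns the result unchanged.
(transduce (filter odd?) (completing square-into-map) {} (range 5))
;; => {1 1, 3 9}

;; An explicit completion function is applied once to the final result.
(transduce (map inc) (completing + str) (range 3))
;; => "6"
```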
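Clojure has a mechanism for specifying early termination of a reduce:
reduced - takes a value and returns a reduced value indicating reduction should stop
reduced? - returns true if the value was created with reduced
deref or @ can be used to retrieve the value inside a reduced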
A process that uses transducers must check for and stop when the step function returns a reduced value (more on that in Creating Transducible Processes). Additionally, a transducer step function that uses a nested reduce must check for and convey reduced values when they are encountered. (See the implementation of cat for an example.)
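The early-termination functions can be tried directly at the REPL. The inputs below are chosen only for illustration; the last expression shows a reduced value from take being conveyed through the nested reduce inside cat.

```clojure
;; Wrapping, testing, and unwrapping a reduced value.
(reduced? (reduced :done))  ;; => true
@(reduced :done)            ;; => :done

;; reduce stops as soon as the step function returns a reduced value,
;; so this terminates even though (range) is infinite.
(reduce (fn [acc x]
          (if (> acc 10)
            (reduced acc)
            (+ acc x)))
        0
        (range))
;; => 15

;; take signals termination partway through the second inner collection.
(into [] (comp cat (take 3)) [[1 2] [3 4] [5 6]])
;; => [1 2 3]
```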
Some transducers (such as take, partition-all, etc) require state during the reduction process. This state is created each time the transducible process applies the transducer. For example, consider the dedupe transducer that collapses a series of duplicate values into a single value. This transducer must remember the previous value to determine whether the current value should be passed on:
(defn dedupe []
  (fn [xf]
    (let [prev (volatile! ::none)]
      (fn
        ([] (xf))
        ([result] (xf result))
        ([result input]
          (let [prior @prev]
            (vreset! prev input)
            (if (= prior input)
              result
              (xf result input))))))))
In dedupe, prev is a stateful container that stores the previous value during the reduction. The prev value is a volatile for performance, but it could also be an atom. The prev value will not be initialized until the transducing process starts (in a call to transduce for example). The stateful interactions are therefore contained within the context of the transducible process.
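Because state is created when the process applies the transducer, the same transducer value can be reused across independent runs. The sketch below uses the core dedupe and take; the names dedupe-xf and first-two are assumptions for the example.

```clojure
(def dedupe-xf (dedupe))

;; Each call to into applies the transducer afresh, so no state leaks
;; from the first run into the second.
(into [] dedupe-xf [1 1 2 2 3 1 1])  ;; => [1 2 3 1]
(into [] dedupe-xf [1 1 2 2 3 1 1])  ;; => [1 2 3 1]

(def first-two (take 2))

(into [] first-two (range 10))       ;; => [0 1]
(into [] first-two (range 10))       ;; => [0 1]
```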
In the completion step, a transducer with reduction state should flush state prior to calling the nested transformer’s completion function, unless it has previously seen a reduced value from the nested step in which case pending state should be discarded.
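The flushing rule can be sketched with a small buffering transducer. The name chunks-of is hypothetical; it is similar in spirit to partition-all but is not the core implementation. The buffer is cleared before each call to the nested step, so nothing is pending if that step returns a reduced value.

```clojure
(defn chunks-of [n]
  (fn [rf]
    (let [buf (volatile! [])]
      (fn
        ([] (rf))
        ;; Completion: flush any partial group, then complete exactly once.
        ([result]
         (let [pending @buf
               result  (if (seq pending)
                         (do (vreset! buf [])
                             ;; The flush is a step call and may itself
                             ;; return a reduced value; unwrap it.
                             (unreduced (rf result pending)))
                         result)]
           (rf result)))
        ;; Step: buffer inputs and emit a group once it reaches size n.
        ([result input]
         (let [group (vswap! buf conj input)]
           (if (= n (count group))
             (do (vreset! buf [])
                 (rf result group))
             result)))))))

(into [] (chunks-of 2) (range 5))
;; => [[0 1] [2 3] [4]]

(into [] (comp (chunks-of 2) (take 1)) (range 5))
;; => [[0 1]]
```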
Transducers are designed to be used in many kinds of processes. A transducible process is defined as a succession of steps where each step ingests an input. The source of the inputs is specific to each process (from a collection, an iterator, a stream, etc). Similarly, the process must choose what to do with the outputs produced by each step.
If you have a new context for applying transducers, there are a few general rules to be aware of:
If a step function returns a reduced value, the transducible process must not supply any more inputs to the step function. The reduced value must be unwrapped with deref before completion.
A completing process must call the completion operation on the final accumulated value exactly once.
A transducing process must encapsulate references to the function returned by invoking a transducer - these may be stateful and unsafe for use across threads.
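The rules above can be sketched as a minimal transducible process that feeds inputs one at a time from a sequence. The function feed is hypothetical and exists only to illustrate the rules; real code over collections would normally use transduce.

```clojure
(defn feed
  "Hypothetical process: applies xform to the reducing function f and
  supplies each element of inputs to the resulting step function."
  [xform f init inputs]
  ;; The function returned by the transducer stays local to this call,
  ;; since it may be stateful.
  (let [rf (xform f)]
    (loop [acc init
           xs  (seq inputs)]
      (if xs
        (let [step-result (rf acc (first xs))]
          (if (reduced? step-result)
            ;; Stop supplying inputs, unwrap, then complete exactly once.
            (rf @step-result)
            (recur step-result (next xs))))
        ;; Inputs exhausted: complete exactly once.
        (rf acc)))))

(feed (comp (filter odd?) (map inc) (take 2)) conj [] (range 100))
;; => [2 4]

(feed (take 3) conj [] (range))
;; => [0 1 2]

(feed (partition-all 2) conj [] (range 5))
;; => [[0 1] [2 3] [4]]
```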