Reinvert control with delimited continuations
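Callback-based asynchronous code inverts control: rather than calling an operation and using its result, you hand the rest of your program to an event loop. Given a language with delimited continuation operators, however, you can reinvert control. This article uses the shift and reset operators in Racket to demonstrate a technique that is also possible in languages such as Scheme, Haskell and Scala.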
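For readers new to these operators, here is a minimal sketch of how shift and reset behave on their own, using racket/control. The variable names are made up for illustration.

```racket
#lang racket
(require racket/control)

;; Inside (reset ...), (shift k ...) binds k to "the rest of the reset
;; block", here the procedure (lambda (v) (+ 1 v)).
(define applied-twice
  (reset (+ 1 (shift k (k (k 5))))))   ; (+ 1 (+ 1 5))

;; If the shift body never calls k, the rest of the block is skipped and
;; the shift body's value becomes the value of the whole reset.
(define skipped
  (reset (+ 1 (shift k 'aborted))))

;; k is an ordinary procedure, so it can be stored and called later.
(define saved-k #f)
(define first-pass
  (reset (* 2 (shift k (set! saved-k k) 10))))  ; rest skipped for now
(define resumed (saved-k 21))                   ; runs (* 2 21) later
```

The last case is the one the rest of the article relies on: a continuation captured now and resumed later by someone else.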
Event loops and asynchronous operations

    #lang racket
    ;; event-loop.rkt: a simulated event loop backed by worker threads.
    (provide async-op event-loop set-timeout)

    ;; compute runs on a worker thread; finish receives its result on the
    ;; thread that runs event-loop.
    (struct async-operation (compute finish))

    (define pending (make-channel))
    (define finished (make-channel))

    ;; Take each pending operation and run it on its own worker thread.
    (define (pending-loop)
      (match-let (((async-operation compute finish) (channel-get pending)))
        (thread
         (lambda ()
           (let ((result (compute)))
             (channel-put finished (lambda () (finish result)))))))
      (pending-loop))
    (void (thread pending-loop))

    ;; Run completion thunks one at a time, forever.
    (define (event-loop)
      (let loop ()
        ((channel-get finished))
        (loop)))

    (define (pending-add aop) (channel-put pending aop))

    ;; Simulated operation: sleeps 2 seconds, then succeeds or fails at random.
    (define (async-op args succeed fail)
      (pending-add
       (async-operation
        (lambda ()
          (displayln (format "async operation started with: ~v" args))
          (sleep 2)
          (displayln "async operation finished")
          (random 2))
        (lambda (result)
          (if (= 0 result) (succeed) (fail))))))

    ;; Invoke callback after latency seconds.
    (define (set-timeout latency callback)
      (pending-add
       (async-operation
        (lambda ()
          (displayln (format "sleeping for ~a" latency))
          (sleep latency))
        (lambda (_) (callback)))))

This module simulates an event loop by spawning worker threads to process asynchronous operations.
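The core handoff in that module can be reduced to a few lines. The following is a stripped-down sketch, not the module itself: run-async and results are hypothetical names, and only a single turn of the loop is run so that the program terminates.

```racket
#lang racket

;; Completed work is handed back to the main thread as a thunk.
(define finished (make-channel))

;; Hypothetical helper: run compute on a worker thread, then queue a
;; thunk that applies finish to its result.
(define (run-async compute finish)
  (void
   (thread
    (lambda ()
      (let ((result (compute)))
        (channel-put finished (lambda () (finish result))))))))

(define results '())
(run-async (lambda () (* 6 7))
           (lambda (result) (set! results (cons result results))))

;; One turn of the event loop: block until a completion arrives, then
;; run it on this thread.
((channel-get finished))
```

Because the finish callback only ever runs on the thread that drains the channel, callbacks never race with each other even though the computations run in parallel.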
Invoke asynchronous ops while passing callbacks

    #lang racket
    (require "event-loop.rkt")

    (define (with-callbacks)
      (displayln "perform an async operation")
      (async-op (list 'arg1 'arg2)
                ;; success callback for the first operation
                (lambda ()
                  (displayln "handle success and perform another operation")
                  (async-op (list 'arg3 'arg4)
                            (lambda () (displayln "handle success again"))
                            (lambda () (displayln "handle failure of second operation"))))
                ;; failure callback for the first operation
                (lambda () (displayln "handle failure of first operation"))))

    (displayln "with-callbacks")
    (with-callbacks)
    (event-loop)

To invoke asynchronous operations, we pass two first-class functions representing how to proceed when the operation either succeeds or fails. The callbacks can be seen as manually lifted first-class continuations. Taken to the extreme, this style nests callbacks ever deeper, which makes programs harder to read and write.
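To make "manually lifted continuations" concrete, here is a small sketch comparing a direct-style expression with its callback-passing equivalent. The helpers mul-k and add-k are hypothetical and exist only for this illustration.

```racket
#lang racket

;; Direct style: the continuation of (* 2 3) is implicit ("add 1 to it").
(define direct-result (+ 1 (* 2 3)))

;; Callback style: hypothetical helpers that take their continuation as an
;; explicit argument instead of returning a value.
(define (mul-k a b k) (k (* a b)))
(define (add-k a b k) (k (+ a b)))

(define cps-result
  (mul-k 2 3
         (lambda (product)        ; "the rest of the program" after (* 2 3)
           (add-k 1 product
                  (lambda (sum) sum)))))
```

Each lambda spells out by hand what the language already knows in direct style, which is why every additional step adds another level of nesting.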
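
    #lang racket
    ;; event-loop-direct.rkt: direct-style wrappers over the callback API.
    (provide (all-from-out "event-loop.rkt") async-op-direct set-timeout-direct)
    (require "event-loop.rkt" racket/control)

    ;; Returns #t on success and #f on failure once the operation completes.
    (define (async-op-direct . args)
      (shift k (async-op args (lambda () (k #t)) (lambda () (k #f)))))

    ;; Returns (void) once latency seconds have passed.
    (define (set-timeout-direct latency)
      (shift k (set-timeout latency (lambda () (k (void))))))
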
We can use the shift operator to grab the continuation to which we want to pass the result of an asynchronous operation when invoked in direct style. Asynchronous operations transformed in this way can be provided by libraries without any cooperation from the event-loop implementation, demonstrating the technique's general applicability.
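The same wrapping can be tried without threads or timers. The sketch below assumes a toy callback API, toy-op, that merely records its callbacks so we can fire one by hand; every name in it is hypothetical.

```racket
#lang racket
(require racket/control)

;; Hypothetical callback API: it only records the callbacks; something
;; else decides later which one to call.
(define on-success #f)
(define on-failure #f)
(define (toy-op succeed fail)
  (set! on-success succeed)
  (set! on-failure fail))

;; Direct-style wrapper: capture the caller's continuation k and resume it
;; with #t or #f when a callback fires.
(define (toy-op-direct)
  (shift k (toy-op (lambda () (k #t))
                   (lambda () (k #f)))))

(define outcome 'pending)
(reset
 (set! outcome (if (toy-op-direct) 'succeeded 'failed)))

;; The reset block is now suspended, so outcome has not been assigned yet.
(define before outcome)
(on-failure)            ; simulate the operation failing
(define after outcome)
```

Note that toy-op itself is untouched: the wrapper only needs the callback interface, which is the sense in which no cooperation from the underlying implementation is required.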

    #lang racket
    (require "event-loop-direct.rkt" racket/control)

    (define (direct)
      (reset
       (displayln "perform an async operation")
       (if (async-op-direct 'arg1 'arg2)
           (begin
             (displayln "handle success and perform another operation")
             (if (async-op-direct 'arg3 'arg4)
                 (displayln "handle success again")
                 (displayln "handle failure of second operation")))
           (displayln "handle failure of first operation"))))

    (displayln "direct")
    (direct)
    (event-loop)

Finally, we use a reset block to indicate the extent of sequentially executed code associated with a series of asynchronous invocations.
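The effect of the reset boundary on ordering can be observed with a toy single-threaded scheduler standing in for the event loop. This is an illustrative sketch: queue, defer!, run-queue!, pause-direct and note! are all hypothetical names.

```racket
#lang racket
(require racket/control)

;; Toy scheduler: a list of thunks to run later, in order.
(define queue '())
(define (defer! thunk) (set! queue (append queue (list thunk))))
(define (run-queue!)
  (unless (null? queue)
    (let ((next (car queue)))
      (set! queue (cdr queue))
      (next)
      (run-queue!))))

;; Suspend the enclosing reset block until the scheduler resumes it.
(define (pause-direct) (shift k (defer! (lambda () (k (void))))))

(define trace '())
(define (note! msg) (set! trace (append trace (list msg))))

(reset
 (note! "step 1")
 (pause-direct)          ; everything below, up to the end of reset, waits
 (note! "step 2"))
(note! "after reset")    ; outside the block, so it runs immediately
(run-queue!)
```

Only the code inside the block is sequenced behind the pause; code following the block proceeds right away, just as (event-loop) is reached immediately after (direct) returns.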

    #lang racket
    (require "event-loop-direct.rkt" racket/control)

    ;; Poll once per second until body evaluates to a true value.
    (define-syntax-rule (wait-until body ...)
      (let loop ()
        (set-timeout-direct 1)
        (displayln "are we there yet?")
        (if (begin body ...)
            (displayln "we're there!")
            (begin
              (displayln "no, not there yet")
              (loop)))))

    (define (direct-concurrency)
      (define count 10)
      (define results (box '()))
      (define (add-result result)
        (set-box! results (cons result (unbox results))))
      (reset
       (displayln "perform async operations concurrently")
       ;; Each inner reset suspends independently, so all ten operations
       ;; start without waiting for one another.
       (for ((index (range count)))
         (reset (add-result (async-op-direct index))))
       (wait-until (= count (length (unbox results))))
       (displayln results)))

    (displayln "direct-concurrency")
    (direct-concurrency)
    (event-loop)

Invocations can be performed concurrently by wrapping them in separate reset blocks. These blocks may be embedded in other blocks to control the timing of invocations, satisfying dependencies on earlier asynchronous results.
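The interplay between separate and nested blocks can be traced deterministically with a toy scheduler in place of the real event loop. In this sketch all helper names (defer!, run-queue!, pause-direct, note!, task) are hypothetical.

```racket
#lang racket
(require racket/control)

;; Toy scheduler: a list of thunks to run later, in order.
(define queue '())
(define (defer! thunk) (set! queue (append queue (list thunk))))
(define (run-queue!)
  (unless (null? queue)
    (let ((next (car queue)))
      (set! queue (cdr queue))
      (next)
      (run-queue!))))

;; Suspend the nearest enclosing reset block until the scheduler resumes it.
(define (pause-direct) (shift k (defer! (lambda () (k (void))))))

(define trace '())
(define (note! msg) (set! trace (append trace (list msg))))

;; A stand-in for an asynchronous invocation.
(define (task name)
  (note! (format "~a start" name))
  (pause-direct)
  (note! (format "~a done" name)))

(reset
 ;; Separate inner resets: each task suspends on its own, so "b" starts
 ;; before "a" is done.
 (reset (task "a"))
 (reset (task "b"))
 ;; No inner reset: pausing here suspends the outer block, so "c" only
 ;; starts once "gate" is done.
 (task "gate")
 (reset (task "c")))
(run-queue!)
```

Tasks "a" and "b" overlap, while "c" depends on "gate" simply because it appears after it in the same block.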