In common with most modern programming languages, Raku is designed to support parallelism, asynchronicity and concurrency. Parallelism is about doing multiple things at once. Asynchronous programming, which is sometimes called event driven or reactive programming, is about supporting changes in the program flow caused by events triggered elsewhere in the program. Finally, concurrency is about the coordination of access and modification of some shared resources.
The aim of the Raku concurrency design is to provide a high-level, composable and consistent interface, regardless of how a virtual machine may implement it for a particular operating system, through layers of facilities as described below.
Additionally, certain Raku features may implicitly operate in an asynchronous fashion, so in order to ensure predictable interoperation with these features, user code should, where possible, avoid the lower level concurrency APIs (e.g., Thread
and Scheduler
) and use the higher-level interfaces.
High-level APIs§
Promises§
A Promise
(also called future in other programming environments) encapsulates the result of a computation that may not have completed or even started at the time the promise is obtained. A Promise
starts from a Planned
status and can result in either a Kept
status, meaning the promise has been successfully completed, or a Broken
status meaning that the promise has failed. Usually this is much of the functionality that user code needs to operate in a concurrent or asynchronous manner.
my $p1 = Promise.new; say $p1.status; # OUTPUT: «Planned» $p1.keep('Result'); say $p1.status; # OUTPUT: «Kept» say $p1.result; # OUTPUT: «Result» # (since it has been kept, a result is available!) my $p2 = Promise.new; $p2.break('oh no'); say $p2.status; # OUTPUT: «Broken» say $p2.result; # dies, because the promise has been broken CATCH { default { say .^name, ': ', .Str } }; # OUTPUT: «X::AdHoc+{X::Promise::Broken}: oh no»
Promises gain much of their power by being composable, for example by chaining, usually by the then method:
my $promise1 = Promise.new(); my $promise2 = $promise1.then( -> $v { say $v.result; "Second Result" } ); $promise1.keep("First Result"); say $promise2.result; # OUTPUT: «First ResultSecond Result»
Here the then method schedules code to be executed when the first Promise
is kept or broken, itself returning a new Promise
which will be kept with the result of the code when it is executed (or broken if the code fails). keep
changes the status of the promise to Kept
setting the result to the positional argument. result
blocks the current thread of execution until the promise is kept or broken, if it was kept then it will return the result (that is the value passed to keep
), otherwise it will throw an exception based on the value passed to break
. The latter behavior is illustrated with:
my $promise1 = Promise.new(); my $promise2 = $promise1.then(-> $v { say "Handled but : "; say $v.result}); $promise1.break("First Result"); try $promise2.result; say $promise2.cause; # OUTPUT: «Handled but : First Result»
Here the break
will cause the code block of the then
to throw an exception when it calls the result
method on the original promise that was passed as an argument, which will subsequently cause the second promise to be broken, raising an exception in turn when its result is taken. The actual Exception
object will then be available from cause
. If the promise had not been broken cause
would raise an X::Promise::CauseOnlyValidOnBroken
exception.
A Promise
can also be scheduled to be automatically kept at a future time:
my $promise1 = Promise.in(5); my $promise2 = $promise1.then(-> $v { say $v.status; 'Second Result' }); say $promise2.result;
The method in creates a new promise and schedules a new task to call keep
on it no earlier than the supplied number of seconds, returning the new Promise
object.
A very frequent use of promises is to run a piece of code, and keep the promise once it returns successfully, or break it when the code dies. The start method provides a shortcut for that:
my $promise = Promise.start( { my $i = 0; for 1 .. 10 { $i += $_ }; $i} ); say $promise.result; # OUTPUT: «55»
Here the result
of the promise returned is the value returned from the code. Similarly if the code fails (and the promise is thus broken), then cause
will be the Exception
object that was thrown:
my $promise = Promise.start({ die "Broken Promise" }); try $promise.result; say $promise.cause;
This is considered to be such a commonly required pattern that it is also provided as a keyword:
my $promise = start { my $i = 0; for 1 .. 10 { $i += $_ } $i } my $result = await $promise; say $result;
The subroutine await is almost equivalent to calling result
on the promise object returned by start
but it will also take a list of promises and return the result of each:
my $p1 = start { my $i = 0; for 1 .. 10 { $i += $_ } $i }; my $p2 = start { my $i = 0; for 1 .. 10 { $i -= $_ } $i }; my @result = await $p1, $p2; say @result; # OUTPUT: «[55 -55]»
In addition to await
, two class methods combine several Promise
objects into a new promise: allof
returns a promise that is kept when all the original promises are kept or broken:
my $promise = Promise.allof( Promise.in(2), Promise.in(3) ); await $promise; say "All done"; # Should be not much more than three seconds later
And anyof
returns a new promise that will be kept when any of the original promises is kept or broken:
my $promise = Promise.anyof( Promise.in(3), Promise.in(8600) ); await $promise; say "All done"; # Should be about 3 seconds later
Unlike await
however the results of the original kept promises are not available without referring to the original, so these are more useful when the completion or otherwise of the tasks is more important to the consumer than the actual results, or when the results have been collected by other means. You may, for example, want to create a dependent Promise that will examine each of the original promises:
my @promises; for 1..5 -> $t { push @promises, start { sleep $t; Bool.pick; }; } say await Promise.allof(@promises).then({ so all(@promises>>.result) });
Which will give True if all of the promises were kept with True, False otherwise.
If you are creating a promise that you intend to keep or break yourself then you probably don't want any code that might receive the promise to inadvertently (or otherwise) keep or break the promise before you do. For this purpose there is the method vow, which returns a Vow object which becomes the only mechanism by which the promise can be kept or broken. If an attempt to keep or break the Promise is made directly then the exception X::Promise::Vowed
will be thrown, as long as the vow object is kept private, the status of the promise is safe:
sub get_promise { my $promise = Promise.new; my $vow = $promise.vow; Promise.in(10).then({$vow.keep}); $promise; } my $promise = get_promise(); # Will throw an exception # "Access denied to keep/break this Promise; already vowed" $promise.keep; CATCH { default { say .^name, ': ', .Str } }; # OUTPUT: «X::Promise::Vowed: Access denied to keep/break this Promise; already vowed»
The methods that return a promise that will be kept or broken automatically such as in
or start
will do this, so it is not necessary to do it for these.
Supplies§
A Supply
is an asynchronous data streaming mechanism that can be consumed by one or more consumers simultaneously in a manner similar to "events" in other programming languages and can be seen as enabling event driven or reactive designs.
At its simplest, a Supply
is a message stream that can have multiple subscribers created with the method tap
on to which data items can be placed with emit
.
The Supply
can either be live
or on-demand
. A live
supply is like a TV broadcast: those who tune in don't get previously emitted values. An on-demand
broadcast is like Netflix: everyone who starts streaming a movie (taps a supply), always starts it from the beginning (gets all the values), regardless of how many people are watching it right now. Note that no history is kept for on-demand
supplies, instead, the supply
block is run for each tap of the supply.
A live
Supply
is created by the Supplier
factory, each emitted value is passed to all the active tappers as they are added:
my $supplier = Supplier.new; my $supply = $supplier.Supply; $supply.tap( -> $v { say $v }); for 1 .. 10 { $supplier.emit($_); }
Note that the tap
is called on a Supply
object created by the Supplier
and new values are emitted on the Supplier
.
An on-demand
Supply
is created by the supply
keyword:
my $supply = supply { for 1 .. 10 { emit($_); } } $supply.tap( -> $v { say $v });
In this case the code in the supply block is executed every time the Supply
returned by supply
is tapped, as demonstrated by:
my $supply = supply { for 1 .. 10 { emit($_); } } $supply.tap( -> $v { say "First : $v" }); $supply.tap( -> $v { say "Second : $v" });
The tap
method returns a Tap
object which can be used to obtain information about the tap and also to turn it off when we are no longer interested in the events:
my $supplier = Supplier.new; my $supply = $supplier.Supply; my $tap = $supply.tap( -> $v { say $v }); $supplier.emit("OK"); $tap.close; $supplier.emit("Won't trigger the tap");
Calling done
on the supply object calls the done
callback that may be specified for any taps, but does not prevent any further events being emitted to the stream, or taps receiving them.
The method interval
returns a new on-demand
supply which periodically emits a new event at the specified interval. The data that is emitted is an integer starting at 0 that is incremented for each event. The following code outputs 0 .. 5 :
my $supply = Supply.interval(2); $supply.tap(-> $v { say $v }); sleep 10;
A second argument can be supplied to interval
which specifies a delay in seconds before the first event is fired. Each tap of a supply created by interval
has its own sequence starting from 0, as illustrated by the following:
my $supply = Supply.interval(2); $supply.tap(-> $v { say "First $v" }); sleep 6; $supply.tap(-> $v { say "Second $v"}); sleep 10;
A live Supply
that keeps values until first tapped can be created with Supplier::Preserving
.
whenever
§
The whenever
keyword can be used in supply blocks or in react blocks. From the 6.d version, it needs to be used within the lexical scope of them. It introduces a block of code that will be run when prompted by an asynchronous event that it specifies - that could be a Supply
, a Channel
, a Promise
or an Iterable
.
Please note that one should keep the code inside the whenever
as small as possible, as only one whenever
block will be executed at any time. One can use a start
block inside the whenever
block to run longer running code.
In this example we are watching two supplies.
my $bread-supplier = Supplier.new; my $vegetable-supplier = Supplier.new; my $supply = supply { whenever $bread-supplier.Supply { emit("We've got bread: " ~ $_); }; whenever $vegetable-supplier.Supply { emit("We've got a vegetable: " ~ $_); }; } $supply.tap( -> $v { say "$v" }); $vegetable-supplier.emit("Radish"); # OUTPUT: «We've got a vegetable: Radish» $bread-supplier.emit("Thick sliced"); # OUTPUT: «We've got bread: Thick sliced» $vegetable-supplier.emit("Lettuce"); # OUTPUT: «We've got a vegetable: Lettuce»
react
§
The react
keyword introduces a block of code containing one or more whenever
keywords to watch asynchronous events. The main difference between a supply block and a react block is that the code in a react block runs where it appears in the code flow, whereas a supply block has to be tapped before it does anything.
Another difference is that a supply block can be used without the whenever
keyword, but a react block requires at least one whenever
to be of any real use.
react { whenever Supply.interval(2) -> $v { say $v; done() if $v == 4; } }
Here the whenever
keyword uses .act
to create a tap on the Supply
from the provided block. The react
block is exited when done()
is called in one of the taps. Using last
to exit the block would produce an error indicating that it's not really a loop construct.
An on-demand
Supply
can also be created from a list of values that will be emitted in turn, thus the first on-demand
example could be written as:
react { whenever Supply.from-list(1..10) -> $v { say $v; } }
Transforming supplies§
An existing supply object can be filtered or transformed, using the methods grep
and map
respectively, to create a new supply in a manner like the similarly named list methods: grep
returns a supply such that only those events emitted on the source stream for which the grep
condition is true is emitted on the second supply:
my $supplier = Supplier.new; my $supply = $supplier.Supply; $supply.tap(-> $v { say "Original : $v" }); my $odd_supply = $supply.grep({ $_ % 2 }); $odd_supply.tap(-> $v { say "Odd : $v" }); my $even_supply = $supply.grep({ not $_ % 2 }); $even_supply.tap(-> $v { say "Even : $v" }); for 0 .. 10 { $supplier.emit($_); }
map
returns a new supply such that for each item emitted to the original supply a new item which is the result of the expression passed to the map
is emitted:
my $supplier = Supplier.new; my $supply = $supplier.Supply; $supply.tap(-> $v { say "Original : $v" }); my $half_supply = $supply.map({ $_ / 2 }); $half_supply.tap(-> $v { say "Half : $v" }); for 0 .. 10 { $supplier.emit($_); }
Ending a supply§
If you need to have an action that runs when the supply finishes, you can do so by setting the done
and quit
options in the call to tap
:
$supply.tap: { ... }, done => { say 'Job is done.' }, quit => { when X::MyApp::Error { say "App Error: ", $_.message } };
The quit
block works very similar to a CATCH
. If the exception is marked as seen by a when
or default
block, the exception is caught and handled. Otherwise, the exception continues to up the call tree (i.e., the same behavior as when quit
is not set).
Phasers in a supply or react block§
If you are using the react
or supply
block syntax with whenever
, you can add phasers within your whenever
blocks to handle the done
and quit
messages from the tapped supply:
react { whenever $supply { ...; # your usual supply tap code here LAST { say 'Job is done.' } QUIT { when X::MyApp::Error { say "App Error: ", $_.message } } } }
The behavior here is the same as setting done
and quit
on tap
.
Channels§
A Channel
is a thread-safe queue that can have multiple readers and writers that could be considered to be similar in operation to a "fifo" or named pipe except it does not enable inter-process communication. It should be noted that, being a true queue, each value sent to the Channel
will only be available to a single reader on a first read, first served basis: if you want multiple readers to be able to receive every item sent you probably want to consider a Supply
.
An item is queued onto the Channel
with the method send, and the method receive removes an item from the queue and returns it, blocking until a new item is sent if the queue is empty:
my $channel = Channel.new; $channel.send('Channel One'); say $channel.receive; # OUTPUT: «Channel One»
If the channel has been closed with the method close then any send
will cause the exception X::Channel::SendOnClosed
to be thrown, and a receive
will throw an X::Channel::ReceiveOnClosed
.
The method list returns all the items on the Channel
and will block until further items are queued unless the channel is closed:
my $channel = Channel.new; await (^10).map: -> $r { start { sleep $r; $channel.send($r); } } $channel.close; for $channel.list -> $r { say $r; }
There is also the non-blocking method poll that returns an available item from the channel or Nil
if there is no item or the channel is closed. This does mean that the channel must be checked to determine whether it is closed:
my $c = Channel.new; # Start three Promises that sleep for 1..3 seconds, and then # send a value to our Channel ^3 .map: -> $v { start { sleep 3 - $v; $c.send: "$v from thread {$*THREAD.id}"; } } # Wait 3 seconds before closing the channel Promise.in(3).then: { $c.close } # Continuously loop and poll the channel, until it's closed my $is-closed = $c.closed; loop { if $c.poll -> $item { say "$item received after {now - INIT now} seconds"; } elsif $is-closed { last; } say 'Doing some unrelated things...'; sleep .6; } # Doing some unrelated things... # Doing some unrelated things... # 2 from thread 5 received after 1.2063182 seconds # Doing some unrelated things... # Doing some unrelated things... # 1 from thread 4 received after 2.41117376 seconds # Doing some unrelated things... # 0 from thread 3 received after 3.01364461 seconds # Doing some unrelated things...
The method closed returns a Promise
that will be kept (and consequently will evaluate to True in a Boolean context) when the channel is closed.
The .poll
method can be used in combination with .receive
method, as a caching mechanism where lack of value returned by .poll
is a signal that more values need to be fetched and loaded into the channel:
sub get-value { return $c.poll // do { start replenish-cache; $c.receive }; } sub replenish-cache { for ^20 { $c.send: $_ for slowly-fetch-a-thing(); } }
Channels can be used in place of the Supply
in the whenever
of a react
block described earlier:
my $channel = Channel.new; my $p = start { react { whenever $channel { say $_; } } } await (^10).map: -> $r { start { sleep $r; $channel.send($r); } } $channel.close; await $p;
It is also possible to obtain a Channel
from a Supply
using the Channel method which returns a Channel
which is fed by a tap
on the Supply
:
my $supplier = Supplier.new; my $supply = $supplier.Supply; my $channel = $supply.Channel; my $p = start { react { whenever $channel -> $item { say "via Channel: $item"; } } } await (^10).map: -> $r { start { sleep $r; $supplier.emit($r); } } $supplier.done; await $p;
Channel
will return a different Channel
fed with the same data each time it is called. This could be used, for instance, to fan-out a Supply
to one or more Channel
s to provide for different interfaces in a program.
Proc::Async§
Proc::Async
builds on the facilities described to run and interact with an external program asynchronously:
my $proc = Proc::Async.new('echo', 'foo', 'bar'); $proc.stdout.tap(-> $v { print "Output: $v" }); $proc.stderr.tap(-> $v { print "Error: $v" }); say "Starting..."; my $promise = $proc.start; await $promise; say "Done."; # Output: # Starting... # Output: foo bar # Done.
The path to the command as well as any arguments to the command are supplied to the constructor. The command will not be executed until start is called, which will return a Promise
that will be kept when the program exits. The standard output and standard error of the program are available as Supply
objects from the methods stdout and stderr respectively which can be tapped as required.
If you want to write to the standard input of the program you can supply the :w
adverb to the constructor and use the methods write, print or say to write to the opened pipe once the program has been started:
my $proc = Proc::Async.new(:w, 'grep', 'foo'); $proc.stdout.tap(-> $v { print "Output: $v" }); say "Starting..."; my $promise = $proc.start; $proc.say("this line has foo"); $proc.say("this one doesn't"); $proc.close-stdin; await $promise; say "Done."; # Output: # Starting... # Output: this line has foo # Done.
Some programs (such as grep
without a file argument in this example, ) won't exit until their standard input is closed so close-stdin can be called when you are finished writing to allow the Promise
returned by start
to be kept.
Low-level APIs§
Threads§
The lowest level interface for concurrency is provided by Thread
. A thread can be thought of as a piece of code that may eventually be run on a processor, the arrangement for which is made almost entirely by the virtual machine and/or operating system. Threads should be considered, for all intents, largely un-managed and their direct use should be avoided in user code.
A thread can either be created and then actually run later:
my $thread = Thread.new(code => { for 1 .. 10 -> $v { say $v }}); # ... $thread.run;
Or can be created and run at a single invocation:
my $thread = Thread.start({ for 1 .. 10 -> $v { say $v }});
In both cases the completion of the code encapsulated by the Thread
object can be waited on with the finish
method which will block until the thread completes:
$thread.finish;
Beyond that there are no further facilities for synchronization or resource sharing which is largely why it should be emphasized that threads are unlikely to be useful directly in user code.
Schedulers§
The next level of the concurrency API is supplied by classes that implement the interface defined by the role Scheduler
. The intent of the scheduler interface is to provide a mechanism to determine which resources to use to run a particular task and when to run it. The majority of the higher level concurrency APIs are built upon a scheduler and it may not be necessary for user code to use them at all, although some methods such as those found in Proc::Async
, Promise
and Supply
allow you to explicitly supply a scheduler.
The current default global scheduler is available in the variable $*SCHEDULER
.
The primary interface of a scheduler (indeed the only method required by the Scheduler
interface) is the cue
method:
method cue(:&code, Instant :$at, :$in, :$every, :$times = 1; :&catch)
This will schedule the Callable
in &code
to be executed in the manner determined by the adverbs (as documented in Scheduler
) using the execution scheme as implemented by the scheduler. For example:
my $i = 0; my $cancellation = $*SCHEDULER.cue({ say $i++}, every => 2 ); sleep 20;
Assuming that the $*SCHEDULER
hasn't been changed from the default, will print the numbers 0 to 10 approximately (i.e with operating system scheduling tolerances) every two seconds. In this case the code will be scheduled to run until the program ends normally, however the method returns a Cancellation
object which can be used to cancel the scheduled execution before normal completion:
my $i = 0; my $cancellation = $*SCHEDULER.cue({ say $i++}, every => 2 ); sleep 10; $cancellation.cancel; sleep 10;
should only output 0 to 5.
Despite the apparent advantage the Scheduler
interface provides over that of Thread
all of functionality is available through higher level interfaces and it shouldn't be necessary to use a scheduler directly, except perhaps in the cases mentioned above where a scheduler can be supplied explicitly to certain methods.
A library may wish to provide an alternative scheduler implementation if it has special requirements, for instance a UI library may want all code to be run within a single UI thread, or some custom priority mechanism may be required, however the implementations provided as standard and described below should suffice for most user code.
ThreadPoolScheduler§
The ThreadPoolScheduler
is the default scheduler, it maintains a pool of threads that are allocated on demand, creating new ones as necessary.
Rakudo allows the maximum number of threads allowed in the default scheduler to be set by the environment variable RAKUDO_MAX_THREADS
at the time the program is started.
If the maximum is exceeded then cue
may queue the code until a thread becomes available.
CurrentThreadScheduler§
The CurrentThreadScheduler
is a very simple scheduler that will always schedule code to be run straight away on the current thread. The implication is that cue
on this scheduler will block until the code finishes execution, limiting its utility to certain special cases such as testing.
Locks§
The class Lock
provides the low level mechanism that protects shared data in a concurrent environment and is thus key to supporting thread-safety in the high level API, this is sometimes known as a "Mutex" in other programming languages. Because the higher level classes (Promise
, Supply
and Channel
) use a Lock
where required it is unlikely that user code will need to use a Lock
directly.
The primary interface to Lock
is the method protect which ensures that a block of code (commonly called a "critical section") is only executed in one thread at a time:
my $lock = Lock.new; my $a = 0; await (^10).map: { start { $lock.protect({ my $r = rand; sleep $r; $a++; }); } } say $a; # OUTPUT: «10»
protect
returns whatever the code block returns.
Because protect
will block any threads that are waiting to execute the critical section the code should be as quick as possible.
Safety concerns§
Some shared data concurrency issues are less obvious than others. For a good general write-up on this subject see this blog post.
One particular issue of note is when container autovivification or extension takes place. When an Array
or a Hash
entry is initially assigned the underlying structure is altered and that operation is not async safe. For example, in this code:
my @array; my $slot := @array[20]; $slot = 'foo';
The third line is the critical section as that is when the array is extended. The simplest fix is to use a Lock
to protect the critical section. A possibly better fix would be to refactor the code so that sharing a container is not necessary.