2012-07-19
Lightweight Tasks Library for ATS
I've been using libevent a fair bit recently from systems I've developed in the ATS programming language. The resulting code is callback oriented, and that style can be difficult to follow. I recently ported a Python program that uses the Twisted asynchronous networking library to ATS, and the callbacks were getting hard to manage. An example of the callback style is this sample libevent program, which downloads and prints an HTML web page.
To help make this code easier to manage I wrote a lightweight task scheduler for ATS. It's available on GitHub as ats-task. The implementation uses the ucontext stack switching API. To use the library from ATS, clone it as task in the $ATSHOME/contrib/ directory:
$ cd $ATSHOME/contrib
$ git clone git://github.com/doublec/ats-task task
$ cd task
$ make
The first step in using the library is to create a task scheduler and install this as the global scheduler:
var sch = scheduler_new ()
val () = set_global_scheduler (sch)
Once this is done tasks can be spawned. task_spawn takes a stack size and a linear closure (cloptr1) that is run when the task is scheduled:
val () = task_spawn (16384, lam () => {
           val () = print ("hello\n")
           val () = task_yield ()
           val () = print ("world\n")
         })
A task can yield with task_yield, which switches to the next task waiting to run and schedules the yielding task to run again later.
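To give a feel for what spawning and yielding look like at the ucontext level, here is a minimal C sketch of a round-robin scheduler. It is not the ats-task implementation: every name in it (demo_spawn, demo_yield, demo_run, the fixed-size run queue) is made up for illustration, and it assumes a Linux/glibc system where the ucontext functions are available.

```c
#include <stdlib.h>
#include <string.h>
#include <ucontext.h>

#define MAX_TASKS 4
#define STACK_SIZE (64 * 1024)

typedef struct {
    ucontext_t ctx;
    void *stack;                    /* heap-allocated stack for this task */
    void (*fn)(void);
    int done;
} demo_task;

static ucontext_t sched_ctx;        /* context of the scheduler loop */
static demo_task tasks[MAX_TASKS];
static int run_queue[MAX_TASKS];    /* circular queue of runnable task ids */
static int q_head, q_len, n_tasks;
static int current;                 /* id of the task that is running now */
static char trace[64];              /* records the order in which code ran */

static void enqueue(int id) { run_queue[(q_head + q_len++) % MAX_TASKS] = id; }
static int dequeue(void) {
    int id = run_queue[q_head];
    q_head = (q_head + 1) % MAX_TASKS;
    q_len--;
    return id;
}
static void emit(const char *s) { strncat(trace, s, sizeof trace - strlen(trace) - 1); }

/* Entry point of every task; returning from it switches to uc_link. */
static void trampoline(void) {
    tasks[current].fn();
    tasks[current].done = 1;
}

/* Hypothetical spawn: give the task its own stack and queue it. */
static int demo_spawn(void (*fn)(void)) {
    if (n_tasks == MAX_TASKS) return -1;
    demo_task *t = &tasks[n_tasks];
    t->stack = malloc(STACK_SIZE);
    if (t->stack == NULL || getcontext(&t->ctx) != 0) { free(t->stack); return -1; }
    t->ctx.uc_stack.ss_sp = t->stack;
    t->ctx.uc_stack.ss_size = STACK_SIZE;
    t->ctx.uc_link = &sched_ctx;    /* where to go when the task finishes */
    t->fn = fn;
    t->done = 0;
    makecontext(&t->ctx, trampoline, 0);
    enqueue(n_tasks++);
    return 0;
}

/* Hypothetical yield: re-queue ourselves, then switch to the scheduler. */
static void demo_yield(void) {
    int me = current;
    enqueue(me);
    swapcontext(&tasks[me].ctx, &sched_ctx);
}

/* Scheduler loop: runs until no task is queued. */
static void demo_run(void) {
    while (q_len > 0) {
        current = dequeue();
        swapcontext(&sched_ctx, &tasks[current].ctx);
        if (tasks[current].done) free(tasks[current].stack);
    }
}

static void task_a(void) { emit("a1 "); demo_yield(); emit("a2 "); }
static void task_b(void) { emit("b1 "); demo_yield(); emit("b2 "); }

/* Spawns two tasks, runs them to completion, returns the execution trace. */
static const char *demo_round_robin(void) {
    trace[0] = '\0';
    q_head = q_len = n_tasks = 0;
    if (demo_spawn(task_a) != 0 || demo_spawn(task_b) != 0) return "";
    demo_run();
    return trace;
}
```

Calling demo_round_robin() from a main function returns the trace of the two tasks taking turns at each yield, which is the same interleaving the hello/world task above would show if a second task were spawned alongside it.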
The task scheduler needs to be activated with:
val () = run_global_scheduler ()
The scheduler exits when no more tasks are queued. The global scheduler must then be unset and freed:
val () = unset_global_scheduler (sch)
val () = scheduler_free (sch)
A compile error results if the scheduler is not both unset and freed.
To integrate with the libevent event loop I create a task that handles the event processing. It loops, processing any queued events. If no other tasks are queued it blocks while processing the event loop, to prevent unnecessary polling:
fun event_loop_task {l:agz} (base: event_base l, events_queued: bool): void = let
  val () = task_yield ()
in
  (* If no events are queued and if no tasks are also queued we can exit *)
  if event_base_got_exit (base) > 0 || (not events_queued && task_queue_count () = 0) then {
    prval () = unref_base (base)
  }
  (* We're the only active task left, safe to block *)
  else if task_queue_count () = 0 then event_loop_task (base, event_base_loop (base, EVLOOP_ONCE) = 0)
  (* Other tasks are waiting, we can't block *)
  else event_loop_task (base, event_base_loop (base, EVLOOP_NONBLOCK) = 0)
end
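The three-way decision in that task is easy to get wrong, so here it is pulled out as a small pure function in C. This is only a sketch of the branching logic: loop_action and next_loop_action are hypothetical names, and the mapping to libevent is that LOOP_BLOCK corresponds to calling event_base_loop with EVLOOP_ONCE and LOOP_POLL to calling it with EVLOOP_NONBLOCK.

```c
#include <stdbool.h>
#include <stddef.h>

/* What the event loop task should do on this iteration (hypothetical names). */
typedef enum {
    LOOP_EXIT,   /* nothing left to do, or exit was requested */
    LOOP_BLOCK,  /* no other task is runnable: safe to block (EVLOOP_ONCE) */
    LOOP_POLL    /* other tasks are waiting: poll only (EVLOOP_NONBLOCK) */
} loop_action;

static loop_action next_loop_action(bool got_exit, bool events_queued,
                                    size_t tasks_queued) {
    /* Exit if asked to, or if there are neither events nor tasks left. */
    if (got_exit || (!events_queued && tasks_queued == 0))
        return LOOP_EXIT;
    /* We are the only active task, so blocking starves nobody. */
    if (tasks_queued == 0)
        return LOOP_BLOCK;
    /* Other tasks need the CPU, so just poll and yield again. */
    return LOOP_POLL;
}
```

The events_queued flag plays the same role as the second argument of event_loop_task: it is true when the previous event_base_loop call returned 0.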
This task is spawned and runs for as long as libevent is needed. When a libevent callback is registered, the callback does not implement the functionality itself. Instead it spawns a task, or schedules an existing one in some other manner, so that the work happens during the task scheduler loop. An example is registering a function to run on an HTTP callback:
fun rpc_callback (req: evhttp_request1, proxy: !merge_mine_proxy): void = {
  val () = task_spawn_lin (16384, llam () => {
             val () = merge_mine_proxy_handle_getwork (pxy, req)
           })
}
val r = evhttp_set_cb {merge_mine_proxy} (http, "/", rpc_callback, proxy)
task_spawn_lin is a variant of task_spawn that takes a lincloptr1 function type so it can close over linear objects and views. Now when the callback is called from libevent it spawns the task and immediately exits. The task will be run when it is scheduled during the task scheduler loop.
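The shape of that pattern, a callback that only records work and returns, can be shown without libevent or real tasks. The following C sketch uses a plain array of pending work items in place of the task queue. All names (spawn_later, on_request, run_pending) are invented for the example, and a real version would switch stacks rather than call the function directly.

```c
#include <stddef.h>

#define MAX_PENDING 16

/* A deferred unit of work: a function and its argument. */
typedef struct {
    void (*fn)(void *);
    void *arg;
} work_item;

static work_item pending[MAX_PENDING];
static size_t n_pending;

/* Stand-in for spawning a task: remember the work, do not run it yet. */
static int spawn_later(void (*fn)(void *), void *arg) {
    if (n_pending == MAX_PENDING) return -1;
    pending[n_pending].fn = fn;
    pending[n_pending].arg = arg;
    n_pending++;
    return 0;
}

/* The real request handling; here it just counts handled requests. */
static void handle_request(void *arg) {
    *(int *)arg += 1;
}

/* Stand-in for the libevent callback: defer the work and return at once. */
static void on_request(void *req) {
    (void)spawn_later(handle_request, req);
}

/* Stand-in for the scheduler loop: run everything that was deferred. */
static size_t run_pending(void) {
    size_t ran = 0;
    for (size_t i = 0; i < n_pending; i++) {
        pending[i].fn(pending[i].arg);
        ran++;
    }
    n_pending = 0;
    return ran;
}
```

The point of the split is that on_request stays short no matter how much work handle_request does, so the event loop is never held up inside a callback.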
When making HTTP requests from libevent a callback is run when the result of the HTTP request is available. To integrate that with the task library we want to suspend a task when we make the HTTP request and resume it when the callback is run. The function global_scheduler_halt suspends the currently running task and returns it. The task can be rescheduled from within a callback using global_scheduler_queue_task. global_scheduler_resume should be called to mark the point at which the task will start running when it is resumed. Sample code for this is:
fun download (url: string): http_result1 = let
  val tsk = global_scheduler_halt ()
  var result: http_result1
  prval (pff, pf) = __borrow (view@ result) where {
    extern prfun __borrow {l:addr} (r: !http_result1? @ l >> (http_result1 @ l))
                    : (http_result1 @ l -<lin,prf> void, http_result1? @ l)
  }
  val () = http_request (url, llam (r) => {
             val () = global_scheduler_queue_task (tsk)
             val () = result := r
             prval () = pff (pf)
           })
  val () = global_scheduler_resume ()
in
  result
end
The callback for the download is passed to the http_request function. That callback receives a linear object containing the result of the HTTP request. In the code here we create a variable on the stack, result of type http_result1, to store the result. The __borrow proof function obtains a second view to this stack variable so it can be accessed from within the linear closure passed to http_request. In that closure it stores the result in the stack variable and consumes the proof. This is safe because task stacks are allocated on the heap, and tsk has been removed from the scheduler before the closure runs, so the stack holding result cannot go away before then. __borrow returns a proof function and a view to the stack variable. That proof function consumes the returned view, which ensures the proof must be consumed and leaves no dangling data around.
The call to global_scheduler_resume is where computation will resume for the task when it is queued after the callback is called. The end result is that code like this can be written:
fn do_main (): void = {
  fn print_result (r: http_result1): void =
    case+ r of
    | ~http_result_string s => (print_strptr (s); print_newline (); strptr_free (s))
    | ~http_result_error code => printf("Code: %d", @(code))
  val () = print_result (download ("http://www.ats-lang.org/"))
  val () = print_result (download ("http://www.bluishcoder.co.nz/"))
}
This code processes the results of the download without exposing callbacks to the programmer. No garbage collector is used, and linear types ensure that everything is correctly released and no memory is leaked; otherwise the code doesn't compile.
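The suspend-then-resume idea behind download can also be sketched directly against ucontext in C. This is a simplified illustration under several assumptions, not how ats-task is written: there is a single task, the HTTP request is faked by fake_request, and the scheduler loop itself plays the part of the event loop by firing the pending callback when nothing else is runnable. All function names are hypothetical, and a Linux/glibc system is assumed.

```c
#include <stdlib.h>
#include <ucontext.h>

#define STACK_SIZE (64 * 1024)

static ucontext_t sched_ctx, task_ctx;
static int task_queued;      /* 1 while the task sits in the run queue */
static int task_finished;
static int *pending_slot;    /* where the pending callback stores its result */
static int final_result;

/* Fake async request: only remembers where the result should go. */
static void fake_request(int *slot) { pending_slot = slot; }

/* Fake completion callback: writes into the suspended task's stack
   variable, then puts the task back on the run queue. */
static void fire_callback(int value) {
    *pending_slot = value;
    pending_slot = NULL;
    task_queued = 1;
}

/* Looks blocking to the caller, but suspends the task while waiting. */
static int fake_download(void) {
    int result = 0;                      /* lives on the task's heap stack */
    fake_request(&result);
    swapcontext(&task_ctx, &sched_ctx);  /* halt: leave without re-queueing */
    return result;                       /* execution resumes here */
}

static void task_body(void) {
    int a = fake_download();
    int b = fake_download();
    final_result = a * 10 + b;
    task_finished = 1;                   /* returning switches to uc_link */
}

/* Runs the task to completion and returns what it computed. */
static int run_halt_resume_demo(void) {
    int next_value = 1;
    void *stack = malloc(STACK_SIZE);
    if (stack == NULL || getcontext(&task_ctx) != 0) { free(stack); return -1; }
    task_ctx.uc_stack.ss_sp = stack;
    task_ctx.uc_stack.ss_size = STACK_SIZE;
    task_ctx.uc_link = &sched_ctx;
    makecontext(&task_ctx, task_body, 0);
    task_queued = 1;
    task_finished = 0;
    pending_slot = NULL;
    while (!task_finished) {
        if (task_queued) {               /* run or resume the task */
            task_queued = 0;
            swapcontext(&sched_ctx, &task_ctx);
        } else if (pending_slot != NULL) {
            fire_callback(next_value++); /* the "event loop" completes it */
        } else {
            break;                       /* nothing runnable, nothing pending */
        }
    }
    free(stack);
    return final_result;
}
```

Writing through pending_slot is safe here for the same reason given above for result: the task's stack is a heap block that stays allocated while the task is suspended, and the task cannot run again until the callback has queued it.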
I'm still tinkering with the API for the task library, in particular the use of a global scheduler and how the halt/queue/resume code looks. I wanted to have first class schedulers so that a scheduler could be created on each native CPU thread, allowing tasks to belong to a CPU. I'm not sure if the current approach is right for that yet though. I'd also like to add task communication using channels and similar constructs.
The Python program I converted to ATS is now running on a bitcoin merge mining pool, handling the back end merge mining requests. It has successfully handled up to 200 requests per second, each of which spawns a number of other HTTP requests to other back end services. The CPU load on the machine dropped from 80% with the Python program to less than 10% with the ATS rewrite, which also uses much less memory.
For the curious, the pool uses Ur/Web for the front end web and payments interfaces and ATS for the back end pool share submission, processing and merge mining. One day I'll write a blog post detailing the progression of the pool development. It's gone from Mozart/Oz, to Ur/Web, and now to a combination of Ur/Web and ATS. Eventually it'll be completely ATS. It's my project of choice for testing language runtimes for scalability issues. Maybe Rust or Idris will be the next languages to try it with.