When you can’t take, maybe Iterate
This is the story of realising that what appeared, at first, to be a perfectly reasonable choice of interface didn’t quite turn out to be so. Surprises can sometimes hide in plain sight.
Context/Background
Consider a big data processing application that processes a large number of tuples produced by multiple input sources (think MapReduce). A reasonable design for such an application could include the following abstractions:
1. TaskInput
The TaskInput abstraction is for the input sources that produce tuples. TaskInputs can come in various flavours, but a reasonable expectation of all of them could be to implement the following method:
def take(offset: Int, count: Int): List[Tuple]
To consume all the tuples generated by a TaskInput, the caller can invoke “take” multiple times in a while loop with the offset and count parameters updated in each iteration.
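To make that driving loop concrete, here is a minimal sketch in Scala. VectorInput, consumeAll, and the use of a plain String as the tuple type are all illustrative assumptions, not part of the original design.

```scala
// Illustrative sketch: a Tuple is just a String record here.
type Tuple = String

trait TaskInput {
  def take(offset: Int, count: Int): List[Tuple]
}

// A toy TaskInput backed by an in-memory Vector, where random access is trivial.
class VectorInput(records: Vector[Tuple]) extends TaskInput {
  def take(offset: Int, count: Int): List[Tuple] =
    records.slice(offset, offset + count).toList
}

// The caller drains the input by advancing offset until take returns nothing.
def consumeAll(input: TaskInput, batchSize: Int): List[Tuple] = {
  var offset = 0
  val out = List.newBuilder[Tuple]
  var batch = input.take(offset, batchSize)
  while (batch.nonEmpty) {
    out ++= batch
    offset += batch.size
    batch = input.take(offset, batchSize)
  }
  out.result()
}
```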
2. Task
The Task abstraction encapsulates the processing or computation performed on the tuples produced by the TaskInput. It is the “caller” referred to above. This processing could, for example, be the ubiquitous count operation from MapReduce, parsing tuples to extract fields, or applying certain checks on tuple fields for filtering. A Task can also serve as the TaskInput for another Task, with the output tuples of the first task acting as input for the second.
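One way such chaining might look is sketched below: a hypothetical FilterTask that both consumes an upstream TaskInput and exposes its filtered output through take, so another Task can sit downstream of it. All names and the decision to materialise results eagerly are illustrative assumptions.

```scala
// Illustrative sketch: a Tuple is just a String record here.
type Tuple = String

trait TaskInput {
  def take(offset: Int, count: Int): List[Tuple]
}

class VectorInput(records: Vector[Tuple]) extends TaskInput {
  def take(offset: Int, count: Int): List[Tuple] =
    records.slice(offset, offset + count).toList
}

// A filtering Task: it pulls tuples from its upstream input, applies a
// predicate, and serves the surviving tuples through take itself.
class FilterTask(upstream: TaskInput, keep: Tuple => Boolean) extends TaskInput {
  // Materialise the upstream output once, for simplicity of the sketch.
  private lazy val results: Vector[Tuple] = {
    val out = Vector.newBuilder[Tuple]
    var offset = 0
    var batch = upstream.take(offset, 2)
    while (batch.nonEmpty) {
      out ++= batch.filter(keep)
      offset += batch.size
      batch = upstream.take(offset, 2)
    }
    out.result()
  }

  def take(offset: Int, count: Int): List[Tuple] =
    results.slice(offset, offset + count).toList
}
```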
The problem with take
Now, what’s the problem with take? On closer inspection, there are at least two issues:
1. take assumes random access is possible
By adopting the take interface for our TaskInputs, we have imposed a rather stringent, implicit demand on them: the ability to randomly access tuples at any given index.
For simplistic TaskInputs, such as a file with a sequence of fixed-length records, this is straightforward. For more evolved TaskInputs, such as an inverted index that generates tuples based on certain “search terms”, a performant implementation of take is still possible. The inverted index can be used to first obtain the array of tuple ids matching the “search terms”. When take is subsequently invoked with a certain offset and count, one can look up the tuple ids in this array and return the corresponding tuples.
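Under those assumptions, the inverted-index case might be sketched as follows; the class, its constructor shape, and the in-memory Map index are illustrative, not the original system's API.

```scala
// Illustrative sketch: a Tuple is just a String record here.
type Tuple = String

class InvertedIndexInput(tuples: Vector[Tuple],
                         index: Map[String, Vector[Int]],
                         searchTerm: String) {
  // Ids of tuples matching the search term, resolved once via the index.
  private val matchingIds: Vector[Int] = index.getOrElse(searchTerm, Vector.empty)

  // Random access is cheap: slice the id array, then fetch tuples by id.
  def take(offset: Int, count: Int): List[Tuple] =
    matchingIds.slice(offset, offset + count).map(tuples).toList
}
```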
However, what if the TaskInput does not have an inverted index and still has to generate matching tuples based on certain search terms? The matching tuples are now generated via a grep-style matching of the search terms against each tuple. In such a scenario, while the first call to take, i.e. take(0, n), is fine, a subsequent call with a non-zero offset is quite wasteful: it causes the first n tuples to be generated again via the costly matching operation, only to be thrown away. This wasteful processing compounds with every subsequent call to take.
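To make the waste concrete, here is an illustrative grep-style TaskInput with a counter added purely for demonstration: with no index, every take(offset, count) must re-run the match over every record from the start.

```scala
// Illustrative sketch: a Tuple is just a String record here.
type Tuple = String

class GrepInput(records: Vector[Tuple], searchTerm: String) {
  var matchesComputed = 0 // total matching operations performed, for illustration

  def take(offset: Int, count: Int): List[Tuple] = {
    // No random access into the matches: regenerate them from record 0 each time.
    val matched = records.filter { r => matchesComputed += 1; r.contains(searchTerm) }
    matched.slice(offset, offset + count).toList
  }
}
```

Draining four records one tuple at a time re-matches all four records on every call, which is exactly the compounding waste described above.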
At this stage, one might suggest performing the matching operation over all records in the TaskInput just once, storing all the matching tuples upfront in a random-access data structure such as an array. This array could then be used to implement the random-access take operation. However, it’s quite possible that the tuples from the TaskInput do not fit in memory, forcing us to spill tuples to disk. Spilling to disk, however, has its own performance cost.
There appears to be no reasonable way of implementing the take interface for a TaskInput that does not support random access to the tuples it produces.
2. take assumes that the caller knows better than TaskInput how many tuples to return
take takes a parameter n, specified by the caller, presumably to avoid memory pressure on the caller from too many tuples being returned at once. However, the count of tuples is only a proxy for the real metric we want to bound, i.e. the memory consumed by those tuples. Even a small count of tuples may put memory pressure on the caller if each of those tuples takes up a lot of memory. The size of individual tuples (which may even vary from tuple to tuple) is hard for the caller to know beforehand. In other words, although the caller’s intent is right, the mechanism isn’t enough.
Iterator to the rescue
So, what could we do to address the above problems with take? It turns out that, for our use-case, we need not impose the unreasonable demand of random access on our TaskInputs. What we really need is just the ability to iterate over all the tuples produced by a TaskInput, which leads us to the following interface:
def iterator: Iterator[List[Tuple]]
Instead of specifying the offset and count, the caller can simply keep invoking the next method of the iterator obtained above, and it will be served a reasonably sized batch of tuples until there are no more. It’s easy to see that this interface imposes no random-access requirement on the TaskInput. It is more generic in the sense that a performant implementation of the Iterator is possible for simple files and inverted indexes, as well as for TaskInputs without inverted indexes. In the latter case specifically, one need not waste cycles recomputing the matching tuples already returned to the caller; one can compute matching tuples on the fly for each invocation of Iterator.next and keep marching forward.
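Under the new interface, the grep-style case might look like the sketch below; the names and batch-size parameter are illustrative. Scala’s lazy Iterator combinators do the bookkeeping: each next computes only the following batch, and every record is matched exactly once across the whole iteration.

```scala
// Illustrative sketch: a Tuple is just a String record here.
type Tuple = String

class GrepIteratorInput(records: Vector[Tuple], searchTerm: String, batchSize: Int) {
  def iterator: Iterator[List[Tuple]] =
    records.iterator
      .filter(_.contains(searchTerm)) // match lazily, one record at a time
      .grouped(batchSize)             // emit batches of the requested size
      .map(_.toList)
}
```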
Note that each call to Iterator.next returns a batch of tuples whose size is decided by the TaskInput itself. The TaskInput can keep track of the memory consumed as the matching tuples are generated and terminate the batch at a particular threshold. With this interface, the TaskInput can handle the case of large tuples, which isn’t easy with the take(offset, n) interface.
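A sketch of that TaskInput-controlled batch sizing is shown below; the names, the bytes-per-tuple estimate, and the threshold are all illustrative assumptions. The batch is cut off once the accumulated size crosses a memory threshold, rather than at a caller-chosen tuple count.

```scala
// Illustrative sketch: a Tuple is just a String record here.
type Tuple = String

class MemoryBoundedInput(records: Vector[Tuple], maxBatchBytes: Int) {
  def iterator: Iterator[List[Tuple]] = new Iterator[List[Tuple]] {
    private val remaining = records.iterator

    def hasNext: Boolean = remaining.hasNext

    def next(): List[Tuple] = {
      val batch = List.newBuilder[Tuple]
      var bytes = 0
      // Keep adding tuples until the estimated batch size hits the threshold.
      while (remaining.hasNext && bytes < maxBatchBytes) {
        val t = remaining.next()
        batch += t
        bytes += t.length // crude per-tuple size estimate
      }
      batch.result()
    }
  }
}
```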
So you think you can have it all?
While we have gained a lot by moving to an Iterator interface, we have also lost something. In particular, we have lost idempotent retries when retrieving tuples from a TaskInput. With the take(offset, count) interface, the calling Task can retry to its heart’s content if a take invocation fails: the same set of tuples is guaranteed to be returned. A call to take(offset, count) has no side effect on the TaskInput.
Not so with Iterator.next. If, in response to an Iterator.next invocation, the TaskInput successfully generates tuples but the call fails on the client side due to, say, a network error while transporting the results back to the caller, a retry via a fresh invocation of Iterator.next would end up returning a different set of tuples. This is because every next call has a side effect on the TaskInput: it advances the pointer to the next set of results to be returned.
Therefore, one needs to be careful with retries on an Iterator. The caller may need to discard all the results obtained so far and start afresh if any Iterator.next call fails before all results have been returned.
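One way that discard-and-restart policy might be sketched is below: treat the whole iteration as the unit of retry, dropping any partial results and obtaining a fresh iterator on failure. The helper name and the retry budget are illustrative assumptions.

```scala
// Illustrative sketch: a Tuple is just a String record here.
type Tuple = String

def consumeWithRetry(makeIterator: () => Iterator[List[Tuple]],
                     maxAttempts: Int): List[Tuple] = {
  var attempt = 0
  while (true) {
    attempt += 1
    try {
      // Drain a fresh iterator; partial results from failed attempts are dropped.
      return makeIterator().flatten.toList
    } catch {
      case _: Exception if attempt < maxAttempts => () // start afresh
    }
  }
  Nil // unreachable
}
```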
Lesson Learnt
The right interfaces reveal themselves, if not immediately, then in due course.
References
- Dryad: A big data processing system with beautiful interfaces, among other things.
- MapReduce: The original “germ” that spawned it all (original only to an extent: the map and reduce ideas were borrowed directly from the world of functional programming, but applying them to build a big data processing infrastructure was indeed original).