When you can’t take, maybe Iterate

Gaurav Jain
5 min read · Mar 22, 2021
A tree of computations. Each node is a “Task” doing some processing on a set of input tuples from its parent, a TaskInput.

This is the story of how what appeared at first to be a perfectly reasonable choice of interface turned out not to be. Surprises can sometimes hide in plain sight.

Context/Background

Consider a big data processing application that processes a large number of tuples produced by multiple input sources (Think MapReduce). A reasonable design for such an application could include the following abstractions:

1. TaskInput

The TaskInput abstraction is for the input sources that produce tuples. TaskInputs can come in various flavours, but a reasonable expectation from all of these could be to implement the following method:

def take(offset: Int, count: Int): List[Tuple]

To consume all the tuples generated by a TaskInput, the caller can invoke “take” multiple times in a while loop with the offset and count parameters updated in each iteration.
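Concretely, the consumption loop might look something like the following sketch. Tuple and the per-tuple processing step are stand-ins here; the original interface only specifies take itself.

def consumeAll(input: TaskInput)(process: Tuple => Unit): Unit = {
  val batchSize = 1000                    // an arbitrary batch size
  var offset = 0
  var batch = input.take(offset, batchSize)
  while (batch.nonEmpty) {
    batch.foreach(process)                // hand each tuple to the caller's logic
    offset += batch.size
    batch = input.take(offset, batchSize)
  }
}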

2. Task

The Task abstraction encapsulates the processing or computation performed on the tuples produced by the TaskInput. It is the “caller” referred to above. This processing could, for example, be the ubiquitous count operation from MapReduce, parsing tuples to extract fields, or applying checks on tuple fields for filtering. A Task can also serve as the TaskInput for another Task, with the output tuples of the first task acting as the input of the second.
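One hedged way to picture this composition in Scala (the exact shape of the real abstractions isn’t spelled out here, and Tuple is a placeholder type):

type Tuple = Seq[String]  // placeholder; the real tuple type is unspecified

trait TaskInput {
  def take(offset: Int, count: Int): List[Tuple]
}

trait Task extends TaskInput {
  def input: TaskInput                            // parent in the computation tree
  def process(tuples: List[Tuple]): List[Tuple]   // e.g. parse, filter, count

  // Naive pass-through: assumes a 1:1 mapping between input and output
  // tuples. A filtering task would need extra bookkeeping to honour the
  // offset/count contract.
  def take(offset: Int, count: Int): List[Tuple] =
    process(input.take(offset, count))
}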

The problem with take

Now, what’s the problem with take? On closer inspection, there are at least two issues:

1. take assumes random access is possible

In adopting the take interface, we have imposed a rather stringent, implicit demand on our TaskInputs: the ability to randomly access tuples at any given index.

For simplistic TaskInputs such as a file containing a sequence of fixed-length records, this is straightforward. For more evolved TaskInputs such as an inverted index that generates tuples matching certain “search terms”, a performant implementation of take is still possible: the inverted index can first be used to obtain the array of tuple ids matching the search terms. When take is subsequently invoked with a given offset and count, one can look up the tuple ids in this array and return the corresponding tuples.
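A sketch of what that could look like; InvertedIndex, its lookup method, and fetchTuple are hypothetical helpers, not part of the original interface:

trait InvertedIndex { def lookup(terms: Seq[String]): Array[Long] } // hypothetical

class IndexedInput(index: InvertedIndex, terms: Seq[String]) extends TaskInput {
  // Computed once: ids of all tuples matching the search terms.
  private val matchingIds: Array[Long] = index.lookup(terms)

  def take(offset: Int, count: Int): List[Tuple] =
    matchingIds.slice(offset, offset + count).map(fetchTuple).toList

  // Random access to a tuple by id; assumed to be cheap here.
  private def fetchTuple(id: Long): Tuple = ???
}

Each take call is then a cheap slice of the precomputed id array plus the corresponding tuple fetches.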

However, what if the TaskInput does not have an inverted index and still has to generate matching tuples for certain search terms? The matching tuples are now generated via a grep-style matching of search terms against each tuple. In such a scenario, while the first call, take(0, n), is fine, a subsequent call to take with a non-zero offset is quite wasteful: it regenerates the first n tuples via the costly matching operation, only to discard them. The wasted work compounds with every subsequent call to take.
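To make the waste concrete, here is a hedged sketch of such a grep-style input; scanAllTuples and matches are hypothetical helpers:

class GrepStyleInput(terms: Seq[String]) extends TaskInput {
  def take(offset: Int, count: Int): List[Tuple] =
    scanAllTuples()                        // re-reads the whole source on every call
      .filter(t => matches(t, terms))      // re-does the costly matching
      .slice(offset, offset + count)       // ...only to discard the first `offset` matches
      .toList

  private def scanAllTuples(): Iterator[Tuple] = ???               // full scan of the source
  private def matches(t: Tuple, terms: Seq[String]): Boolean = ??? // expensive match
}

Every call with offset k pays for regenerating the k matches that were already returned.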

At this stage, one might suggest doing the matching once over all records in the TaskInput and storing the matching tuples upfront in a random-access data structure such as an array, which could then back the random-access take operation. However, it’s quite possible that the tuples do not fit in memory, forcing us to spill them to disk; and spilling to disk has its own performance cost.

There appears to be no reasonable way of implementing the take interface for a TaskInput that does not support random access to the tuples it produces.

2. take assumes that the caller knows better than the TaskInput how many tuples to return

take takes a count parameter specified by the caller, presumably to avoid memory pressure on the caller from too many tuples being returned at once. However, the count of tuples is only a proxy for the real metric we want to track, i.e. the memory consumed by those tuples. Even a small count may cause memory pressure on the caller if each tuple takes up a lot of memory, and the size of individual tuples (which may vary from tuple to tuple) is hard for the caller to know beforehand. In other words, the caller’s intent is right, but the mechanism isn’t enough.

Iterator to the rescue

So, what can we do to address the above problems with take? It turns out that we need not impose unreasonable random-access demands on our TaskInputs for our use-case. What we really need is just the ability to iterate over all the tuples produced by a TaskInput, which leads us to the following interface for TaskInput:

def iterator: Iterator[List[Tuple]]

Instead of specifying an offset and count, the caller just keeps invoking the next method of the iterator obtained above and is served a reasonably sized batch of tuples each time, until there are no more. It’s easy to see that this interface imposes no random-access requirement on the TaskInput. It is more generic in the sense that a performant implementation of the Iterator is possible for simple files, inverted indexes, and TaskInputs without inverted indexes alike. In the latter case specifically, one need not waste cycles recomputing matching tuples that were returned to the caller previously; one can compute matching tuples on the fly for each invocation of Iterator.next and keep marching forward.
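The consumption loop becomes simpler too, with no offsets to maintain (process is again a stand-in for the caller’s logic):

def consumeAll(input: TaskInput)(process: Tuple => Unit): Unit = {
  val it = input.iterator
  while (it.hasNext) {
    it.next().foreach(process)   // each next() yields one batch of tuples
  }
}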

Note that each call to Iterator.next returns a batch of tuples whose size is decided by the TaskInput itself. The TaskInput can keep track of the memory consumed as matching tuples are generated and cut the batch off at a particular threshold. With this interface, the TaskInput can handle the case of large tuples, which isn’t easy with the take(offset, count) interface.
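Continuing the hypothetical grep-style input from earlier, a memory-bounded batch could be sketched as follows; sizeInBytes and maxBatchBytes are assumed helpers, not part of the original interface:

def iterator: Iterator[List[Tuple]] = new Iterator[List[Tuple]] {
  private val source = scanAllTuples().filter(t => matches(t, terms))

  def hasNext: Boolean = source.hasNext

  def next(): List[Tuple] = {
    val batch = List.newBuilder[Tuple]
    var bytes = 0L
    while (source.hasNext && bytes < maxBatchBytes) {
      val t = source.next()
      bytes += sizeInBytes(t)   // hypothetical per-tuple size estimate
      batch += t
    }
    batch.result()              // batch is bounded by memory, not by count
  }
}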

So you think you can have it all?

While we have gained a lot with the Iterator interface, we have also lost something. In particular, we have lost the ability to retry tuple retrieval from a TaskInput idempotently. With the take(offset, count) interface, the calling Task can retry to its heart’s content if a take invocation fails; the same set of tuples is guaranteed to be returned. take(offset, count) has no side effect on the TaskInput.

Not so with Iterator.next. If, in response to an Iterator.next invocation, the TaskInput successfully generates tuples but the call fails on the client side due to, say, a network error while transporting the results back to the caller, a retry with a new invocation of Iterator.next ends up returning a different set of tuples. This is because every next call has a side effect on the TaskInput: it advances the pointer to the next set of results to be returned.

Therefore, one needs to be careful with retries on an Iterator. The caller may need to discard all the results obtained so far and start afresh if any Iterator.next call fails before all results have been returned.
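A sketch of the coarse restart-from-scratch retry this forces on the caller (buffering everything in memory here just to keep the sketch short; a real caller would process incrementally):

def collectWithRetry(input: TaskInput, maxAttempts: Int = 3): List[Tuple] = {
  def attemptOnce(): List[Tuple] =
    input.iterator.flatMap(identity).toList   // drain the whole iterator

  var attempt = 1
  var result: Option[List[Tuple]] = None
  while (result.isEmpty) {
    try result = Some(attemptOnce())
    catch {
      // Discard the partial results of this attempt and start afresh
      // with a brand new iterator. Retries exhausted => rethrow.
      case _: Exception if attempt < maxAttempts => attempt += 1
    }
  }
  result.get
}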

Lesson Learnt

The right interfaces reveal themselves, if not immediately, then in due course.

References

  1. Dryad: A big data processing system with beautiful interfaces, among other things.
  2. MapReduce: The original “germ” that spawned it all (original only to an extent, if you consider that the map and reduce ideas were borrowed directly from the world of functional programming; applying them to build a big data processing infrastructure was indeed original, though)
