Streaming from broker
We are using sendfile() to stream data from segment logs to clients (and to brokers acting as followers). This works great, and it is what Kafka does, but maybe we can do better: sendfile() can block if the data is neither in the page cache nor on fast SSD storage, and because of the current single-threaded design that blocking in turn stalls other producers and consumers. Even if we do wind up using multiple threads on the server, that still won't guarantee mostly block-free operation.
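For reference, the current path is essentially the following; a minimal sketch, assuming a stream_segment() helper of our own naming and a non-blocking client socket:

```c
#include <sys/types.h>
#include <sys/sendfile.h>
#include <unistd.h>
#include <errno.h>

static int stream_segment(int client_fd, int segment_fd, off_t offset, size_t len)
{
        while (len) {
                /* this is the call that may block on disk I/O if the
                 * segment pages aren't already in the page cache */
                const ssize_t r = sendfile(client_fd, segment_fd, &offset, len);

                if (r > 0)
                        len -= r;
                else if (r == 0)
                        break; /* reached EOF on the segment log */
                else if (errno == EINTR)
                        continue;
                else if (errno == EAGAIN)
                        return 0; /* socket buffer full; resume when writable */
                else
                        return -1;
        }
        return 0;
}
```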
NGINX and Netflix contributed an excellent new sendfile implementation to FreeBSD, one that supports AIO, which is exactly what we'd love to be able to use. Specifically, the new sys call adds two new flags and refines an existing one (SF_NOCACHE, SF_READAHEAD, SF_NODISKIO). Unfortunately, this won't become available on Linux anytime soon.
We could consider Linux AIO (libaio: link with -laio, include libaio.h, use io_submit() and friends), but that would require opening files with O_DIRECT, which comes with a whole lot of restrictions. Even then, we'd have to transfer from the file into user-space memory and then write() to the socket, or build a fairly elaborate scheme out of pipes and the various *splice()/tee() calls. I am not sure the complexity would be worth it, or that we'd necessarily get more performance out of it, given the extra sys calls and the need to copy or shuffle around more data.
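To make the trade-off concrete, here is roughly what the libaio route looks like for a single read; a sketch under the O_DIRECT alignment rules, with aio_read_then_send() and the queue depth being our own illustrative choices:

```c
#define _GNU_SOURCE /* O_DIRECT */
#include <fcntl.h>
#include <libaio.h>
#include <stdlib.h>
#include <unistd.h>

#define BLK_ALIGN 4096 /* O_DIRECT wants block-aligned buffers/offsets/lengths */

static int aio_read_then_send(const char *path, int sock_fd, off_t offset, size_t len)
{
        io_context_t ctx = 0;
        struct iocb cb, *cbs[1] = {&cb};
        struct io_event ev;
        void *buf = NULL;
        int fd = -1, ret = -1;

        if (io_setup(8, &ctx) < 0)
                return -1;
        if ((fd = open(path, O_RDONLY | O_DIRECT)) == -1)
                goto out;
        if (posix_memalign(&buf, BLK_ALIGN, len))
                goto out;

        io_prep_pread(&cb, fd, buf, len, offset);
        if (io_submit(ctx, 1, cbs) != 1)
                goto out;
        /* a real server would reap completions from its event loop instead
         * of blocking here */
        if (io_getevents(ctx, 1, 1, &ev, NULL) != 1 || (long)ev.res < 0)
                goto out;

        /* the extra copy to the socket that sendfile() avoids */
        ret = write(sock_fd, buf, ev.res) == (ssize_t)ev.res ? 0 : -1;
out:
        free(buf);
        if (fd != -1)
                close(fd);
        io_destroy(ctx);
        return ret;
}
```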
Another alternative is mmap()ing the file and using vmsplice()/splice() to move the mapped data to the socket. Many of those sys calls accept flags, and SPLICE_F_MOVE|SPLICE_F_NONBLOCK may come in handy. We would still need to resort to pipe trickery, but again, this may be worth it.
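A minimal sketch of that route, with a hypothetical mmap_splice_to_socket() helper; partial-transfer handling is elided (pipe capacity is limited, so a real implementation would loop in chunks):

```c
#define _GNU_SOURCE /* splice(), vmsplice() */
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/uio.h>
#include <unistd.h>

static int mmap_splice_to_socket(int file_fd, int sock_fd, size_t len)
{
        int pipefd[2];
        void *p = mmap(NULL, len, PROT_READ, MAP_PRIVATE, file_fd, 0);

        if (p == MAP_FAILED || pipe(pipefd) == -1)
                return -1;

        struct iovec iov = {.iov_base = p, .iov_len = len};

        /* mapped pages -> pipe; SPLICE_F_GIFT lets the kernel steal the
         * pages rather than copy them */
        if (vmsplice(pipefd[1], &iov, 1, SPLICE_F_GIFT) == -1)
                return -1;
        /* pipe -> socket */
        if (splice(pipefd[0], NULL, sock_fd, NULL, len,
                   SPLICE_F_MOVE | SPLICE_F_NONBLOCK) == -1)
                return -1;

        close(pipefd[0]);
        close(pipefd[1]);
        munmap(p, len);
        return 0;
}
```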
We should also consider Lighttpd's 'asynchronous' sendfile hack. Effectively, what they do is:
- create a buffer in /dev/shm and mmap() it
- initiate an asynchronous read from the source file to the mapped buffer
- wait until the data is ready
- use sendfile() to send the data from /dev/shm to the network socket.
Indeed, the data is never copied into a user-space buffer proper; it only moves between kernel-managed pages (the source file's page cache and the tmpfs-backed buffer). It requires use of AIO (or POSIX AIO, or some other user-space-threads I/O hand-off scheme). The implementation can be found here.
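For illustration, a rough sketch of that scheme using POSIX AIO; the staging-file name, shm_staged_sendfile(), and the blocking wait are our assumptions, not Lighttpd's actual code:

```c
#include <aio.h> /* POSIX AIO; link with -lrt on older glibc */
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/sendfile.h>
#include <unistd.h>

static int shm_staged_sendfile(int sock_fd, int src_fd, off_t src_off, size_t len)
{
        off_t out_off = 0;
        struct aiocb cb;
        const struct aiocb *list[1] = {&cb};
        void *buf;
        int ret = -1;

        /* 1. tmpfs-backed staging buffer, mmap()ed so aio_read() can target it */
        int shm_fd = shm_open("/segment-stage", O_CREAT | O_RDWR, 0600);

        if (shm_fd == -1)
                return -1;
        if (ftruncate(shm_fd, len) == -1)
                goto out;
        buf = mmap(NULL, len, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
        if (buf == MAP_FAILED)
                goto out;

        /* 2. asynchronous read: source segment file -> staging buffer */
        memset(&cb, 0, sizeof(cb));
        cb.aio_fildes = src_fd;
        cb.aio_buf = buf;
        cb.aio_nbytes = len;
        cb.aio_offset = src_off;
        if (aio_read(&cb) == -1)
                goto out_unmap;

        /* 3. wait for completion (a real server would poll from its event loop) */
        while (aio_error(&cb) == EINPROGRESS)
                aio_suspend(list, 1, NULL);
        if (aio_return(&cb) != (ssize_t)len)
                goto out_unmap;

        /* 4. sendfile() from tmpfs to the socket: those pages are resident
         * by construction, so this cannot block on disk I/O */
        ret = sendfile(sock_fd, shm_fd, &out_off, len) == (ssize_t)len ? 0 : -1;

out_unmap:
        munmap(buf, len);
out:
        close(shm_fd);
        shm_unlink("/segment-stage");
        return ret;
}
```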
All told, there are other options to consider, especially if we are going to support other OSes and platforms. This all comes down to reducing or even eliminating the likelihood of blocking sendfile() operations, so that other consumers/producers won't be stuck waiting behind them. It may not really be worth it for now, but we should come back to this if and when it is.
Appending bundles to segments
We are using writev() to append data to segment log files, which is always going to be fast because it is an append operation (although there are edge cases where it may not work out that way). This should almost never block, but it might.
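For reference, the append path amounts to something like this; a sketch where the bundle header/payload split is illustrative and the fd is assumed to be opened with O_APPEND:

```c
#include <sys/uio.h>

static int append_bundle(int log_fd, const void *hdr, size_t hdr_len,
                         const void *payload, size_t payload_len)
{
        struct iovec iov[2] = {
                {.iov_base = (void *)hdr, .iov_len = hdr_len},
                {.iov_base = (void *)payload, .iov_len = payload_len},
        };

        /* with O_APPEND, the single writev() lands atomically at the tail,
         * so header and payload can't be torn apart by a concurrent append */
        return writev(log_fd, iov, 2) == (ssize_t)(hdr_len + payload_len) ? 0 : -1;
}
```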
We can, again, rely on AIO (specifically, Linux AIO) for this, in order to minimize or eliminate the likelihood of writev() blocking. The problem, again, is that it requires opening files with O_DIRECT, and the underlying filesystem must properly support AIO semantics. XFS seems to be the only safe choice; in fact, only Linux kernels 3.16+ include an XFS implementation that properly deals with appends.
We could take the OS/architecture and filesystem into account and optionally use AIO for this, as sketched below.
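One way to gate that at runtime; a sketch that checks for XFS via statfs(2) and for a 3.16+ kernel via uname(2) (the policy function and its name are ours):

```c
#include <stdio.h>
#include <sys/utsname.h>
#include <sys/vfs.h>

#define XFS_SUPER_MAGIC 0x58465342 /* from linux/magic.h */

static int can_use_aio_appends(const char *segment_dir)
{
        struct statfs sfs;
        struct utsname un;
        int maj = 0, min = 0;

        /* the segment directory must live on XFS */
        if (statfs(segment_dir, &sfs) == -1 || sfs.f_type != XFS_SUPER_MAGIC)
                return 0;
        /* and the kernel must be 3.16 or newer */
        if (uname(&un) == -1 || sscanf(un.release, "%d.%d", &maj, &min) != 2)
                return 0;
        return maj > 3 || (maj == 3 && min >= 16);
}
```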
File handles
If we are going to support many thousands of partitions, we need to consider the FD requirements. Specifically, we currently need 2 FDs for each partition (for the current segment's log and index), and 1 FD for each immutable segment. So for a partition with 5 immutable segments, we'd need 5 + 2 = 7 FDs. Furthermore, we need to mmap() all index files, although those are fairly small.
We could maintain a simple LRU cache (or maybe look into alternative replacement policies) of all FDs for opened segment files, and bound it based on e.g. getrlimit(RLIMIT_NOFILE, …). Whenever we get EMFILE from accept4(), open(), socket(), etc., we'd ask the cache to close FDs. If we need to open a file and we get EMFILE, we'd need the cache to close FDs so that we can open the file; if the cache is empty, it means we have used all FDs for sockets, and we should perhaps try setrlimit() to raise RLIMIT_NOFILE.
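A sketch of that open-with-eviction loop; fd_cache_evict_lru() is a hypothetical hook into the LRU cache, returning how many descriptors it managed to close:

```c
#include <errno.h>
#include <fcntl.h>
#include <sys/resource.h>

extern int fd_cache_evict_lru(int how_many); /* hypothetical cache hook */

static int open_segment(const char *path)
{
        for (;;) {
                const int fd = open(path, O_RDONLY);

                if (fd != -1)
                        return fd;
                if (errno != EMFILE && errno != ENFILE)
                        return -1;
                if (fd_cache_evict_lru(8) > 0)
                        continue; /* freed some descriptors; retry the open */

                /* cache is empty: every FD is held by sockets etc., so try
                 * to raise the soft RLIMIT_NOFILE up to the hard limit */
                struct rlimit rl;
                if (getrlimit(RLIMIT_NOFILE, &rl) == -1 || rl.rlim_cur >= rl.rlim_max)
                        return -1;
                rl.rlim_cur = rl.rlim_max;
                if (setrlimit(RLIMIT_NOFILE, &rl) == -1)
                        return -1;
        }
}
```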
We do not need to solve this problem yet, but we should consider it, both for performance reasons and for efficient support of thousands or even millions of partitions.
Warming up disk pages
We can use mincore(2) to determine which segment log pages are not currently in memory (the page cache) and then 'touch' them so that they are paged in before we access them. We should also look into fcntl(fd, F_NOCACHE) (on macOS), posix_fadvise(), readahead(), posix_fallocate(), and fallocate(), and use them when and where appropriate.
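A sketch of what such a warm-up pass could look like, using mincore(2) plus POSIX_FADV_WILLNEED; warm_segment() is our name, and a real pass would coalesce adjacent missing pages into single ranges:

```c
#define _DEFAULT_SOURCE /* mincore() */
#include <fcntl.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <unistd.h>

static int warm_segment(int fd, size_t file_size)
{
        const size_t page = (size_t)sysconf(_SC_PAGESIZE);
        const size_t pages = (file_size + page - 1) / page;
        unsigned char *vec = malloc(pages); /* one residency byte per page */
        void *p = mmap(NULL, file_size, PROT_READ, MAP_SHARED, fd, 0);
        size_t i;
        int ret = -1;

        if (!vec || p == MAP_FAILED)
                goto out;
        if (mincore(p, file_size, vec) == -1)
                goto out;

        for (i = 0; i < pages; ++i) {
                if (vec[i] & 1)
                        continue; /* page already resident */
                /* schedule an asynchronous read-in of the missing page */
                posix_fadvise(fd, (off_t)(i * page), page, POSIX_FADV_WILLNEED);
        }
        ret = 0;
out:
        if (p != MAP_FAILED)
                munmap(p, file_size);
        free(vec);
        return ret;
}
```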