phaistos-networks / tank

A very high performance distributed log service

License: Apache License 2.0

Makefile 2.05% C++ 85.27% C 11.83% CMake 0.71% Roff 0.01% Shell 0.06% CSS 0.01% Python 0.05% Meson 0.01% Dockerfile 0.01%
log high-performance service

tank's Introduction

TANK is a very high performance distributed log, inspired in part by Kafka and other similar services and technologies. This is the second major public release, TANK 2. Read about this new public release here.

Introduction

You should begin by reading about the core concepts and the client API.

Please see building instructions. You may also want to run Tank using its Docker image.

Features include:

TANK's goal is the highest possible performance together with simplicity. If you need very high performance, operational simplicity, and no reliance on other services (when running TANK in stand-alone mode), consider TANK.

Please see the wiki for more information.

We chose the name TANK because a tank is a storage chamber, suitable for liquids and gas - which we think is analogous to a storage container for data that flows to and from other containers and systems via 'pipes' (connections).

Other clients

tank's People

Contributors

gabrieltz, markpapadakis, phaistonian


tank's Issues

Gotcha: Multiplexed consumed messages access, etc.

The Tank client, for performance, uses a buffer internal to a connection for the consumed messages. That is, instead of allocating memory to hold each consumed message, it simply points into that connection buffer.
This provides great performance benefits. There is a catch, however, that needs to be accounted for by the application developer:

When you execute another Tank operation while you are still iterating over (or otherwise accessing) consumed messages, that connection buffer is reset or freed, so those messages can no longer be accessed (invalid memory access). You should either copy them somewhere yourself, or issue the new Tank calls only once you no longer need to access the original message set.

We should simplify this by creating a new utility method that accepts a std::vector<partition_content> & and a simple_allocator, allocates memory from that allocator, and updates the message pointers to point into it instead.
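Below is a hypothetical sketch of such a helper. The accessor names (simple_allocator::Alloc(), the strwlen32_t(pointer, length) constructor, partition_content::msgs, consumed_msg::content) are illustrative assumptions and have not been verified against the actual client headers; it only demonstrates the copy-and-repoint idea.

#include <cstring>
#include <vector>

// Copies each consumed message's payload out of the connection buffer into memory
// owned by the supplied allocator, then repoints the message at the copy.
void retain_consumed(std::vector<partition_content> &contents, simple_allocator &a) {
        for (auto &pc : contents) {
                for (auto msg : pc.msgs) {
                        const auto len = msg->content.size();
                        auto *copy = static_cast<char *>(a.Alloc(len)); // hypothetical allocator API
                        std::memcpy(copy, msg->content.data(), len);
                        msg->content = strwlen32_t(copy, static_cast<uint32_t>(len)); // assumed (pointer, length) constructor
                }
        }
}

(Message keys, if any, would need the same treatment.)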

Optimizations for consumer/tailing semantics

There is a low-hanging-fruit optimization that's relatively easy to implement and that I expect will yield great results.
The vast majority of partition consumers tail the partition, as opposed to consuming from some random sequence number in the partition's message-sequence space. This means that the sequence number provided in a CONSUME request is almost always within the offsets range of the current/active segment.
service.cpp#read_cur() is responsible for streaming from that current partition segment, and it currently uses the skiplist to determine where to start reading in the segment in order to reach the file offset for the requested sequence number.

That is to say, it will determine the offset in the log where the message with the highest sequence number that is <= the target sequence number is found, and then use a linear search with successive read()s to locate it.
It winds up invoking search_before_offset(), which is where we incur some sequential I/O overhead.
That function performs a linear search, reading 128 bytes at a time, until it finds a message (set) with an absolute sequence number greater than the maximum absolute sequence number, in order to eventually determine the file offset of that message.

  • We should be able to optimize this by reading, say, at least getpagesize() worth of data at a time instead of 128 bytes. That is an obvious optimization and one we need to implement.

However, we should also use a small cache that fronts all requests to read_cur(). It should cache the results of the current read_cur() implementation, and it should first look for a match in the cache (fast path) before doing the work read_cur() does now.
Because we can and should assume that the vast majority of CONSUME requests are, as stated, for the specific purpose of tailing the partition, and are thus routed to read_cur(), we only need a small cache (maybe a few hundred values in capacity). We don't even need to bother selecting an optimal replacement policy; we can just use a simple std::unordered_map<> and clear() it whenever it reaches a certain size.
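A minimal sketch of that cache follows, assuming a hypothetical lookup_result type and an existing read_cur_uncached() helper standing in for the current read_cur() implementation; only the cache-and-clear policy is illustrated.

#include <cstdint>
#include <unordered_map>

struct lookup_result final {
        uint64_t fileOffset; // where to start streaming from in the active segment
        uint64_t baseSeqNum; // sequence number found at that offset
};

lookup_result read_cur_uncached(uint64_t absSeqNum); // existing skiplist + linear-search path

static std::unordered_map<uint64_t, lookup_result> read_cur_cache;
static constexpr size_t read_cur_cache_cap = 512; // a few hundred entries is plenty

lookup_result read_cur(const uint64_t absSeqNum) {
        if (const auto it = read_cur_cache.find(absSeqNum); it != read_cur_cache.end())
                return it->second; // fast path: tailing consumers should hit this almost always

        const auto res = read_cur_uncached(absSeqNum);

        // no replacement policy: just wipe the cache whenever it grows past the cap
        if (read_cur_cache.size() >= read_cur_cache_cap)
                read_cur_cache.clear();
        read_cur_cache.emplace(absSeqNum, res);
        return res;
}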

Network and Disk I/O blocking and file handles : optimizations

Streaming from broker

We are using sendfile() to stream data from segment logs to clients (or to brokers acting as followers). This works great, and it is what Kafka does, but maybe we can do better, considering that sendfile() can block if the data is neither in the disk cache nor on fast SSD storage; because of the current single-thread design, that in turn affects other producers and consumers. Even if we do wind up using multiple threads on the server, that still won't guarantee mostly block-free operation.

NGINX and Netflix contributed an excellent new sendfile implementation for FreeBSD which supports AIO, which is exactly what we'd love to be able to use. Specifically, that new syscall adds two new flags and refines an existing one (SF_NOCACHE, SF_READAHEAD, SF_NODISKAIO). Unfortunately, this won't become available on Linux anytime soon.

  • If we are going to support FreeBSD, we absolutely need to take advantage of it.

We could consider Linux AIO (use of libaio, with -laio and libaio.h, io_submit(), etc.), but that would require opening files with O_DIRECT, which comes with a whole lot of restrictions. Even then, we'd have to transfer from the file to user-space memory and then use write() to stream to the socket, or build a fairly elaborate scheme with pipes and the various *splice() and tee() calls. I am not sure the complexity is going to be worth it, or that we'd necessarily get more performance out of it, given the additional syscalls and the need to copy or shuffle around more data.

Another alternative is the use of mmap() and then the *splice() calls to transfer the mmap-ed file data to the socket.
Many of those syscalls accept flags, and SPLICE_F_MOVE|SPLICE_F_NONBLOCK may come in handy. We still need to resort to pipe trickery, but again, this may be worth it.
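As an illustration of that pipe trickery, here is a sketch of the common file -> pipe -> socket splice() pattern; it is only an assumption of what the implementation might look like (partial-transfer loops and most error handling are elided, and splice() requires _GNU_SOURCE).

#include <fcntl.h>
#include <unistd.h>

bool splice_file_to_socket(int file_fd, off_t offset, size_t len, int sock_fd) {
        int pfd[2];
        if (::pipe(pfd) == -1)
                return false;

        // move file pages into the pipe without copying them to user space
        const auto in = ::splice(file_fd, &offset, pfd[1], nullptr, len,
                                 SPLICE_F_MOVE | SPLICE_F_NONBLOCK);
        bool ok = false;
        if (in > 0) {
                // then move them from the pipe to the socket
                const auto out = ::splice(pfd[0], nullptr, sock_fd, nullptr, static_cast<size_t>(in),
                                          SPLICE_F_MOVE | SPLICE_F_NONBLOCK);
                ok = (out == in);
        }
        ::close(pfd[0]);
        ::close(pfd[1]);
        return ok;
}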

We should also consider LightHTTPD’s ‘asynchronous’ sendfile hack. Effectively what they do is:

  1. create a buffer in /dev/shm and mmap() it
  2. initiate an asynchronous read from the source file to the mapped buffer
  3. wait until the data is ready
  4. use sendfile() to send the data from /dev/shm to the network socket.

Notably, the data is never pushed through a userspace read()/write() copy; it moves from the source file into the /dev/shm buffer and from there to the socket. It requires the use of AIO (or POSIX AIO, or some other userspace-threads I/O handoff scheme). The implementation can be found here.
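A rough sketch of the idea as we understand it, using POSIX AIO and memfd_create() standing in for a /dev/shm file; the synchronous wait would of course be folded into the event loop rather than busy-looped, and cleanup on the error paths is omitted.

#include <aio.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/sendfile.h>
#include <unistd.h>

// Stream `len` bytes of src_fd starting at src_off to sock_fd, without issuing a
// sendfile() that may block against slow storage.
bool async_sendfile_once(int sock_fd, int src_fd, off_t src_off, size_t len) {
        // 1. backing buffer in tmpfs, so the final sendfile() never touches the disk
        const int shm_fd = ::memfd_create("tank-sendfile", 0); // or shm_open() under /dev/shm
        if (shm_fd == -1 || ::ftruncate(shm_fd, len) == -1)
                return false;
        void *buf = ::mmap(nullptr, len, PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
        if (buf == MAP_FAILED)
                return false;

        // 2. asynchronous read from the segment file into the mapped buffer
        aiocb cb{};
        cb.aio_fildes = src_fd;
        cb.aio_offset = src_off;
        cb.aio_buf    = buf;
        cb.aio_nbytes = len;
        if (::aio_read(&cb) == -1)
                return false;

        // 3. wait until the data is ready (simplified; a real impl. polls from the event loop)
        while (::aio_error(&cb) == EINPROGRESS)
                ;
        const ssize_t got = ::aio_return(&cb);

        // 4. sendfile() from tmpfs to the socket; the pages are guaranteed to be in memory
        off_t off = 0;
        const bool ok = got > 0 && ::sendfile(sock_fd, shm_fd, &off, static_cast<size_t>(got)) == got;

        ::munmap(buf, len);
        ::close(shm_fd);
        return ok;
}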

All told, there are other options to consider, especially if we are going to support other OSes and platforms. This all comes down to reducing or even eliminating the likelihood of blocking sendfile() operations, so that other consumers/producers won't block waiting for them. It may not really be worth it for now, but we should come back to this if and when it is.

Appending bundles to segments

We are using writev() to append data to segment log files, which is almost always fast because it's an append operation (although there are edge cases where it may not work like that). This should almost never block, but it might.

We can, again, rely on AIO (specifically, Linux AIO) for this, in order to minimize or eliminate the likelihood of writev() blocking. The problem, again, is that it requires opening files with O_DIRECT, and the underlying filesystem must properly support AIO semantics. XFS seems to be the only safe choice; in fact, only Linux kernels 3.16+ include an XFS implementation that properly deals with appends.

We could take into account the OS/architecture and filesystem, to optionally use AIO to do this.

File handles

If we are going to support many thousands of partitions, we need to consider the file-descriptor requirements. Specifically, we currently need 2 FDs for each partition (for the current segment's log and index), plus 1 FD for each immutable segment. So for a partition with 5 immutable segments, we'd need 5 + 2 = 7 FDs. Furthermore, we need to mmap() all index files, although those are fairly small.

We could maintain a simple LRU cache (or maybe look into alternative replacement policies) of all FDs for opened segment files and limit it based on e.g. getrlimit(RLIMIT_NOFILE, ...). Whenever we get EMFILE from accept4(), open(), socket(), etc., we'd ask the cache to close FDs. If we need to open a file and we get EMFILE, we'd need the cache to close FDs so that we can open the file; if the cache is empty, it means we have used all FDs for sockets, and we should perhaps try setrlimit() to adjust RLIMIT_NOFILE.
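A minimal LRU sketch with illustrative names (the key would be whatever uniquely identifies an open segment file); eviction simply close()s the least recently used FD.

#include <cstdint>
#include <list>
#include <unordered_map>
#include <utility>
#include <unistd.h>

class fd_lru_cache final {
        using segment_key = uint64_t; // e.g. a packed (partition, segment) identifier
        std::list<std::pair<segment_key, int>> lru; // front = most recently used
        std::unordered_map<segment_key, std::list<std::pair<segment_key, int>>::iterator> index;

      public:
        void put(const segment_key k, const int fd) {
                lru.emplace_front(k, fd);
                index[k] = lru.begin();
        }

        int get(const segment_key k) {
                const auto it = index.find(k);
                if (it == index.end())
                        return -1;
                lru.splice(lru.begin(), lru, it->second); // bump to front
                return it->second->second;
        }

        // invoked when accept4()/open()/socket() fail with EMFILE
        bool evict_one() {
                if (lru.empty())
                        return false; // every FD is a socket; consider setrlimit(RLIMIT_NOFILE)
                ::close(lru.back().second);
                index.erase(lru.back().first);
                lru.pop_back();
                return true;
        }
};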

We are not going to need to solve this problem yet, but we should consider it, both for performance reasons and for efficient support of thousands or even millions of partitions.

Warming up disk pages

We can use mincore(2) to determine which segment log pages are not currently in memory (block/file caches) and then 'touch' them so that they are paged in prior to accessing them. We should also look into fcntl(fd, F_NOCACHE), posix_fadvise(), readahead(), fadvise(), posix_fallocate() and fallocate(), and use them when and where appropriate.
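As a sketch of that warm-up (mincore() over an mmap()ed segment, then posix_fadvise(POSIX_FADV_WILLNEED) for the non-resident pages; this is only one possible combination of the calls listed above):

#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>
#include <vector>

void warm_segment_pages(int fd, size_t file_size) {
        const auto page = static_cast<size_t>(::getpagesize());
        void *addr = ::mmap(nullptr, file_size, PROT_READ, MAP_SHARED, fd, 0);
        if (addr == MAP_FAILED)
                return;

        std::vector<unsigned char> resident((file_size + page - 1) / page);
        if (::mincore(addr, file_size, resident.data()) == 0) {
                for (size_t i = 0; i < resident.size(); ++i) {
                        if (resident[i] & 1)
                                continue; // page already in the page cache
                        // hint the kernel to page this range in before we access it
                        ::posix_fadvise(fd, static_cast<off_t>(i * page), page, POSIX_FADV_WILLNEED);
                }
        }
        ::munmap(addr, file_size);
}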

Writing 100MM messages with tank-cli fails

Hi Mark,

Was trying to test current HEAD and I got this.

./tank-cli -b localhost:11221 -t test -t test  bm p2b  -s 100 -c 100000000 -B 8196 -R
Will publish 100,000,000 messages, in batches of 8,196 messages, each message content is 100b (compression disabled)
tank-cli: Switch/buffer.h:919: void Buffer::EnsureSize(const uint32_t): Assertion `newSize >= newMin && newSize > length_' failed.
[1]    27166 abort (core dumped)  ./tank-cli -b localhost:11221 -t test -t test bm p2b -s 100 -c 100000000 -B 

Do you have a different tag I should try?

Consume responses - compression

We currently don't compress consume responses (i.e. message sets streamed from the server to the client) because we rely on sendfile() -- but maybe, depending on the number of bytes we need to stream, it would be worth it to (read, compress, stream) instead; that incurs the kernel-to-user-space copy, the compression cost, etc., but it could still make more sense.

However, we should probably reserve this kind of behaviour for when really large amounts of data are to be streamed (e.g. over 10MBs), and in that case we should rely on a background thread pool to (read, compress) the data before handing it back to the main thread for streaming, because we really don't want to block the main thread (the read can block for longer than we anticipate, and compression may take far longer than we think). Furthermore, we may need to employ some more elaborate heuristics for deciding when to do this, rather than relying only on the span/range of data to stream.

This could be really beneficial when, for example, replaying a partition's worth of events.

Consider batching incoming partition messages before flushing them together to partition segment

Currently, the broker will append to the leader (itself, in standalone mode) immediately, but maybe this is not optimal. We should support tuning select topics so that the server buffers up to n messages and does not flush them to the segment until it has >= that number, or until some time has elapsed since it began buffering; maybe we could also specify a size threshold.

This is particularly useful when you have many producers, each producing 1-2 messages at a time; a sketch of the thresholds involved follows.
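A hypothetical sketch of the per-topic thresholds and the flush decision (type and field names are illustrative, not actual broker code):

#include <chrono>
#include <cstdint>
#include <vector>

struct buffered_batch final {
        std::vector<uint8_t> data; // serialized bundles awaiting flush
        size_t               msgs_count{0};
        std::chrono::steady_clock::time_point first_buffered{};

        // tunables: flush as soon as ANY threshold is crossed
        size_t                    max_msgs{512};
        size_t                    max_bytes{256 * 1024};
        std::chrono::milliseconds max_delay{5};

        bool should_flush(const std::chrono::steady_clock::time_point now) const {
                return msgs_count >= max_msgs ||
                       data.size() >= max_bytes ||
                       (msgs_count && now - first_buffered >= max_delay);
        }
};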

Kafka is likely doing something similar, but I am not sure what the optimal way to do this is in terms of semantics. We will need to consider Kafka's design, but here are a few questions that need to be answered:

  • How do we deal with incoming bundles that carry a compressed message set? Flush what's buffered and restart buffering, or decompress them and buffer the messages from the compressed message set?
  • When should the broker respond (generate an ack) to the clients? Immediately, or as soon as the messages the client produced, which are now buffered along with other messages, have all been flushed?

Some of the benefits:

  • Opportunities for compression: when the buffered messages are flushed, the broker can consider compressing them. This may not be what we want, though, because we wouldn't want the compression to block the thread, so maybe we can hand it off to another thread; a few microseconds worth of interop delay shouldn't mean much.
  • Far fewer write()/writev() system calls.

Compaction/cleanup improvements

Cleanup works fine, but we need to improve it with throttling support, and by exposing some hard coded options as configuration options.

  • Throttling. We are now reading and writing as fast as we can, with no artificial delay between operations.
    This is not optimal, because it can affect serving consume and produce requests. I also don't think we should rely on some static rate; instead we should consider the state of the broker and adjust the rate dynamically based on it.
    If the broker is mostly idle, we should increase the rate; if the broker is busy servicing consume and produce requests, we should lower it.
  • Configuration options for maximum bundle size, minimum log file size, etc.
    Those are now hardcoded in compact_partition(), and I think the default values are pretty reasonable, but we should consider exposing them as configuration options.

Problem with Client when receiving more than one consume message

When the client receives multiple consume messages during a single poll() period, there seems to be a problem where some of the message contents returned by consumed() are corrupted/bad pointers. This can cause errors, such as a SEGFAULT, due to a corrupted char* data() pointer or size_t size() of a consumed_msg.

I've created this branch to reproduce the issue: https://github.com/krconv/TANK/tree/client-tests

git clone https://github.com/krconv/TANK
cd TANK
git checkout client-tests
docker build . -t tank-test
while [ $? -eq 0 ] ; do docker run -it tank-test ; done

My output:

Note: Google Test filter = ClientTest.CanConsumeSimultaneously
[==========] Running 1 test from 1 test case.
[----------] Global test environment set-up.
[----------] 1 test from ClientTest
[ RUN      ] ClientTest.CanConsumeSimultaneously
static void client::test::ClientTest::createEmptyTankTopics()
static void client::test::ClientTest::startTank()
<=TANK=> v0.77 | 2 topics registered, 2 partitions; will listen for new connections at 127.0.0.1:11011
(C) Phaistos Networks, S.A. - http://phaistosnetworks.gr/. Licensed under the Apache License

tests/client-test.cpp:184: Failure
      Expected: 1
To be equal to: pMessage->seqNum
      Which is: 140039336233528
[segfault]

Use a more appropriate timers tracking and scheduling scheme

We now use a simple linked list because there are usually no more than 100 or so timers we need to check every few ms, so this is not a real concern, but it is far from optimal.

We should instead use either a hierarchical timer wheel or a simple heap/priority queue. It turns out Go uses heaps, and O(1) access to the next timer to fire is a very desirable property; given that all other operations cost O(log n) (no more than 14 operations for 10k timers), it makes a lot of sense to use binary heaps.
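A minimal binary-heap scheduler sketch using std::priority_queue (the timer payload type is illustrative):

#include <cstdint>
#include <functional>
#include <queue>
#include <vector>

struct timer final {
        uint64_t              fire_at_ms; // absolute deadline
        std::function<void()> callback;

        bool operator>(const timer &o) const { return fire_at_ms > o.fire_at_ms; }
};

class timer_scheduler final {
        std::priority_queue<timer, std::vector<timer>, std::greater<timer>> q;

      public:
        void schedule(timer t) { q.push(std::move(t)); } // O(log n)

        // O(1): deadline of the next timer to fire, e.g. to compute the poll timeout
        uint64_t next_deadline() const { return q.empty() ? UINT64_MAX : q.top().fire_at_ms; }

        void run_expired(const uint64_t now_ms) {
                while (!q.empty() && q.top().fire_at_ms <= now_ms) {
                        auto t = q.top();
                        q.pop(); // O(log n)
                        t.callback();
                }
        }
};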

Minimum fetch size semantics for consume request

Currently, the minFetchSize property of a requested (topic, partition) in consume requests represents bytes; that is, it is a hint to the Tank broker so that, if the request needs to tail the partition, Tank will not respond as soon as it has any data, but as soon as it has at least that many bytes worth of data (bundle data) in the topic partition.

However, unlike Kafka, we can and do track the number of messages appended to the segment for requests tailing the partition -- we can do that because that information is encoded in the publish request, whereas Kafka would have to parse message sets, which would potentially mean decompressing them.

So maybe we should change the semantics of that property to mean number of messages, not number of bytes -- so that, e.g., setting it to 5 would ask the Tank broker to return a consume response as soon as it has 5 or more new messages appended. That seems far more useful to me.

Switch is missing prioqueue.h

Hi Mark,

I am trying to compile Trinity using Switch from this project and it is missing prioqueue.h

./docset_iterators.h:6:10: fatal error: 'prioqueue.h' file not found

Can you please add the file?
Best,
Raja

Raw disk access?

From the core concepts:

All bundles (which include messages) are stored physically in segments on each broker's underlying filesystem.

Is there performance to be gained by writing directly to disk, without the overhead of a filesystem? Most, if not all of the filesystem features wouldn't be needed anyway.

Windows platform

Can I run it on Windows, or is only Linux supported? There are no specific platform details in the README.

Build with CMake

Hi,
We're looking to integrate TANK into a (mostly C++) project that we build using CMake, so it would help if TANK could also be built with the CMake build system.

Retrieve request max size

Based on KIP-74

This KIP proposes to introduce new version of fetch request with new top-level parameter max_bytes to limit the size of fetch response and solve above problem.

This seems trivial to implement. We currently don't have a need for it, but if anyone wants this kind of feature, it should be easy to introduce it.

Consume semantics - multiple partitions request issue

If, in a single consume request, you request content from two partitions, and one of them does not exist, and the other has no data (will need to tail), like so:

        client.consume(
                {
                        { {_S8("foo_filtered"), 1}, { UINT64_MAX, 1e5 } },
                        { {_S8("foo"), 0}, { UINT64_MAX, 1e5 } },
                }, 1e3, 0);

Tank will respond immediately with not-found for foo_filtered:1, which is correct, but it will not register a watch for foo:0 (which is defined), and we get no information about foo:0 in the response.

Consumer Offsets, support via internal topic/partition

According to @miguno (I need to go through the Kafka codebase to figure out what they do exactly), they maintain an internal/special-purpose topic, __consumer_offsets, where the policy is configured as cleanup, because they only really care about the last value; presumably, on startup the partition is replayed in order to restore the state.

That's a clever and pragmatic way to implement it.

See also: http://www.slideshare.net/jjkoshy/offset-management-in-kafka

Corrupt Files Produce Core Dumps

Hello,

I've been doing some tests that involve pulling the plug on our product, and so also on TANK. With this approach, it has been very easy to get TANK into a non-functional state.

There are essentially three variations of failures I've seen.

Recoverable Segment/Index Corruption

This is of course ideal. It is a scenario that can easily be addressed by following the steps described in Troubleshooting. There are no core dumps. Not a big deal.

Zero Byte Logs

There have been a number of cases where the active segment ends up as a zero byte file (logs causing the failure attached). The failure I've seen looks as follows:

$ tank -p zero-byte-log/ -l 127.0.0.1:11011
tank: /builddir/build/BUILD/tank/service.cpp:3051: Switch::shared_refptr<topic_partition> Service::init_local_partition(uint16_t, const char*, const partition_config&): Assertion `l->lastAssignedSeqNum >= l->cur.baseSeqNum' failed.
Aborted (core dumped)

Non-Empty Corrupt Logs

The other failure case is one I've only seen once and I can't say the exact situation that led to it. I don't believe the machine that produced these logs was unplugged as in the other cases.

This one is a corrupt segment that has data but is not recoverable through TANK_FORCE_SALVAGE_CURSEGMENT. It looks like (logs attached once again):

$ tank -p corrupt-log/ -l 127.0.0.1:11011
Failed to initialize topics and partitions:pread64() failed:Success

Steps to Address

The only way I've found to address the last two failures is deleting the active segment and index files for the offending topic(s).

If you could give me some less intrusive steps to recovery, that would be great. It would also be preferable if TANK could determine that the segment or index is not as expected and fail gracefully rather than core dumping.

Thanks
corrupt-log.tar.gz
zero-byte-log.tar.gz

tank-cli crash if message has key but no length

Steps:

  1. ./tank-cli -b localhost:11011 -p 0 -t foo set -K hello foo
#0  Buffer::Reserved (this=<optimized out>) at Switch/buffer.h:173
#1  TankClient::get_buffer (this=0x7ffcac33b8a0) at tank_client.h:474
#2  TankClient::try_recv (this=this@entry=0x7ffcac33b8a0, c=c@entry=0x799830) at client.cpp:2010
#3  0x0000000000418f5b in TankClient::poll (this=0x7ffcac33b8a0, timeoutMS=<optimized out>, timeoutMS@entry=800) at client.cpp:2453
#4  0x00000000004039af in <lambda()>::operator()(void) const (__closure=0x7ffcac33b730) at cli.cpp:949
#5  0x0000000000406cbd in main (argc=2, argv=<optimized out>) at cli.cpp:1201
  2. (after setting blanks in step 1) ./tank-cli -b localhost:11011 -p 0 -t foo get 0
#0  TankClient::get_payload (this=this@entry=0x7ffc61b47630) at tank_client.h:501
#1  0x000000000041ebbb in TankClient::consume_from_leader (this=this@entry=0x7ffc61b47630, clientReqId=clientReqId@entry=2, leader=..., 
    leader@entry=..., from=from@entry=0x12c3a70, total=total@entry=1, maxWait=maxWait@entry=8000, minSize=0) at client.cpp:914
#2  0x0000000000422007 in TankClient::consume (this=this@entry=0x7ffc61b47630, req=std::vector of length 1, capacity 1 = {...}, 
    maxWait=maxWait@entry=8000, minSize=minSize@entry=0) at client.cpp:2698
#3  0x0000000000407227 in main (argc=<optimized out>, argv=<optimized out>) at cli.cpp:348

Implement inflight requests tracking for timeouts

track_inflight_req(), consider_inflight_reqs() and forget_inflight_req() implementation is pending.

We need to implement them so that we can deal with requests whose response takes too long, e.g. if you just stop the broker with a STOP signal. We can use a binary heap, a skip-list, or even a vector, although ideally we'd like both registration and deregistration to be O(1) operations. So maybe we can extend the use of outgoing_payload by adding a switch_dlist to it (for its request), walking that list in consider_inflight_reqs(), and inserting/removing entries in track_inflight_req() and forget_inflight_req(). We should also back this with a Switch::unordered_map<> so that we can quickly identify the payload in forget_inflight_req().
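A rough sketch of that bookkeeping, using std::list and std::unordered_map as stand-ins for switch_dlist and Switch::unordered_map<>; names and the fixed timeout are illustrative, and a single fixed timeout is what keeps the list ordered by expiration.

#include <cstdint>
#include <list>
#include <unordered_map>

struct outgoing_payload; // existing type; forward-declared for the sketch

struct inflight_entry final {
        uint32_t          req_id;
        uint64_t          expires_at_ms;
        outgoing_payload *payload;
};

static std::list<inflight_entry> inflight; // appended in registration order
static std::unordered_map<uint32_t, std::list<inflight_entry>::iterator> inflight_idx;

void track_inflight_req(const uint32_t req_id, const uint64_t now_ms, outgoing_payload *p) {
        inflight.push_back({req_id, now_ms + 8'000, p}); // O(1) registration; assumed 8s timeout
        inflight_idx[req_id] = std::prev(inflight.end());
}

void forget_inflight_req(const uint32_t req_id) {
        if (const auto it = inflight_idx.find(req_id); it != inflight_idx.end()) {
                inflight.erase(it->second); // O(1) deregistration
                inflight_idx.erase(it);
        }
}

void consider_inflight_reqs(const uint64_t now_ms) {
        // expired entries sit at the front because of the fixed timeout
        while (!inflight.empty() && inflight.front().expires_at_ms <= now_ms) {
                inflight_idx.erase(inflight.front().req_id);
                inflight.pop_front();
                // ... fail the request / notify the application here ...
        }
}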

Merge-join multiple streams

It should be easy to implement a TankClient utility method that facilitates joining multiple streams based on the message timestamp and a simple merge strategy.
For 1+ streams, we can just consume and buffer from each stream and 'pop' the earliest message across all of those buffered message streams, refilling a stream's buffer when needed.
This should make it easy to join many different streams as if they were a single stream, all the while retaining time-based ordering (this doesn't guarantee strict ordering, but it will almost always hold anyway).
This can work for multiple partitions of the same topic, or multiple partitions across multiple topics.
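A minimal sketch of the merge step itself: pop the message with the earliest timestamp across several buffered streams. The stream/message types are illustrative, and a real helper would refill buffers from the broker as they drain.

#include <cstdint>
#include <deque>
#include <queue>
#include <string>
#include <utility>
#include <vector>

struct buffered_msg final {
        uint64_t    ts; // creation timestamp in milliseconds
        std::string content;
};

using stream_buffer = std::deque<buffered_msg>;

// k-way merge of the buffered streams by timestamp
std::vector<buffered_msg> merge_streams(std::vector<stream_buffer> streams) {
        using entry = std::pair<uint64_t, size_t>; // (timestamp, stream index)
        std::priority_queue<entry, std::vector<entry>, std::greater<entry>> heap;

        for (size_t i = 0; i < streams.size(); ++i)
                if (!streams[i].empty())
                        heap.push({streams[i].front().ts, i});

        std::vector<buffered_msg> out;
        while (!heap.empty()) {
                const auto i = heap.top().second;
                heap.pop();
                out.push_back(std::move(streams[i].front()));
                streams[i].pop_front();
                if (!streams[i].empty())
                        heap.push({streams[i].front().ts, i}); // in practice: refill from the broker here
        }
        return out;
}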

Implement timers scheduler using EBTs

Bring in the EBT codebase and implement a proper timers scheduler using EBTs, as opposed to the simplistic doubly-linked-list implementation we have now. We use timers for tracking requests pending new messages, and we could conceivably have thousands of such actions at any given time. It will also provide us with O(1) next-timer-timestamp access, which is useful.

Client retained outgoing packets semantics

We should probably make sure that the aggregate size in bytes of all retained packets (for requests pending a response) does not exceed some value, so that we won't run out of memory; if it reaches that threshold, new requests should be dropped immediately (and 0 returned to the caller).

Parallelize startup sequence topics initialization

  • Even though all we really need to do during startup is walk the topic and partition directories and open log files and indices - which is pretty fast - for setups with 100s of topics and 1000s of partitions it should be beneficial to parallelize the initialization of topics/partitions via std::async() jobs. It should be pretty trivial; see the sketch below.
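A sketch of what that could look like (init_topic() is a hypothetical stand-in for the existing per-topic initialization, and the directory walk is simplified):

#include <filesystem>
#include <future>
#include <string>
#include <vector>

void init_topic(const std::filesystem::path &topic_dir); // hypothetical existing routine

void init_local_topics(const std::string &base_path) {
        std::vector<std::future<void>> jobs;

        for (const auto &entry : std::filesystem::directory_iterator(base_path)) {
                if (!entry.is_directory())
                        continue;
                // one async job per topic directory; each opens its partitions' logs and indices
                jobs.emplace_back(std::async(std::launch::async, [path = entry.path()] { init_topic(path); }));
        }

        for (auto &j : jobs)
                j.get(); // propagate any exception raised during initialization
}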

Compactions: retain _first_ not last. Clever idea by @kostas

@kostas on @dist-sys (Slack) came up with a great idea. Instead of retaining the last message in a set of messages with the same key, retain the first instead (during compactions). That way you can implement an idempotency scheme, where you really only care about the first message; if you get more, you will ignore them anyway, and you want them to be purged eventually.

consume response / streaming semantics

For consume responses where the returned response size is very large, instead of ingesting the whole response into the buffer, we can parse bundles as they arrive, using a simple state machine.
That way we can, e.g., reuse the input buffer when we reach its end, or at least trim it, so that we won't need much memory to process the response. It also means we'll get to hand bundles/messages to the client (caller) faster.

This shouldn't be hard to implement.

Reconsider use of usedBufs[] for decompressed batch msgs

When we are parsing consume responses and dealing with compressed message sets or batches that we need to decompress, we currently just get_buffer(), decompress into it, reference that buffer, and append it to usedBufs[], which we iterate in the next poll() call in order to put_buffer() all the buffers in there.

This works, and it avoids realloc() costs (compared to using, say, a single IOBuffer for decompressing all batch message sets), and it also doesn't need pointer patching, but there are two problems with the idea. Assuming we get 100k compressed batches, and each batch contains a message set with a single message:

  1. We'd need 100k IOBuffers. The cost of allocation may be very high
  2. In the next poll() call, we'd need to iterate the 100k-sized usedBufs[] and call put_buffer() 100k times

Those shouldn't necessarily be expensive operations, but it's far from optimal nonetheless.

Instead, we should come up with a hybrid scheme where we use far fewer IOBuffers and patch messages if needed (if they point into an IOBuffer used for decompressing content), so that we both allocate/use far fewer IOBuffers and don't need to iterate/release so many of them in the next poll() invocation.

This is a low-priority issue.

Identify missing segments and support for sparse seq.num space

We now encode the [first message sequence number, last message sequence number] in the immutable segment log filename.
This makes it easy to determine whether a consume request needs to access a segment that has been removed from the filesystem - for whatever reason.
That is, if we have the following immutable segments
[1-10, 50-100, 101-200, 201-300]
and a consume request's base sequence number is in [11, 50), instead of assuming the contents will be in the [1, 10] segment (see use of upper_bound_or_match()), we can check that segment's [first, last], figure out that the data isn't in fact there, and do whatever is appropriate to deal with it.
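A sketch of that check, with each immutable segment advertising the [first, last] range taken from its filename (a linear scan for brevity; the real code would binary-search, much like upper_bound_or_match() does):

#include <cstdint>
#include <vector>

struct segment_range final {
        uint64_t first; // first message sequence number stored in the segment
        uint64_t last;  // last message sequence number stored in the segment
};

// Returns the index of the segment containing seq, or -1 if it falls in a gap,
// i.e. the segment that held it has been removed from the filesystem.
int find_segment(const std::vector<segment_range> &segments /* sorted by .first */, const uint64_t seq) {
        for (int i = static_cast<int>(segments.size()) - 1; i >= 0; --i) {
                if (segments[i].first <= seq)
                        return seq <= segments[i].last ? i : -1;
        }
        return -1;
}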

For compactions, we'll need to support access to messages that are not necessarily ordered so that next message.seqNum = prevMessage.seqNum + 1. I am still not sure what's the best way to go about this (we will likely use one of the spare bundle flags to denote that a bundle base sequence number is encoded in the bundle header, and that each message also encodes an offset:varint from that bundle base). This can wait, though, because there is no need for compaction support yet.

Consume based on timestamp

Our ops folks suggested it'd be nice to be able to consume starting from a specific timestamp (each message header contains the creation timestamp in milliseconds), in addition to consuming by absolute sequence number.

It should be easy to implement, and it would perhaps help with all kinds of problems. We just need to maintain another index, and this should be enabled on a per-topic or a per-topic/partition basis.

Kafka doesn't support this feature.

Simultaneous Consumes Confuse Response Messages

Hello,

My company has been using TANK to great success in a recent project. Thank you!

I recently uncovered a bug where consuming from different topics at the same time results in the same message being returned for both request IDs. I've created a (fairly) minimal test to demonstrate the issue.

I'm using the google testing framework, but the test can be run otherwise with very minimal changes. It does assume that TANK is running and that "topic1" and "topic2" exist with a single partition for each. I can update this snippet to create the topics through the client if that would be helpful.

#include <gtest/gtest.h>

#define LEAN_SWITCH
#include <tank_client.h>
#undef LEAN_SWITCH

#include <chrono>
#include <iostream>
#include <string>


TEST(TankClient, readsFromMultipleTopicsSimultaneously)
{
    auto stringFrom8 = [](strwlen8_t string) {
        return std::string(string.data(), string.size());
    };

    auto stringFrom32 = [](strwlen32_t string) {
        return std::string(string.data(), string.size());
    };


    ::TankClient client;
    client.set_default_leader("127.0.0.1:11011");

    auto msSinceEpoch = std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::system_clock::now().time_since_epoch());

    uint64_t timestamp = static_cast<uint64_t>(msSinceEpoch.count());

    const std::string topic1{"topic1"};
    const std::string topic2{"topic2"};

    const std::string topic1Message{"I am from topic 1"};
    const std::string topic2Message{"I am from topic 2"};

    const auto produceTopic1Id = client.produce({       
        {
            {topic1.c_str(), 0},
            {{topic1Message.c_str(), timestamp, ""}}       
        }       
    });


    const auto produceTopic2Id = client.produce({       
        {
            {topic2.c_str(), 0},
            {{topic2Message.c_str(), timestamp, ""}}       
        }       
    });


    const auto consumeTopic1Id = client.consume({
            {
                {topic1.c_str(), 0},
                {0, 4 * 1024}
            }
        },
        2000,
        0
    );


    const auto consumeTopic2Id = client.consume({
            {
                {topic2.c_str(), 0},
                {0, 4 * 1024}
            }
        },
        2000,
        0
    );

    std::cout << topic1 << " produce: " << produceTopic1Id << std::endl;
    std::cout << topic2 << " produce: " << produceTopic2Id << std::endl;
    std::cout << topic1 << " consume: " << consumeTopic1Id << std::endl;
    std::cout << topic2 << " consume: " << consumeTopic2Id << std::endl;

    size_t expectedResponses = 4;
    size_t receivedResponses = 0;

    while (receivedResponses < expectedResponses)
    {
        client.poll(1000);

        for (const auto& produceAck : client.produce_acks())
        {
            std::cout << "Produced " << produceAck.clientReqId << " to "
                      << stringFrom8(produceAck.topic) << ":" << produceAck.partition
                      << std::endl;

            ++receivedResponses;
        }

        for (const auto& partitionContent : client.consumed())
        {
            std::cout << "\nRequest  : " << partitionContent.clientReqId << std::endl;
            std::cout << "Topic    : " << stringFrom8(partitionContent.topic) << std::endl;
            std::cout << "Partition: " << partitionContent.partition << std::endl;

            for (auto pMessage : partitionContent.msgs)
            {
                auto receivedMessage = stringFrom32(pMessage->content);

                if (partitionContent.clientReqId == consumeTopic1Id)
                {
                    EXPECT_STREQ(topic1Message.c_str(), receivedMessage.c_str());
                }
                else if (partitionContent.clientReqId == consumeTopic2Id)
                {
                    EXPECT_STREQ(topic2Message.c_str(), receivedMessage.c_str());
                }
                else
                {
                    std::cout << "    *** This consume response is unexpected ***" << std::endl;
                }

                std::cout << "  Timestamp: " << pMessage->ts << std::endl;
                std::cout << "  Content  : " << stringFrom32(pMessage->content)  << std::endl;
            }

            ++receivedResponses;
        }
    }
}

The test fails at the snippet:

                if (partitionContent.clientReqId == consumeTopic1Id)
                {
                    EXPECT_STREQ(topic1Message.c_str(), receivedMessage.c_str());
                }

with the response matching consumeTopic1Id having a message I am from topic 2.

Any insight you can give me into this behavior would be much appreciated.

wrong msgs kept and returned after compaction

config

log.retention.secs = 120
log.roll.secs = 120
log.cleanup.policy = cleanup

Operation

  1. I published a few messages with the same key. Then some more with another key.
  2. 2 minutes later (see retention time) I set some more messages with both keys.
  3. I issued a consume request from 0

What I got back was not what I expected.

  • From the first file (14, I think), I got back messages with sequence ids 18 and 19.
  • When requesting 20, I again got back messages 18 and 19, which caused my client to loop infinitely.
  • The next file was 24.
  • When requesting 24, I got message 24 as expected.

So there are 2 issues here.

  1. After compaction, the last message in file 14 should have been 23, rather than 19.
  2. How do I get message 24 without knowing where the new file starts ?

tank-cli also crashed when attempting to consume 0

#0  Buffer::Reserved (this=<optimized out>) at Switch/buffer.h:173
#1  TankClient::get_buffer (this=0x7fffee91da80) at tank_client.h:474
#2  TankClient::try_recv (this=this@entry=0x7fffee91da80, c=c@entry=0xdd5fa0) at client.cpp:2010
#3  0x000000000042338b in TankClient::poll (this=0x7fffee91da80, timeoutMS=<optimized out>) at client.cpp:2453
#4  0x0000000000404aaf in main (argc=1, argv=0x7fffee91f3a8) at cli.cpp:359

Flush index/skiplist periodically

When the in-memory skiplist (topic_partition_log::cur::index::skipList) holds over 65k records, we force a segment roll. This is not ideal. Instead, when it reaches 65k or more entries, we should just empty it, munmap the on-disk index data and mmap it again. This is trivial, and it will get rid of the artificial 65k limit we have now.

Wide index records

There is partial support for wide index records already, and in practice it won't be needed unless you have really HUGE datasets, but we should implement it anyway. This should be trivial, and it will come down to generating a new index from the old, where the new entries are {seqNum:u64, fileOffset:u32} instead of {relativeSeqNum:u32, fileOffset:u32}.
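A sketch of the conversion, using the two entry layouts named above (packing and the on-disk I/O details are omitted):

#include <cstdint>
#include <vector>

#pragma pack(push, 1)
struct narrow_index_entry final {
        uint32_t relSeqNum;  // relative to the segment's base sequence number
        uint32_t fileOffset;
};

struct wide_index_entry final {
        uint64_t seqNum;     // absolute sequence number
        uint32_t fileOffset;
};
#pragma pack(pop)

std::vector<wide_index_entry> widen_index(const std::vector<narrow_index_entry> &in, const uint64_t baseSeqNum) {
        std::vector<wide_index_entry> out;
        out.reserve(in.size());
        for (const auto &e : in)
                out.push_back({baseSeqNum + e.relSeqNum, e.fileOffset});
        return out;
}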

GC inactive connections

We will track all inactive connections in a doubly-linked list so that if open(), socket(), accept4(), etc. fail with ENFILE or EMFILE, we can close one or more inactive connections and try again. We will also make sure that connections that have been idle for too long are terminated, to reduce resource pressure.

Add namespacing

Hello,

I've been working to integrate TANK into a project. As I've expanded the use of the client further into our codebase, I've run into a number of naming conflicts as many of TANK's declarations (I am also referring to the contained Switch code) are in the global scope. I realize the conflicts should also be resolved in our own namespacing. However, client integration would be significantly simplified if the TANK codebase were contained within namespaces.

Thanks

Consider use of multiple threads

In practice, this will only benefit latency if we are going to be dealing with a lot of compressed batch message sets, which could keep a thread busy for a long time, blocking it from processing incoming input and scheduling outgoing requests.

It should be fairly trivial to implement this, though we need to be careful about retain/release semantics and about the costs of shared access. We should probably delay this until we absolutely need it, so that we'll be better informed and have a more evolved codebase.

Extend log retention and flushing options

From Kafka:

  • log.roll.hours (24 * 7)

    The maximum time before a new log segment is rolled out

  • log.default.flush.interval.ms (3000)

    Controls the interval at which logs are checked to see if they need to be flushed to disk. A background thread will run at a frequency specified by this parameter and will check each log to see if it has exceeded its flush.interval time, and if so it will flush it.

  • log.retention.hours (168)

    Controls how long a log file is retained.

  • log.cleanup.interval.mins (10)

    Controls how often the log cleaner checks logs eligible for deletion. A log file is eligible for deletion if it hasn't been modified for log.retention.hours hours.

  • log.flush.interval (500)

    Controls the number of messages accumulated in each topic (partition) before the data is flushed to disk and made available to consumers.

See the various consumer/producer client options that may be useful to support.
