Advice on a large distributed priority queue like schemas

Hello,

We are building a new messaging queue, that relies on CockroachDB and I would like to get some feedback on the tables architecture we have from more experienced users (we’re just learning crdb).

Usecase:

We need to build a large distributed priority queue (or you can see it as a sorted set with consumption only from the HEAD):

  • let’s presume we have a medium-high throughput (100k-500k messages/s, composed of selects, inserts, updates and deletes - worst nightmare)
  • each message has 3 sequential operations: is Inserted, Selected, then UPDATE or Deleted after few seconds (the time between select and delete may be larger from minutes to weeks but in our benchmarks we set it as low as possible)
  • a message has a Timestamp/Priority which is mutable. You can picture our tables as a Sorted Set where the Timestamp is the score.
  • messages are inserted with ANY timestamp/priority
  • the timestamp can be updated (so the message will move in the sorted table)
  • there will be possible billions of records
  • the metadata of each message can be large >100kb

Consumption:

  • messages are always consumed (SELECT + UPDATE or SELECT + DELETE) in ASC ORDER of the timeline (out of order consumption, this will create a hotspot on the ranges with oldest timestamps but is what we want, to group all oldest messages so we can hit only 1 node/range to get them all)
  • we split the messages in random N buckets (to allow us at app level to paralelize consumption)
  • SELECTs do NOT have contention (1 consumer per bucket, so we paralelize at app level)
  • ALL the SELECTs are made using RANGE on timestamp
    Not knowing the crdb internals (yet) I don’t know which of these alternatives will be more optimal.

Solution A:

Simpler method, relies on Updates/mutability and less operations :

  • ComposedID (primary key, string/bytes, composed of “bucket_id_message_id”)
  • bucket_id
  • timestamp
  • message_id
  • other metadata (which can be large > 100kb)
  • INDEX: timestamp,ID
  • INDEX: message_id

most run query (get the oldest messages from a bucket):
SELECT * FROM solutiona WHERE ID >= 000001 AND ID < 0000002 AND timestamp >= X AND timestamp <= Y LIMIT 100;
The idea is to use lexicographical sorted buckets with padding so that the select hits only 1 node/range, and put the as a prefix of the Primary KEY leveraging how CRDB stores the table. I think it should be faster than bucket_id >= 1 AND bucket_id < 2 on a secondary index.

//Move the message on timeline
UPDATE solutiona SET timestamp=newvalue;

The concern here is that we cannot add the timestamp in the primary key since is mutable.

Solution B

Probably the worst solution, same as A but we add the timestamp in the primary key and when we want to update it we Delete the old document and create a new one.

SELECT * FROM solutiona WHERE ID >= 000001_X AND ID < 0000002_Y //0001 is the bucketID and x/y are timestamps LIMIT 100;

Question: can we do a swap in update without traffic outside of Cockroachdb network? Deleting the old docs, add the same docs with the new timestamps in a transaction.
I know that there is not a support yet for stored procedures and I’m not sure how this can be done without one.

Solution C

I don’t know how good is CRDB at Updates so this is a immutable alternative with the reasoning to split the table in 2, do more operations but have a more optimal SELECT

timeline_table

  • ID (primary key, string/bytes, composed of “bucket_id_timeline_message_id”) - uses the ASC order of crdb primary keys
  • bucket_id
  • message_id //UNIQUE secondary index for fast lookups and join
  • timestamp

messages table:

  • message_id (primary_key)
  • other metadata (which can be large > 100kb)

most run query (get the oldest messages from a bucket)
SELECT * FROM timeline_table INNER JOIN messages-table ON message_id WHERE ID >= x_y AND ID < z_w LIMIT 100 (where X and Z are buckets and Y and W are timestamp values)

Move the message (now is a transaction):
DELETE FROM timeline_table WHERE message_id = 2; INSERT INTO timeline_table //with new timestamp ;

Thanks for reading such a large post!