Brainstorm: Saving state for Shards Allocator


I’m in the initial phase of research towards writing a new ShardsAllocator.

This blog post summarizes quite nicely how the current BalancedShardsAllocator works, and this blog post goes into detail about its major drawbacks, concluding that several signals need to be taken into account.
TL;DR: it only balances the cluster based on the number of shards per node / per index, without taking into account many other signals such as ingestion rate, read pressure and disk space. This leads to skewed disk usage, write hot spots and read hot spots. The workarounds are hard and manual, and can cause a lot of fatigue for an operations team.

Quick shoutout: if someone from Amazon has continued to work on this, I would gladly join the effort.

My general idea is to write a ShardsAllocator that tries to balance the shards in the cluster based on the signals mentioned above.

One of the areas I’m thinking about now is where to keep this state so that it survives restarts:
Ingestion rate: save the 75th percentile ingestion rate per shard over the last day, or, for Data Streams, the same number per stream for the last rolled-over index.
Read pressure: save a counter of bytes read from the shard and a counter of CPU milliseconds spent executing queries. I would save those counters every hour, per shard.
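To make the two signals concrete, here is a minimal Python sketch of the per-shard record this state could boil down to. `ShardSignals`, `percentile` and the field names are hypothetical. The 75th percentile uses the nearest-rank method, and the hourly values are stored as deltas of cumulative counters, assuming that a counter which drops was reset (for example by a node restart).

```python
import math
from dataclasses import dataclass, field


def percentile(values, p):
    """Nearest-rank percentile (p in (0, 100]) of a non-empty list."""
    if not values:
        raise ValueError("no samples")
    ordered = sorted(values)
    rank = math.ceil(p / 100 * len(ordered))  # 1-based nearest rank
    return ordered[max(rank, 1) - 1]


def _delta(total, prev):
    # Cumulative counters restart from zero after a node restart;
    # treat a drop as a reset and count the new total as this hour's usage.
    return total - prev if total >= prev else total


@dataclass
class ShardSignals:
    """Hypothetical per-shard state the allocator would persist."""
    index: str
    shard: int
    ingest_samples: list = field(default_factory=list)     # ingestion-rate samples over the last day
    hourly_bytes_read: dict = field(default_factory=dict)  # hour -> bytes read during that hour
    hourly_cpu_millis: dict = field(default_factory=dict)  # hour -> query CPU ms during that hour

    def ingest_p75(self):
        return percentile(self.ingest_samples, 75)

    def record_hour(self, hour, bytes_read_total, cpu_millis_total, prev_totals):
        """Store this hour's deltas; return the totals to pass in next hour."""
        prev_bytes, prev_cpu = prev_totals
        self.hourly_bytes_read[hour] = _delta(bytes_read_total, prev_bytes)
        self.hourly_cpu_millis[hour] = _delta(cpu_millis_total, prev_cpu)
        return (bytes_read_total, cpu_millis_total)
```

Keeping deltas rather than raw counters means a restart costs at most one slightly off hour instead of corrupting the whole history.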

One option is to save it into an internal index, similar to how Kafka stores consumer offsets in its internal __consumer_offsets topic, or how HBase keeps a meta table holding the location of each region and its start key. Is this viable? Is there something like that in OpenSearch?

Another option is the ClusterState, but it’s a fair amount of data, and I don’t really need to disperse it to all nodes in the cluster: the only node that really needs it is the master node that runs the ShardsAllocator.
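If the internal-index route is chosen, one possible layout is one document per shard per hour. The sketch below only builds a `_bulk` payload (NDJSON: an action line followed by a source line, with a trailing newline); the index name `.shard-allocator-stats` and all field names are hypothetical, and sending the request is left to whatever client the allocator uses.

```python
import json

STATS_INDEX = ".shard-allocator-stats"  # hypothetical internal index name


def stats_doc_id(index, shard, hour):
    # Deterministic id: re-writing the same hour (e.g. after a master
    # failover) overwrites the document instead of creating a duplicate.
    return f"{index}:{shard}:{hour}"


def bulk_body(rows):
    """Build an NDJSON _bulk payload from (index, shard, hour, signals) rows."""
    lines = []
    for index, shard, hour, signals in rows:
        action = {"index": {"_index": STATS_INDEX, "_id": stats_doc_id(index, shard, hour)}}
        source = {"index": index, "shard": shard, "hour": hour, **signals}
        lines.append(json.dumps(action))
        lines.append(json.dumps(source))
    return "\n".join(lines) + "\n"  # the _bulk API expects a final newline
```

Because only the elected master reads this index, the data never has to travel with the cluster state; other nodes just host its shards like any other index.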

Would love to hear your thoughts on the topic.


Hey @asafmesika - sounds interesting. Let me see if I can find the relevant people who have worked on that in the past.


Hi @asafmesika,

Nice to hear that you’re looking at improving shard allocation. While I’m not actively working on Shard Allocator these days, I’ve spent considerable time working in this space, and I’m happy to brainstorm.

I think both of the options you presented are viable; I was considering the same two routes for persisting data. This data can grow very quickly, and you may soon realize that you want to add more signals, which is why I’m a little skeptical about the cluster state approach. OpenSearch publishes cluster state diffs to every node on each cluster state update, and observability data like this will inevitably change often and produce diffs to propagate. There is a non-trivial cost to sharing this data across nodes in a large cluster (which, interestingly, is also where these optimized shard allocators would shine).

I like the idea of storing it in a local index. I think this is already done by some plugins (maybe alerting and index management? Someone in the forum can confirm). We can weigh the pros and cons of each as you design this in more detail.

While we’re on the topic, there are a few other things we should think about…

  1. Using actual resource footprint signals instead of proxy signals - Things like actual JVM, CPU, RAM, heap and network consumption for a shard may be strong indicators of the right node to place it.
  2. How frequently do we capture these signals and how? I think there is an existing NodeStats mechanism that could be leveraged…
  3. Seeding new shards with initial values of these signals - we could use exponential moving averages to seed initial weight values for shards.
  4. How frequently, and when, do we change allocations or rebalance? Rebalancing is a disruptive operation: it requires moving data around and replaying the translog while writes are still in progress. This can take a toll on your cluster.
  5. To help with (4), people generally use throttling. OpenSearch also has throttling limits on concurrent recoveries. But things get interesting when we need throttling to play well with our overall allocation plan…
    • Say our algo realizes that it needs to move 10 shards to different nodes, but throttling only lets us move 2 at a time. How will those decisions change after we’ve moved a subset of shards? For example, we move 4 shards and the signals then suggest a different plan.
    • Do we go with a greedy approach? Or do we execute the full initial plan for those 10 shards?
    • Where do we store the plan for those 10 shards, so we can come back to it as more slots open up under the throttling limits?
  6. It is incredibly useful to have a deterministic algo in shard allocation - to be able to work backwards and see why shards were placed where they were, or look at the state of the system and know which shards will move next. Something to consider while designing this.
    • Also, it would help to add some commands, APIs, or tools that help visualize past and future allocation/rebalancing decisions.
  7. Ensure there is no flip-flopping - we don’t want to land in a place where shards keep bouncing from one node to another.
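A rough sketch of how points 3, 5 and 7 could fit together, assuming each shard’s load has already been reduced to a single number. `seed_weight` seeds a new shard from an EMA of past loads (for example, of the same index’s previous rolled-over generations). `plan_moves` answers the greedy-vs-full-plan question one way: it only plans as many moves as the throttle allows and recomputes from fresh signals next round, so there is no long plan to persist. The `min_gain` threshold is a simple form of hysteresis against flip-flopping, and sorted iteration keeps the output deterministic (point 6). All names are hypothetical.

```python
def seed_weight(history, alpha=0.3, default=1.0):
    """EMA over past per-shard loads; falls back to `default` with no history."""
    if not history:
        return default
    value = history[0]
    for x in history[1:]:
        value = alpha * x + (1 - alpha) * value  # newer samples weigh more
    return value


def plan_moves(shard_load, shard_node, nodes, max_moves, min_gain):
    """Greedily move shards from the hottest to the coldest node.

    Stops after `max_moves` (the throttle) or when the best move would narrow
    the hot/cold gap by less than `min_gain` (hysteresis).
    """
    load = {n: 0.0 for n in nodes}
    for shard, node in shard_node.items():
        load[node] += shard_load[shard]
    placement = dict(shard_node)
    moves = []
    while len(moves) < max_moves:
        hot = max(nodes, key=lambda n: load[n])
        cold = min(nodes, key=lambda n: load[n])
        gap = load[hot] - load[cold]
        best, best_gap = None, gap
        for shard in sorted(placement):  # sorted => deterministic tie-breaking
            if placement[shard] != hot:
                continue
            # Moving a shard of weight w from hot to cold leaves a gap of |gap - 2w|.
            new_gap = abs(gap - 2 * shard_load[shard])
            if new_gap < best_gap:
                best, best_gap = shard, new_gap
        if best is None or gap - best_gap < min_gain:
            break
        placement[best] = cold
        load[hot] -= shard_load[best]
        load[cold] += shard_load[best]
        moves.append((best, hot, cold))
    return moves
```

For example, with shards `s1` and `s2` (load 10 each) on node `a` and `s3` (load 2) on node `b`, a throttle of 2 yields only `[("s1", "a", "b")]`: after that move, no further move would shrink the gap, so the second slot stays unused.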

Finally, the algorithm should be able to tell when it is better to scale the cluster instead of attempting to rebalance shards, and whether that scaling should be vertical or horizontal.

All of these are hard and interesting aspects of this problem. I’d like to hear more on this from the community…


Any luck locating these people, @searchymcsearchface?

I wanted to pipe in with the solution our team came up with for this problem years ago, which we’ve open-sourced: GitHub - datto/es-disk-rebalance
The basic gist: it’s a Python script that we run on a schedule, once daily. It augments the native shard balancer, which, as you point out, only tries to even out the shard count and doesn’t take volume usage into consideration.
The script evaluates the distribution of shards and the data volume usage of the nodes. It comes up with a plan to swap large shards with small shards, and then executes relocations in both directions, swapping large shards on fuller nodes with small shards on emptier nodes.

So, the native balancer is always keeping the number of shards even across nodes, and this script is swapping the smallest and largest shards around to maintain the even shard count while approaching a more balanced volume usage across nodes.
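The swap idea is easy to sketch. The Python below is a simplified illustration of the approach as described above, not the actual es-disk-rebalance code: each round it picks the fullest and the emptiest node, then the shard pair whose swap narrows the gap between them the most, so the shard count per node never changes. `best_swap` and `rebalance` are hypothetical names; in a real cluster each swap would become two relocations (for example, `move` commands sent to the `_cluster/reroute` API).

```python
def best_swap(node_shards):
    """node_shards: node -> {shard_id: size}. Return (big, full, small, empty) or None."""
    usage = {n: sum(shards.values()) for n, shards in node_shards.items()}
    full = max(sorted(usage), key=usage.get)
    empty = min(sorted(usage), key=usage.get)
    gap = usage[full] - usage[empty]
    best, best_gap = None, gap
    for big, big_size in sorted(node_shards[full].items()):
        for small, small_size in sorted(node_shards[empty].items()):
            # Swapping moves (big_size - small_size) from full to empty,
            # so the gap between the two nodes becomes |gap - 2 * delta|.
            new_gap = abs(gap - 2 * (big_size - small_size))
            if new_gap < best_gap:
                best, best_gap = (big, small), new_gap
    if best is None:
        return None  # no swap narrows the gap
    return best[0], full, best[1], empty


def rebalance(node_shards, max_swaps=10):
    """Apply swaps in place until none helps or `max_swaps` is reached."""
    swaps = []
    for _ in range(max_swaps):
        swap = best_swap(node_shards)
        if swap is None:
            break
        big, full, small, empty = swap
        node_shards[empty][big] = node_shards[full].pop(big)
        node_shards[full][small] = node_shards[empty].pop(small)
        swaps.append(swap)
    return swaps
```

With `n1 = {a: 50, b: 30}` and `n2 = {c: 5, d: 15}`, one swap of `a` with `d` brings usage from 80/20 to 45/55, and both nodes still hold two shards each.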

We’ve been running this for years and it works great on our comically giant 2.5PB cluster, which is hosting hundreds of different index families with a wide variety of shard sizes. Anywhere from megabytes to tens of gigabytes. It achieves a balance across the cluster within 1-2%.

@asafmesika yes - @vigyasharma is your man for this.