Data Tier Routing Patterns

Data Tier Routing Patterns define the deterministic mapping of index shards to specific node roles across their operational lifecycle. In production OpenSearch environments, routing is not a passive allocation heuristic; it is an actively enforced constraint matrix governed by Index State Management (ISM) phase transitions, shard allocation deciders, and Cross-Cluster Replication (CCR) topology boundaries. Misaligned routing manifests as unbalanced disk utilization, stalled ISM transitions, or CCR follower desynchronization. This guide operationalizes routing enforcement, provides exact configuration payloads, and delivers Python orchestration patterns for automated validation and remediation. For foundational routing architecture and cluster state propagation mechanics, reference the OpenSearch ISM Architecture & Fundamentals documentation before implementing tier-specific filters.

Allocation Constraint Matrix & Decider Logic

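OpenSearch routes shards using explicit allocation filters set on the index, typically through an index template. During allocation, the cluster matches these filters against custom node attributes (for example node.attr.tier, node.attr.zone, node.attr.disk_type) before committing placement. A require filter admits only nodes that match every listed attribute, include admits nodes that match at least one listed value, and exclude rejects nodes that match any listed value. Production deployments should prefer index.routing.allocation.require for strict tier enforcement, reserving include and exclude for flexible, capacity-driven placement.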
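
The filter semantics can be illustrated with a small pure-Python model. This is a simplified sketch, not OpenSearch's implementation: the function name node_passes_filters is hypothetical, and the model assumes plain custom attributes only (no wildcards and no built-in attributes such as _name or _ip).

```python
from typing import Mapping, Optional


def _values(spec: str) -> list:
    """Split a comma-separated filter value such as "warm,cold"."""
    return [v.strip() for v in spec.split(",") if v.strip()]


def node_passes_filters(
    node_attrs: Mapping[str, str],
    require: Optional[Mapping[str, str]] = None,
    include: Optional[Mapping[str, str]] = None,
    exclude: Optional[Mapping[str, str]] = None,
) -> bool:
    """Return True if a node with these attributes may hold the shard."""
    require, include, exclude = require or {}, include or {}, exclude or {}

    # require: every listed attribute must match (AND across attributes).
    for key, spec in require.items():
        if any(node_attrs.get(key) != v for v in _values(spec)):
            return False

    # include: when present, at least one attribute/value pair must match.
    if include and not any(
        node_attrs.get(key) in _values(spec) for key, spec in include.items()
    ):
        return False

    # exclude: a single matching attribute/value pair rejects the node.
    if any(node_attrs.get(key) in _values(spec) for key, spec in exclude.items()):
        return False

    return True
```

For example, node_passes_filters({"tier": "warm"}, require={"tier": "warm"}) is True, while a node with no tier attribute at all fails the same require filter.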

During tier transitions, routing filters should change in a single settings update so that an index never carries conflicting constraints from two tiers, which would leave shards unassignable or trigger unnecessary relocation. The FilterAllocationDecider evaluates these constraints whenever the cluster recomputes shard routing. Routing failures typically occur when node attributes drift from their declared values, or when index.routing.allocation.enable (or its cluster-level counterpart, cluster.routing.allocation.enable) is left restricted to primaries after a maintenance window. Understanding how these deciders interact with Hot-Warm-Cold Tier Design is critical for preventing allocation storms during peak ingestion windows.

flowchart TD
    A["Shard needs allocation"] --> B{"allocation.enable?"}
    B -- "disabled" --> X["Stay UNASSIGNED"]
    B -- "allowed" --> C{"Disk watermark OK?"}
    C -- "breached" --> X
    C -- "ok" --> D{"require filter matches node.attr?"}
    D -- "no" --> X
    D -- "yes" --> E{"include / exclude filters"}
    E -- "excluded" --> X
    E -- "permitted" --> F["Assign shard to node"]
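
When a shard stays UNASSIGNED, the cluster allocation explain API reports which decider rejected each node. The helper below is a minimal sketch that condenses such a response into a per-node list of blocking deciders. The function name summarize_blockers and the sample payload are illustrative assumptions; the sample is trimmed to the fields the helper reads.

```python
def summarize_blockers(explain: dict) -> dict:
    """Map each node name to the deciders that answered NO for the shard."""
    blockers = {}
    for node in explain.get("node_allocation_decisions", []):
        rejected = [
            f"{d.get('decider')}: {d.get('explanation')}"
            for d in node.get("deciders", [])
            if d.get("decision") == "NO"
        ]
        if rejected:
            blockers[node.get("node_name", node.get("node_id"))] = rejected
    return blockers


# Illustrative, trimmed response for an unassigned primary shard.
sample_explain = {
    "index": "logs-000001",
    "shard": 0,
    "primary": True,
    "current_state": "unassigned",
    "node_allocation_decisions": [
        {
            "node_name": "hot-1",
            "node_decision": "no",
            "deciders": [
                {
                    "decider": "filter",
                    "decision": "NO",
                    "explanation": "node does not match index require filters",
                }
            ],
        },
        {"node_name": "warm-1", "node_decision": "yes", "deciders": []},
    ],
}

for node_name, reasons in summarize_blockers(sample_explain).items():
    print(node_name, reasons)
```

With opensearch-py, the input would typically come from client.cluster.allocation_explain(body={"index": "logs-000001", "shard": 0, "primary": True}), where the index name is a placeholder.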

ISM Policy Routing Configuration

ISM policies inject routing directives during phase transitions using the allocation action. This action updates index-level allocation settings without manual PUT _settings calls, keeping placement aligned with Index Lifecycle Basics. The following payload demonstrates an ISM policy with explicit tier routing and transition thresholds. Its filters key on a node attribute named data (node.attr.data); if your nodes declare the tier under a different attribute, such as node.attr.tier, use that name in the require blocks instead:

JSON
{
  "policy": {
    "description": "Production log ingestion routing policy",
    "default_state": "hot",
    "states": [
      {
        "name": "hot",
        "actions": [
          {
            "rollover": {
              "min_index_age": "1d",
              "min_primary_shard_size": "30gb"
            }
          }
        ],
        "transitions": [
          {
            "state_name": "warm",
            "conditions": {
              "min_index_age": "3d"
            }
          }
        ]
      },
      {
        "name": "warm",
        "actions": [
          {
            "allocation": {
              "require": {
                "data": "warm"
              }
            }
          },
          {
            "shrink": {
              "num_new_shards": 1
            }
          }
        ],
        "transitions": [
          {
            "state_name": "cold",
            "conditions": {
              "min_index_age": "14d"
            }
          }
        ]
      },
      {
        "name": "cold",
        "actions": [
          {
            "allocation": {
              "require": {
                "data": "cold"
              }
            }
          },
          {
            "force_merge": {
              "max_num_segments": 1
            }
          }
        ],
        "transitions": [
          {
            "state_name": "delete",
            "conditions": {
              "min_index_age": "90d"
            }
          }
        ]
      },
      {
        "name": "delete",
        "actions": [
          {
            "delete": {}
          }
        ]
      }
    ]
  }
}

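Actions within a state run sequentially in the order listed, so the allocation action updates the routing filters before shrink or force_merge starts. Updating the filters only begins relocation; by default the action does not wait for shards to finish moving, so later actions can start while shards are still in flight. The allocation action accepts a wait_for flag intended to hold the state until relocation completes; confirm its behavior on your OpenSearch version before relying on it. Note that force_merge does not relocate shards, whereas shrink first moves a copy of every shard onto a single node. For complete syntax validation and action ordering rules, consult the official OpenSearch ISM documentation.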
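
Before uploading a policy, it can help to extract the routing each state declares and to confirm that every transition points at a defined state. The sketch below assumes the policy document has the same shape as the payload above; the function names routing_by_state and dangling_transitions are hypothetical helpers, not part of any OpenSearch client.

```python
def routing_by_state(policy_doc: dict) -> dict:
    """Return {state_name: merged require filters} for an ISM policy."""
    result = {}
    for state in policy_doc["policy"]["states"]:
        require = {}
        for action in state.get("actions", []):
            if "allocation" in action:
                require.update(action["allocation"].get("require", {}))
        result[state["name"]] = require
    return result


def dangling_transitions(policy_doc: dict) -> list:
    """Return (from_state, to_state) pairs whose target state is undefined."""
    states = policy_doc["policy"]["states"]
    names = {s["name"] for s in states}
    return [
        (s["name"], t["state_name"])
        for s in states
        for t in s.get("transitions", [])
        if t["state_name"] not in names
    ]


# Trimmed policy for illustration only.
example_policy = {
    "policy": {
        "default_state": "hot",
        "states": [
            {"name": "hot", "actions": [], "transitions": [{"state_name": "warm"}]},
            {
                "name": "warm",
                "actions": [{"allocation": {"require": {"data": "warm"}}}],
                "transitions": [{"state_name": "cold"}],
            },
        ],
    }
}

print(routing_by_state(example_policy))
print(dangling_transitions(example_policy))
```

In this trimmed example the warm state references a cold state that is not defined, so dangling_transitions reports it; the full policy above would produce an empty list.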

Cross-Cluster Replication (CCR) Routing Boundaries

CCR introduces additional routing constraints because follower indices inherit allocation settings from the leader cluster. In multi-cluster deployments, routing patterns must account for network topology and storage tier parity. If a leader index routes to data: hot, the follower must either mirror that routing or explicitly override it via index.routing.allocation.require on the follower side.

Misconfigured CCR routing causes persistent UNASSIGNED shards when follower nodes lack matching attributes. To enforce deterministic placement in replicated environments:

  1. Define identical node.attr.tier labels across leader and follower clusters.
  2. Use index templates on the follower cluster to inject require filters before replication initializes.
  3. Avoid cross-tier replication where network latency exceeds acceptable thresholds for write acknowledgment.
  4. Inspect the follower index's effective index.routing.allocation.require settings (for example with GET <follower-index>/_settings) to ensure follower overrides do not conflict with settings propagated from the leader.
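
The first two steps can be verified before replication starts by checking that the follower cluster has at least one node satisfying the intended require filter. The sketch below is a hypothetical helper, eligible_nodes, that operates on a dictionary shaped like the nodes info API response (nodes, then node ID, then attributes); the sample data is invented for illustration.

```python
def eligible_nodes(require: dict, nodes_info: dict) -> list:
    """Return names of nodes whose attributes satisfy every require entry."""
    return sorted(
        node.get("name", node_id)
        for node_id, node in nodes_info.get("nodes", {}).items()
        if all(
            node.get("attributes", {}).get(key) == value
            for key, value in require.items()
        )
    )


# Invented follower-cluster topology for illustration.
follower_nodes = {
    "nodes": {
        "n1": {"name": "follower-hot-1", "attributes": {"tier": "hot"}},
        "n2": {"name": "follower-warm-1", "attributes": {"tier": "warm"}},
    }
}

print(eligible_nodes({"tier": "hot"}, follower_nodes))
print(eligible_nodes({"tier": "cold"}, follower_nodes))
```

An empty result means follower shards carrying that filter would remain UNASSIGNED, so replication should not start until the attribute is declared on the follower side. With opensearch-py, the input would typically come from client.nodes.info() on the follower cluster.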

Python Orchestration for Routing Validation

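Manual routing audits scale poorly in dynamic environments. The following Python script uses opensearch-py to list ISM-managed indices, audit routing drift, and re-apply the declared routing settings where drift is found. It compares each index's require filters against the attributes of the nodes that hold its started primary shards. Because OpenSearch relocates shards on its own once a matching node is eligible, persistent drift usually signals a missing node attribute or a capacity block, so treat the remediation step as a nudge rather than a guaranteed fix.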

Python
import os
import logging
from opensearchpy import OpenSearch
from opensearchpy.exceptions import TransportError

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

class RoutingAuditor:
    def __init__(self, host: str, port: int = 9200, auth: tuple = None):
        self.client = OpenSearch(
            hosts=[{"host": host, "port": port}],
            http_auth=auth,
            use_ssl=True,
            verify_certs=True,
            timeout=30,
            max_retries=3,
            retry_on_timeout=True
        )

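    def get_managed_indices(self) -> list[str]:
        """Retrieve indices actively managed by ISM via the explain API."""
        try:
            # The explain API returns one entry per managed index plus a
            # "total_managed_indices" counter; the policies API lists policy
            # documents only and carries no index names.
            response = self.client.transport.perform_request("GET", "/_plugins/_ism/explain")
            policy_keys = ("policy_id", "index.plugins.index_state_management.policy_id")
            return [
                name
                for name, info in response.items()
                if isinstance(info, dict) and any(info.get(k) for k in policy_keys)
            ]
        except TransportError as e:
            logger.error(f"Failed to fetch ISM-managed indices: {e}")
            return []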

    def audit_routing_drift(self, indices: list[str]) -> list[dict]:
        """Compare declared routing vs actual shard placement."""
        drift_report = []
        for idx in indices:
            try:
                settings = self.client.indices.get_settings(index=idx)
                routing_require = settings.get(idx, {}).get("settings", {}).get("index", {}).get("routing", {}).get("allocation", {}).get("require", {})

                if not routing_require:
                    continue

                # Request only the routing table rather than the full cluster state.
                shards = self.client.cluster.state(metric="routing_table", index=idx)["routing_table"]["indices"][idx]["shards"]
                for shard_id, shard_list in shards.items():
                    for shard in shard_list:
                        if shard.get("primary", False) and shard.get("state") == "STARTED":
                            node_id = shard.get("node")
                            node_info = self.client.nodes.info(node_id)
                            node_attrs = node_info["nodes"][node_id].get("attributes", {})

                            for key, expected_val in routing_require.items():
                                actual_val = node_attrs.get(key)
                                if actual_val != expected_val:
                                    drift_report.append({
                                        "index": idx,
                                        "shard": shard_id,
                                        "expected": {key: expected_val},
                                        "actual_node": node_id,
                                        "actual_attr": {key: actual_val}
                                    })
            except TransportError as e:
                logger.warning(f"Skipping {idx}: {e}")
        return drift_report

    def remediate_drift(self, drift_report: list[dict]) -> None:
        """Apply corrective routing settings to drifted indices."""
        corrected_indices = set()
        for entry in drift_report:
            idx = entry["index"]
            if idx not in corrected_indices:
                try:
                    self.client.indices.put_settings(
                        index=idx,
                        body={"index.routing.allocation.require": entry["expected"]}
                    )
                    logger.info(f"Applied routing correction to {idx}: {entry['expected']}")
                    corrected_indices.add(idx)
                except TransportError as e:
                    logger.error(f"Failed to remediate {idx}: {e}")

if __name__ == "__main__":
    OPENSEARCH_HOST = os.getenv("OPENSEARCH_HOST", "localhost")
    OPENSEARCH_PORT = int(os.getenv("OPENSEARCH_PORT", "9200"))
    OPENSEARCH_USER = os.getenv("OPENSEARCH_USER", "admin")
    OPENSEARCH_PASS = os.getenv("OPENSEARCH_PASS", "admin")

    auditor = RoutingAuditor(
        host=OPENSEARCH_HOST,
        port=OPENSEARCH_PORT,
        auth=(OPENSEARCH_USER, OPENSEARCH_PASS)
    )

    managed = auditor.get_managed_indices()
    if not managed:
        logger.info("No ISM-managed indices found.")
        raise SystemExit(0)

    drift = auditor.audit_routing_drift(managed)
    if drift:
        logger.warning(f"Detected {len(drift)} routing drift(s). Remediating...")
        auditor.remediate_drift(drift)
    else:
        logger.info("Routing alignment verified. No drift detected.")

The script leverages the official opensearch-py client with built-in retry logic, SSL verification, and transport-level request handling for version compatibility. For production deployment, integrate it into a scheduled automation pipeline, and reference the official Python client documentation for advanced connection pooling and async execution patterns.

Operational Safeguards & Transition Hygiene

Routing enforcement requires strict operational discipline. During rolling upgrades or node decommissioning, temporarily setting index.routing.allocation.enable to primaries prevents replica allocation storms, but it must be reverted promptly to restore fault tolerance. Always validate node attribute consistency before scaling clusters: a node that lacks node.attr.tier: warm cannot host warm-tier shards, and if no node declares it, every warm-tier shard remains UNASSIGNED.
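
One way to make the reversion hard to forget is to wrap the maintenance work in a context manager. The sketch below is an assumption-laden illustration: primaries_only is a hypothetical helper, and client is expected to be an opensearch-py client (or any object exposing indices.put_settings with the same keyword arguments).

```python
from contextlib import contextmanager


@contextmanager
def primaries_only(client, index: str):
    """Restrict allocation to primaries for the duration of the block."""
    setting = "index.routing.allocation.enable"
    client.indices.put_settings(index=index, body={setting: "primaries"})
    try:
        yield
    finally:
        # Setting the value to null resets it to the default ("all"),
        # even if the maintenance step raised an exception.
        client.indices.put_settings(index=index, body={setting: None})
```

Usage would look like: with primaries_only(client, "logs-000001"): followed by the maintenance step, where the index name is a placeholder. The finally clause restores the default whether the step succeeds or fails.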

To prevent cascading allocation failures:

  • Use index templates to enforce baseline require filters before indices are created.
  • Monitor the cluster.routing.allocation.disk.watermark.low, .high, and .flood_stage thresholds to ensure tier transitions align with available capacity.
  • Isolate CCR traffic on dedicated network interfaces so replication transfers do not compete with ingestion and search traffic on the primary cluster.
  • Implement automated drift detection to catch configuration divergence before it triggers shard relocation storms.
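
The first safeguard can be sketched as a small builder for a composable index template body that pins new indices to a tier at creation time. The function name tier_template, the attribute name tier, and the logs-* pattern are illustrative assumptions; adjust them to the attributes your nodes actually declare.

```python
def tier_template(patterns, attribute: str, value: str, priority: int = 100) -> dict:
    """Build an index template body that sets a baseline require filter."""
    return {
        "index_patterns": list(patterns),
        "priority": priority,
        "template": {
            "settings": {
                f"index.routing.allocation.require.{attribute}": value,
            }
        },
    }


body = tier_template(["logs-*"], "tier", "hot")
print(body)
```

With opensearch-py, the body could then be registered with client.indices.put_index_template(name="logs-hot-routing", body=body), where the template name is a placeholder.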

Deterministic Data Tier Routing Patterns eliminate guesswork in shard placement, ensuring predictable performance, cost-efficient storage utilization, and resilient replication topologies. By embedding routing constraints directly into ISM policies and automating drift detection, platform teams maintain operational control at scale.