Wie kann ich 4700+ Indizes von einem Elasticsearch Cluster in ein neues Cluster migrieren? Üblicherweise würde für eine solche Migration die Cross Cluster Replikation benutzt. Was aber, wenn diese Funktion wegen einem veralteten Sourcecluster, nicht eingesetzt werden kann? Dann wird es Zeit für die “reindex” API und etwas Python.

Ausgangslage

Das alte 3 Node Cluster mit 60 GB Memory Version 6.2.2 soll migriert werden auf ein 5 Node Cluster Version 7.8.0. Im folgenden Text wird das alte Cluster als “Source”, das neue Cluster als “Target” bezeichnet.

Das Python Script für die Migration wird ausschliesslich auf einem Node des Targetclusters laufen gelassen. Deswegen werden die Zertifikate (siehe weiter unten) sowohl vom Source- als auch des Targetclusters auf dem Targetclusters gespeichert. Die Reindexierung findet ausschliesslich über das Targetcluster statt: das Targetcluster etabliert somit selber eine Verbindung zum Sourcecluster – dies ist von Elasticsearch implementiert.

Vorarbeiten

index Templates

Die Index Templates müssen auf der Target Seite angelegt werden.

jvm Einstellungen

Auf dem Targetcluster müssen die jvm Einstellungen im File /etc/elasticsearch/jvm.options überprüft werden. “-Xms” und “-Xmx” müssen auf den gleichen Wert gesetzt werden, als Faustregel für die Grüssen gilt: 50% des physischen Memories.

Zertifikate

Kommuniziert das Sourcecluster über SSL so muss das Cluster Zertifikat auf dem Target abgelegt werden. Ist das Zertifikat nicht verfügbar, so kann es auf dem Sourcecluster mittels openssl unter dem CA (.p12) generiert werden:

 openssl pkcs12 -in /etc/elasticsearch/certs/elastic-1-certificate.p12 \
  -cacerts -out /etc/elasticsearch/certs/elastic-1-certificate.pem

Das pem-Zertifikat auf dem Target Cluster speichern. Das pem-Zertifikat muss zugleich noch in ein crt-Zertifikat umgewandelt werden (für elasticsearch.yaml weiter unten). Einfach den Teil des “BEGIN” und “END” in einem File der Endung crt speichern. Gleicher Vorgang für das Targetcluster, das pem-File des Targetclusters wird ebenfalls auf dem Targetcluster abgespeichert.

Whitelisten des Sourceclusters

In /etc/elasticsearch/elasticsearch.yaml muss das Sourcecluster gewhitelistet werden:

reindex.remote.whitelist: "sourcecluster:9200, targetcluster:9200, 127.0.10.*:9200, localhost:*"

# In case of ssl enabled on the old cluster:
reindex.ssl.certificate_authorities: /etc/elasticsearch/certs/sourcecluster.crt

Maximale Anzahl Shards

Elasticsearch limitiert per Default pro Node 1000 Shards. Dieser Wert sollte im Normalfall nicht verändert werden, da er eine Überlastung des Clusters versucht zu verhindern. Bei einer Migration kann dieser Wert allerdings überschritten werden. In meinem Fall hatte das Targetcluster 5 Nodes, jedoch mussten wesentlich mehr Shards angelegt werden als 5000. So habe ich kurzfristig das Limit pro Node auf 2000 erhöht. Dazu wird der _cluster/settings Endpoint verwendet:

PUT /_cluster/settings
{
  "persistent": {
    "cluster.max_shards_per_node": 2000,
    "search": {
      "max_buckets": "35000"
    }
  },
  "transient": {}
}

Reindexing

Test

Mit den “Dev Tools”auf Kibana welches auf den Targetcluster zeigt, wird die Verbindung von Source- zu Targetcluster geprüft:

POST _reindex?wait_for_completion=false
{
  "source": {
    "remote": {
      "host": "https://sourcecluster:9200/",
      "username": "user",
      "password": "einpasswort"
    },
    "index": "testindex"
  },
  "dest": {
    "index": "testindex"
  }
}

Reindexing mit Python

4700+ Indizes sind etwas zu viel Handarbeit, also muss Python her. Folgende Tasks werden automatisiert:

  • etablieren einer Connection zu den Clustern
  • Analyse der Indizes des Sourceclusters mit search
  • Kopieren der Indizes mittels reindex
  • Analyse der Taskdokumente mittels Task

Etablieren einer Connection zu den Clustern

Der folgende Abschnitt beschreibt den Aufbau einer Verbindung zum Sourcecluster mit dem Resultat eines Elasticsearch Objektes “es”. Zum Targetcluster muss auf die gleiche Weise ein Elasticsearch Objekt generiert werden. Das “create_default_context” Objekt muss nur erstellt werden, wenn als Transport https verwendet wird.

from elasticsearch import Elasticsearch
from ssl import create_default_context

user = 'user'
password = 'einpasswort'
server = 'sourcecluster:9200'
pemFile = 'sourcecluster.pem'
transport = 'https'

context = create_default_context(cafile=pemFile)
es = Elasticsearch(
    [transport + '://' + user + ':' + password + '@' +      
     server + '/'], ssl_context=context )

Ist kein pem-Zertifikat vorhanden kann das es-Objekt erstellt werden mit verify_cert=false:

es = Elasticsearch(
     [transport + '://' + user + ':' + password + '@' + 
      server + '/'], verify_certs=False

Nachfolgend wird das Elasticsearch Objekt “esSource” für das Sourcecluster und “esTarget” für das Targetcluster heissen.

Analyse der Indizes des Sourceclusters mit search Klasse

Ich stellte fest, dass es auf dem Sourcecluster etliche Indizes gibt, welche leer sind, sprich keine Dokumente besitzen. Diese werden vom reindex API nicht kopiert. Ich beschloss diese mit “esSource” mit einem Loop zu identifizieren

for line in esSource.cat.indices().split("\n"):
  if len(line) > 0:
    print (line.split()[2] + ' : ' + 
           str(esSource.search(index=line.split()[2])['hits']['total']['value']))

Dabei ist esSource.search(index=line.split() der Indexname. Dieser Loop kann für das Kopieren mit reindex wiederverwendet werden.

Wichtig: Indizes die mit “.” beginnen , sind vom System und sollten ausgefiltert werden. Das Python Modul re ist aber dazu sind notwendig, stattdessen wird schon bei der Abfrage entsprechend gefiltert:

indices = esSource.cat.indices(index="*,-.*,-logstash*",
          h="index,creation.date,store.size",
          s="store.size")

Die obige Abfrage liefert den Indexnamen, die Grösse der Indizes und das Erstellungsdatum in Epoch Sekunden. Diese Spalten war insbesondere wichtig um:

  • die zu reindexierenden Daten zu gruppieren nach Grösse. Diese Gruppierung zeigte einen Performancegewinn, da jeweils Verarbeitungs-Pakete mit ähnlichen Grössen erstellt werden konnten
  • Durch die Epoch Sekunden konnten die 4700 Indizes in verschiedenen Tranchen kopiert werden.

Kopieren der Indizes mittels reindex Klasse

Der folgende Code zur Reindexisung wird nun ebenfalls im Loop durchlaufen. Wichtig: der Aufruf der reindex Klasse gibt die Task ID zurück. Diese ID wird im abschliessenden Schritt für die Kontrolle benötigt.

# dieses Snippet läuft üblicherweise in einer Methode 
# oder Funktion. Die Variable index_name wird in 
# einem äusseren Loop definiert (siehe Loop oben).

task_id = esTarget.reindex({
  "conflicts": "proceed",
  "source": {
    "remote": {
      "host": 'https://' + old_cluster_node,
      "username": 'user',
      "password": 'einpasswort'
      },
    "index": index_name
  },
  "dest": {
    "index": index_name
  }
}, wait_for_completion=False, request_timeout=300)

Analyse der Taskdokumente mittels task Klasse

Der Fortschritt der reindexing Tasks kann nun mit der task Klasse beobachtet werden. Dabei können alle Tasks in einem bestimmten Zeitrahmen überprüft werden:

all_tasks = esTarget.search(index=".tasks", body={
    "size": 2000,
    "_source": "task.id",
    "query": {
       "range": {
           "task.start_time_in_millis": {
           "gte": 1602224473470
           }
        }
     }
  })

for item in all_tasks['hits']['hits']:
  print(item['_id'])

Oder es kann jeder einzelne Task kontrolliert werden

for item in all_tasks['hits']['hits']:
  task = esTarget.tasks.get(item['_id'])
  print (task['completed'])
  print (task['task']['status']['total'])
  print (task['task']['description'])

Oder es wird gezielt nach Fehlern gesucht:

for item in task['response']['failures']:
  print(item['cause']['reason'])

Um eine bessere Übersicht zu bekommen, wie die Respüonse aussieht bei der Abfrage der Tasks, hier die json Struktur:

{
'completed': True,
'task':
   {'node': 'CxxxxxxxxxxxXXX_9og',
   'id': 56810410,
   'type': 'transport',
   'action': 'indices:data/write/reindex',
   'status':
      {'total': 37979456,
      'updated': 0,
      'created': 0,
      'deleted': 0,
      'batches': 1,
      'version_conflicts': 0,
      'noops': 0,
      'retries':
         {'bulk': 0,
         'search': 0},
      'throttled_millis': 0,
      'requests_per_second': -1.0,
      'throttled_until_millis': 0},
   'description': 'reindex from [scheme=https host=sourcecluster port=9200 pathPrefix=/ query={\n  "match_all" : {\n    "boost" : 1.0\n  }\n} username=user password=<<>>][testindex] to [testindex][_doc]',
   'start_time_in_millis': 1602224473470,
   'running_time_in_nanos': 554508622,
   'cancellable': True,
   'headers': {}
   },
'response':
   {'took': 551,
   'timed_out': False,
   'total': 37979456,
   'updated': 0,
   'created': 0,
   'deleted': 0,
   'batches': 1,
   'version_conflicts': 0,
   'noops': 0,
   'retries':
      {'bulk': 0,
      'search': 0},
   'throttled': '0s',
   'throttled_millis': 0,
   'requests_per_second': -1.0,
   'throttled_until': '0s',
   'throttled_until_millis': 0,
   'failures': [
      {'index': 'testindex',
      'type': '_doc',
      'id': '_9nuTHQBhzZfWNNXI0P1', 
      'cause': {
         'type': 'validation_exception', '
         reason': 'Validation Failed: 1: this action would add [2] total shards, but this cluster currently has [5000]/[5000] maximum shards open;'}, 
         'status': 400}
         }
      }
   }
}

Fazit

Die remote Reindexierung, also das indexweise Kopieren der Daten vom alten Cluster ins Neue verläuft problemlos. Wie immer steht und fällt eine solche Aktion, mit dem Bereingen der Originaldaten – also dem Löschen von leeren Indizes. Und die Nachkontrolle der Tasks ist wichtig:

  • gibt es im Array task[‘response’][‘failures’] Einträge?
  • Wurde der Index laut task[‘task’][‘status’][‘created’] angelegt?
  • gibt es laut task[‘task’][‘status’][‘version_conflicts’] Versionskonflikte?

All diese Validierungen müssen natürlich verscripted werden. Aussserdem sollte der Index “.task” bereinigt werden. Dazu kann folgendes Query abgesetzt werden:

POST .tasks/_delete_by_query
{
  "query": {
    "match": {
      "completed": "true"
    }
  }
}

Sollten Sie Fragen oder Anregungen haben, zögern Sie nicht mich zu kontaktieren.

Schreibe einen Kommentar