公告

特别推出京东优惠挖掘小程序 [点击这里,扫码收藏] 专门收集京东今日特价爆品,商家漏洞等,拼手速,手慢无! 新增优惠: 1,美团外卖红包:扫码至少节省3元[点击收藏],全国可用,用完还能领。 2,车主加油打折服务:一键导航到加油站,选择油枪,支付时直减。 [点击查看] 3,电影票购买返利,覆盖所有主流院线。 [点击查看]

#1 2023-03-24 19:15:17

小天天
Moderator
注册时间: 2019-09-29
帖子: 886

通过Logstash 迁移es

##参考
https://help.aliyun.com/document_detail/170095.html
https://cloud.tencent.com/document/product/845/35568

----- es-to-es.config----
input{
    elasticsearch{
        # Source ES cluster address.
        hosts =>  ["http://192.166.174.17:9200"]
        # Login user name / password, only needed for a secured cluster.
        #user => "xxxxxx"
        #password => "xxxxxx"
        # Indices to migrate; separate multiple index names with commas (,).
        #index => "kibana_sample_data_*"
        # "*" is a wildcard over index names.
        # NOTE(review): this may also match dot-prefixed (system) indices — confirm the intended scope.
        index => "*"
        # The following three options can be left as they are; they relate to
        # the thread count, the migration batch size and the Logstash JVM configuration.
        # docinfo presumably fills [@metadata][_index]/[_type]/[_id], which the output
        # section below reads. NOTE(review): newer plugin versions (ECS compatibility)
        # may store these under a different @metadata path — confirm for your version.
        docinfo=>true
        slices => 5
        size => 2000
    }
}

filter {
  # Remove fields that Logstash adds on its own.
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}

output{
    elasticsearch{
        # Target ES cluster address (for Alibaba Cloud Elasticsearch it is shown on the instance's basic information page).
        hosts => ["http://192.166.174.15:9200"]
        # Login user name / password, only needed for a secured cluster.
        #user => "elastic"
        #password => "xxxxxx"
        # Target index name; this setting keeps it identical to the source index.
        index => "%{[@metadata][_index]}"
        # Target document type; this setting keeps it identical to the source type.
        # NOTE(review): mapping types were removed in newer Elasticsearch releases, so this
        # option may be rejected or ignored there — confirm against the versions in use.
        document_type => "%{[@metadata][_type]}"
        # Target document id, copied from the source. If the original ids need not be
        # preserved, this line can be deleted, which gives better performance.
        document_id => "%{[@metadata][_id]}"
        # Disable ILM and Logstash-managed index templates (presumably so indices
        # pre-created on the target are used as-is — confirm).
        ilm_enabled => false
        manage_template => false
    }
}
--------------
# Run the pipeline in the background with nohup, discarding stdout and stderr.
# NOTE(review): adjust the path to the logstash binary for your installation.
nohup /bin/logstash -f es-to-es.config  >/dev/null 2>&1 &




-------------
在进行数据迁移时,Logstash 会自动创建目标索引,但自动创建的索引(例如 mapping 与分片数)可能与待迁移的源索引存在差异,导致迁移前后数据的格式不一致。因此建议在数据迁移前,先在目标 Elasticsearch 集群中手动创建目标索引(可使用下面的 indiceCreate.py 脚本),确保迁移前后索引结构完全一致。

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 文件名:indiceCreate.py
import base64
import json
import sys
import time
try:
    import httplib  # Python 2
except ImportError:
    import http.client as httplib  # Python 3: same HTTPConnection API
# ---- Source cluster: settings and mappings are read from here ----
# host:port of the source cluster.
oldClusterHost = "192.166.174.17:9200"
# Source cluster user name; may be left empty (then no auth header is sent).
oldClusterUserName = "elastic"
# Source cluster password; may be left empty.
oldClusterPassword = ""

# ---- Target cluster: indices are created here ----
# host:port of the target cluster (for Alibaba Cloud Elasticsearch, shown on
# the instance's basic information page).
newClusterHost = "192.166.174.15:9200"
# Target cluster user name.
newClusterUser = "elastic"
# Target cluster password.
newClusterPassword = ""

# Replica count given to every index created on the target cluster.
DEFAULT_REPLICAS = 0
def httpRequest(method, host, endpoint, params="", username="", password=""):
    """Send one HTTP request to ``host`` and return the response body.

    Args:
        method: HTTP verb, e.g. "GET", "POST" or "PUT".
        host: "host:port" to connect to.
        endpoint: request path, e.g. "/_cat/indices".
        params: request body; not sent for GET requests.
        username: Basic-auth user name; the Authorization header is only
            added when this is non-empty.
        password: Basic-auth password.

    Returns:
        The response body as a native ``str``. On Python 2 this is the raw
        byte string; on Python 3 the bytes are decoded as UTF-8.
    """
    conn = httplib.HTTPConnection(host)
    try:
        headers = {}
        if username != "":
            credentials = "{username}:{password}".format(username=username, password=password)
            # b64encode needs bytes; on Python 2 ``str`` already is bytes.
            if not isinstance(credentials, bytes):
                credentials = credentials.encode("utf-8")
            token = base64.b64encode(credentials)
            # b64encode returns bytes on Python 3; the header value must be str.
            if not isinstance(token, str):
                token = token.decode("ascii")
            headers["Authorization"] = "Basic %s" % token
        if "GET" == method:
            headers["Content-Type"] = "application/x-www-form-urlencoded"
            conn.request(method=method, url=endpoint, headers=headers)
        else:
            headers["Content-Type"] = "application/json"
            body = params
            # Send the JSON body as UTF-8 bytes rather than relying on the
            # library's default text encoding.
            if not isinstance(body, bytes):
                body = body.encode("utf-8")
            conn.request(method=method, url=endpoint, body=body, headers=headers)
        res = conn.getresponse().read()
    finally:
        # Release the socket even when the request or the read fails.
        conn.close()
    if not isinstance(res, str):
        res = res.decode("utf-8")
    return res
def httpGet(host, endpoint, username="", password=""):
    """Send a GET request (with no body) and return the response body."""
    return httpRequest(method="GET", host=host, endpoint=endpoint, params="",
                       username=username, password=password)
def httpPost(host, endpoint, params, username="", password=""):
    """Send a POST request with ``params`` as the body and return the response body."""
    return httpRequest(method="POST", host=host, endpoint=endpoint, params=params,
                       username=username, password=password)
def httpPut(host, endpoint, params, username="", password=""):
    """Send a PUT request with ``params`` as the body and return the response body."""
    return httpRequest(method="PUT", host=host, endpoint=endpoint, params=params,
                       username=username, password=password)
def getIndices(host, username="", password=""):
    """Return the names of all open indices on the cluster at ``host``.

    Args:
        host: "host:port" of the cluster to list.
        username: Basic-auth user name (empty for no auth).
        password: Basic-auth password.

    Returns:
        A list of index names whose status is "open", in the order the
        ``_cat/indices`` API reports them.
    """
    # Ask _cat for exactly the two columns we need. Parsing then does not
    # depend on column positions, on the health column being filled in, or
    # on an index name happening to contain the substring "open".
    endpoint = "/_cat/indices?h=status,index"
    indicesResult = httpGet(host, endpoint, username, password)
    indexList = []
    for line in indicesResult.splitlines():
        fields = line.split()
        if len(fields) == 2 and fields[0] == "open":
            indexList.append(fields[1])
    return indexList
def getSettings(index, host, username="", password=""):
    """Build the ``"settings": {...}`` fragment used to re-create ``index``.

    Fetches the index settings from ``host`` and prints them. Only the parts
    that are safe to copy are kept:

    * the primary shard count, taken from the source index;
    * the replica count, which is always ``DEFAULT_REPLICAS``;
    * the ``analysis`` section, when present. It holds the custom
      analyzers/tokenizers/filters that the mapping may reference; without
      it, creating the index on the target fails for such mappings.

    Args:
        index: concrete index name on the source cluster.
        host: "host:port" of the source cluster.
        username: Basic-auth user name (empty for no auth).
        password: Basic-auth password.

    Returns:
        A JSON object member (not a full document) of the form
        ``"settings": {...}``, meant to be embedded in a create-index body.
    """
    endpoint = "/" + index + "/_settings"
    indexSettings = httpGet(host, endpoint, username, password)
    print (index + "  原始settings如下:\n" + indexSettings)
    sourceIndexSettings = json.loads(indexSettings)[index]["settings"]["index"]
    newSettings = {
        # The shard count follows the source index by default.
        "number_of_shards": int(sourceIndexSettings["number_of_shards"]),
        # Replicas are set to DEFAULT_REPLICAS.
        "number_of_replicas": DEFAULT_REPLICAS,
    }
    if "analysis" in sourceIndexSettings:
        newSettings["analysis"] = sourceIndexSettings["analysis"]
    return "\"settings\": " + json.dumps(newSettings)
def getMapping(index, host, username="", password=""):
    """Build the ``"mappings" : {...}`` fragment used to re-create ``index``.

    Fetches ``/<index>/_mapping`` from ``host``, prints the raw response and
    re-serializes the ``mappings`` object of that index unchanged.

    Returns:
        A JSON object member of the form ``"mappings" : {...}``.
    """
    rawMapping = httpGet(host, "/%s/_mapping" % index, username, password)
    print (index + " 原始mapping如下:\n" + rawMapping)
    sourceMappings = json.loads(rawMapping)[index]["mappings"]
    return "\"mappings\" : %s" % json.dumps(sourceMappings)
def createIndexStatement(oldIndexName):
    """Return the JSON create-index body for ``oldIndexName``.

    The body joins the settings fragment and the mappings fragment, both read
    from the source cluster (settings are fetched first, then mappings).
    """
    sourceCluster = (oldClusterHost, oldClusterUserName, oldClusterPassword)
    fragments = [
        str(getSettings(oldIndexName, *sourceCluster)),
        str(getMapping(oldIndexName, *sourceCluster)),
    ]
    return "{\n" + ",\n".join(fragments) + "\n}"
def createIndex(oldIndexName, newIndexName=""):
    """Create an index on the target cluster modeled on ``oldIndexName``.

    Args:
        oldIndexName: index on the source cluster whose settings/mapping are copied.
        newIndexName: name to create on the target; an empty string means
            "same name as the source".

    The generated body and the target's raw response are both printed.
    """
    targetName = oldIndexName if newIndexName == "" else newIndexName
    body = createIndexStatement(oldIndexName)
    print ("新索引 " + targetName + " 的setting和mapping如下:\n" + body)
    result = httpPut(newClusterHost, "/" + targetName, body, newClusterUser, newClusterPassword)
    print ("新索引 " + targetName + " 创建结果:" + result)
## main
# Re-create every open index of the source cluster on the target cluster.
# Names starting with "." are skipped and only reported at the end, since
# they may be system indices.
indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
systemIndex = []
for index in indexList:
    if index.startswith("."):
        systemIndex.append(index)
        continue
    try:
        createIndex(index, index)
    except Exception as e:
        # Top-level boundary: report which index failed and keep going.
        print("Error: %s: %s" % (index, e))

for index in systemIndex:
    print (index + " 或许是系统索引,不会重新创建,如有需要,请单独处理~")

离线

页脚

Powered by 华新企财帮

京ICP备19031397号-1