页次: 1
##参考
https://help.aliyun.com/document_detail/170095.html
https://cloud.tencent.com/document/product/845/35568
----- es-to-es.config----
input{
elasticsearch{
# Source Elasticsearch address.
hosts => ["http://192.166.174.17:9200"]
# Login user name and password for a security-enabled cluster.
#user => "xxxxxx"
#password => "xxxxxx"
# Indices to migrate; separate multiple indices with commas (,).
#index => "kibana_sample_data_*"
index => "*"
# The next three options can keep their defaults; they cover the thread count and the migration batch size, and relate to the Logstash JVM configuration.
docinfo=>true
slices => 5
size => 2000
}
}
filter {
# Remove some fields that Logstash adds on its own.
mutate {
remove_field => ["@timestamp", "@version"]
}
}
output{
elasticsearch{
# Destination Elasticsearch address; available on the Basic Information page of the Alibaba Cloud Elasticsearch instance.
hosts => ["http://192.166.174.15:9200"]
# Login user name and password for a security-enabled cluster.
#user => "elastic"
#password => "xxxxxx"
# Destination index name; this setting keeps it identical to the source index.
index => "%{[@metadata][_index]}"
# Destination index type; this setting keeps it identical to the source type.
document_type => "%{[@metadata][_type]}"
# Destination document id; if the original id does not need to be kept, delete this line for better performance.
document_id => "%{[@metadata][_id]}"
ilm_enabled => false
manage_template => false
}
}
--------------
nohup /bin/logstash -f es-to-es.config >/dev/null 2>&1 &
-------------
在进行数据迁移时,Logstash会帮助您自动创建索引,但是自动创建的索引可能与您待迁移的索引存在差异,导致迁移前后数据的格式不一致。因此建议您在数据迁移前,在阿里云Elasticsearch中手动创建目标索引,确保迁移前后索引数据完全一致。
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 文件名:indiceCreate.py
import sys
import base64
import time
import httplib
import json
## Source cluster host.
oldClusterHost = "192.166.174.17:9200"
## Source cluster user name; may be empty.
oldClusterUserName = "elastic"
## Source cluster password; may be empty.
oldClusterPassword = ""
## Destination cluster host; available on the Basic Information page of the Alibaba Cloud Elasticsearch instance.
newClusterHost = "192.166.174.15:9200"
## Destination cluster user name.
newClusterUser = "elastic"
## Destination cluster password.
newClusterPassword = ""
## Replica count written into the settings of every index created on the destination cluster.
DEFAULT_REPLICAS = 0
def httpRequest(method, host, endpoint, params="", username="", password=""):
    """Send one HTTP request and return the raw response body.

    Args:
        method: HTTP verb such as "GET", "POST" or "PUT".
        host: "host:port" string handed to ``httplib.HTTPConnection``.
        endpoint: Request path, e.g. "/_cat/indices".
        params: Request body; sent for every method except GET.
        username: Basic-auth user name. When empty, no Authorization
            header is sent.
        password: Basic-auth password; only used when ``username`` is set.

    Returns:
        The response body exactly as returned by ``response.read()``.
    """
    headers = {}
    if username != "":
        credentials = "{username}:{password}".format(username=username, password=password)
        # b64encode never inserts line breaks, so no newline stripping is needed.
        headers["Authorization"] = "Basic %s" % base64.b64encode(credentials)

    conn = httplib.HTTPConnection(host)
    try:
        if "GET" == method:
            headers["Content-Type"] = "application/x-www-form-urlencoded"
            conn.request(method=method, url=endpoint, headers=headers)
        else:
            headers["Content-Type"] = "application/json"
            conn.request(method=method, url=endpoint, body=params, headers=headers)
        response = conn.getresponse()
        return response.read()
    finally:
        # Always release the socket, even when the request or read fails.
        conn.close()
def httpGet(host, endpoint, username="", password=""):
    """Issue a GET request through httpRequest and return the response body."""
    return httpRequest(
        method="GET",
        host=host,
        endpoint=endpoint,
        params="",
        username=username,
        password=password,
    )
def httpPost(host, endpoint, params, username="", password=""):
    """Issue a POST request with ``params`` as the body and return the response body."""
    return httpRequest(
        method="POST",
        host=host,
        endpoint=endpoint,
        params=params,
        username=username,
        password=password,
    )
def httpPut(host, endpoint, params, username="", password=""):
    """Issue a PUT request with ``params`` as the body and return the response body."""
    return httpRequest(
        method="PUT",
        host=host,
        endpoint=endpoint,
        params=params,
        username=username,
        password=password,
    )
def getIndices(host, username="", password=""):
    """Return the names of the open indices reported by ``/_cat/indices``.

    Args:
        host: "host:port" of the cluster to query.
        username: Basic-auth user name; may be empty.
        password: Basic-auth password; may be empty.

    Returns:
        A list of index names, in the order the cluster listed them.
    """
    endpoint = "/_cat/indices"
    # Query the cluster the caller passed in rather than a fixed global one.
    indicesResult = httpGet(host, endpoint, username, password)
    indexList = []
    for line in indicesResult.split("\n"):
        columns = line.split()
        # Assumes the default column layout "health status index ...".
        # Comparing the status column exactly avoids matching index names
        # that merely contain "open", and the length check skips blank or
        # truncated lines instead of raising IndexError.
        if len(columns) >= 3 and columns[1] == "open":
            indexList.append(columns[2])
    return indexList
def getSettings(index, host, username="", password=""):
    """Build the ``"settings"`` JSON fragment for re-creating ``index``.

    Fetches ``/<index>/_settings`` from ``host``, prints the raw response,
    and keeps only the shard count. The replica count is always
    ``DEFAULT_REPLICAS``.

    Returns:
        A string of the form ``"settings": {...}`` without enclosing braces,
        meant to be embedded in a larger JSON object.
    """
    rawSettings = httpGet(host, "".join(["/", index, "/_settings"]), username, password)
    print (index + " 原始settings如下:\n" + rawSettings)
    indexSection = json.loads(rawSettings)[index]["settings"]["index"]
    # Shard count follows the source index; replicas use the module default.
    shardCount = indexSection["number_of_shards"]
    template = '"settings": {"number_of_shards": %s, "number_of_replicas": %s}'
    return template % (shardCount, DEFAULT_REPLICAS)
def getMapping(index, host, username="", password=""):
    """Build the ``"mappings"`` JSON fragment for re-creating ``index``.

    Fetches ``/<index>/_mapping`` from ``host``, prints the raw response,
    and re-serializes the ``mappings`` object found under the index name.

    Returns:
        A string of the form ``"mappings" : {...}`` without enclosing braces,
        meant to be embedded in a larger JSON object.
    """
    path = "/" + index + "/_mapping"
    rawMapping = httpGet(host, path, username, password)
    print (index + " 原始mapping如下:\n" + rawMapping)
    parsed = json.loads(rawMapping)
    serialized = json.dumps(parsed[index]["mappings"])
    return '"mappings" : ' + serialized
def createIndexStatement(oldIndexName):
    """Return the JSON body used to create a copy of ``oldIndexName``.

    The body combines the settings and mappings fragments read from the
    source cluster configured in the module-level ``oldCluster*`` values.
    """
    sourceArgs = (oldIndexName, oldClusterHost, oldClusterUserName, oldClusterPassword)
    settingsFragment = getSettings(*sourceArgs)
    mappingsFragment = getMapping(*sourceArgs)
    pieces = ["{\n", str(settingsFragment), ",\n", str(mappingsFragment), "\n}"]
    return "".join(pieces)
def createIndex(oldIndexName, newIndexName=""):
    """Create an index on the destination cluster from a source index definition.

    Args:
        oldIndexName: Index on the source cluster whose settings and mappings
            are copied.
        newIndexName: Name of the index to create on the destination cluster;
            an empty string means "same as ``oldIndexName``".

    Prints the generated create statement and the destination cluster's reply.
    """
    targetName = oldIndexName if newIndexName == "" else newIndexName
    requestBody = createIndexStatement(oldIndexName)
    print ("新索引 " + targetName + " 的setting和mapping如下:\n" + requestBody)
    reply = httpPut(newClusterHost, "/" + targetName, requestBody, newClusterUser, newClusterPassword)
    print ("新索引 " + targetName + " 创建结果:" + reply)
## main
# Re-create every open, non-system index of the source cluster on the
# destination cluster. Indices whose names start with "." are only reported.
indexList = getIndices(oldClusterHost, oldClusterUserName, oldClusterPassword)
systemIndex = []
for index in indexList:
    if index.startswith("."):
        systemIndex.append(index)
        continue
    try:
        createIndex(index, index)
    except Exception as e:
        # Best effort: one failing index must not stop the remaining ones.
        # Name the index so the failure can be traced, and build a single
        # string so Python 2 does not print a tuple.
        print ("Error: " + index + ": " + repr(e))
for index in systemIndex:
    print (index + " 或许是系统索引,不会重新创建,如有需要,请单独处理~")
离线
页次: 1