1. Download the version of Logstash that matches your Elasticsearch version.
2. Prepare the JDBC driver jar:
e.g. mysql-connector-java-8.0.18.jar, and place it under logstash/logstash-core/lib/jars, otherwise Logstash will report that the driver cannot be found.
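For example (a sketch only; the Maven Central URL and the relative paths assume the directory layout used in this article):

root@ubuntu:/home/ubuntu# wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.18/mysql-connector-java-8.0.18.jar
root@ubuntu:/home/ubuntu# cp mysql-connector-java-8.0.18.jar ./logstash/logstash-core/lib/jars/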
3. Write the config file, as shown below:
input {
stdin {
}
jdbc { ### data source; a single table needs one jdbc block, multiple tables need one block each
type => "memories" ### used to tell events apart on output; it is also written into Elasticsearch, so if the MySQL query result already contains a type column the sync may fail
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/memories"
jdbc_user => "root"
jdbc_password => "root"
jdbc_driver_library => "/home/ubuntu/logstash/logstash-core/lib/jars/mysql-connector-java-8.0.18.jar" ### path to the JDBC driver jar
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
lowercase_column_names => false ### don't lowercase column names automatically (preserves camelCase aliases like userId)
statement_filepath => "/home/ubuntu/sql/memories.sql" ### path to the SQL file; alternatively inline it with statement => "your SQL"
last_run_metadata_path => "/home/ubuntu/log/last_memories.txt" ### where the last-run time is recorded
clean_run => false ### whether to wipe the last-run record on startup; keep it false for incremental sync
schedule => "* * * * *" ### sync schedule in cron syntax (here: every minute)
}
jdbc { ### second table
type => "user"
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/memories"
jdbc_user => "root"
jdbc_password => "root"
jdbc_driver_library => "/home/ubuntu/logstash/logstash-core/lib/jars/mysql-connector-java-8.0.18.jar"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
lowercase_column_names => false
statement_filepath => "/home/ubuntu/sql/user.sql"
last_run_metadata_path => "/home/ubuntu/log/last_user.txt"
clean_run => false
schedule => "* * * * *"
}
}
filter { ### filters
json {
source => "message"
remove_field => ["message"]
}
}
output { ### route events to outputs by the type set on the input
if [type] == "memories" {
elasticsearch {
hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
index => "memories" ### 索引名
document_id => "%{mem_id}" ### 文档ID,尽量保证其唯一性,否则可能同步失败
template_overwrite => true ### 是否使用ES模板
template => "/home/ubuntu/template/memories_new.json" ### ES模板位置
template_name => "memories" ### ES模板名,不要有重复
}
}
if [type] == "user" {
elasticsearch {
hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
index => "user"
document_id => "%{user_id}"
template_overwrite => true
template => "/home/ubuntu/template/user_new.json"
template_name => "user"
}
}
stdout { ### also print events to the console as JSON lines
codec => json_lines
}
}
The comments in the config above must be deleted, otherwise it will fail to load.
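After stripping the comments, the config can be syntax-checked without starting a pipeline (--config.test_and_exit is a standard Logstash flag):

root@ubuntu:/home/ubuntu# ./logstash/bin/logstash -f ./logstash.conf --config.test_and_exit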
- For a full sync, the SQL can be written as:
SELECT id mem_id, id, title, banner, user_id userId, UNIX_TIMESTAMP(create_time) createTime, UNIX_TIMESTAMP(update_time) updateTime, open_status openStatus, status, platform FROM memories
- For an incremental sync, the SQL can be written as:
SELECT id mem_id, id, title, banner, user_id userId, UNIX_TIMESTAMP(create_time) createTime, UNIX_TIMESTAMP(update_time) updateTime, open_status openStatus, status, platform FROM memories WHERE update_time >= :sql_last_value OR create_time >= :sql_last_value ORDER BY create_time ASC, update_time ASC
Tip: the separate mem_id alias is what gets used as the document ID in ES, because the plain id could collide with IDs from other tables and make the sync fail.
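For reference, the file at last_run_metadata_path is where :sql_last_value is read from between runs; it is a one-line YAML timestamp, roughly like the following (the exact format varies with the plugin version):

root@ubuntu:/home/ubuntu# cat /home/ubuntu/log/last_memories.txt
--- 2019-11-01 12:00:00.000000000 +08:00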
- The ES template looks like this:
{
"index_patterns": "memories", ### 新版本大概7.x以后的该字段为index_patterns,之前为index,注释需删除
"version": 7040299,
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1,
"index.refresh_interval": "5s"
},
"mappings": {
"properties": {
"banner": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"bannerUrl": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"createTime": {
"type": "long"
},
"id": {
"type": "long"
},
"m_id": {
"type": "long"
},
"openStatus": {
"type": "long"
},
"platform": {
"type": "long"
},
"status": {
"type": "long"
},
"title": {
"type": "text",
"analyzer":"ik_max_word", ### 配置ik分词器,注释需删除
"search_analyzer":"ik_smart"
},
"updateTime": {
"type": "long"
},
"userId": {
"type": "long"
}
}
}
}
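Note that the ik_max_word / ik_smart analyzers in the title mapping are not bundled with Elasticsearch; they come from the analysis-ik plugin, which has to be installed on every node and match the ES version exactly (the 7.4.2 version below is an assumption based on the template's version field):

root@ubuntu:/home/ubuntu# ./elasticsearch/bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.4.2/elasticsearch-analysis-ik-7.4.2.zip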
- Whether the template is correct can be checked by adding it via the ES REST API or in Kibana, with the command:
POST _template/memories
{
### template JSON goes here
}
If the template is invalid, an error message is returned; a broken template is one of the common causes of sync failures.
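The same check can be done from the shell with curl (assuming ES is listening on 127.0.0.1:9200 and using the template file from the config above):

root@ubuntu:/home/ubuntu# curl -X POST "http://127.0.0.1:9200/_template/memories" -H 'Content-Type: application/json' -d @/home/ubuntu/template/memories_new.json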
4. Run Logstash:
root@ubuntu:/home/ubuntu# ./logstash/bin/logstash -f ./logstash.conf
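Once the schedule has fired at least once, you can confirm that documents are arriving, for example:

root@ubuntu:/home/ubuntu# curl "http://127.0.0.1:9200/memories/_count?pretty"
root@ubuntu:/home/ubuntu# curl "http://127.0.0.1:9200/memories/_search?size=1&pretty"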