本文共 2590 字,大约阅读时间需要 8 分钟。
在数据分析领域,Elasticsearch 是一个强大的选择,而 MySQL 作为传统的关系型数据库,虽然在数据持久化方面表现出色,但在数据分析方面稍显不足。为了弥补这一不足,我们可以使用 Logstash 来将 MySQL 数据实时同步到 Elasticsearch 中。
首先,我们需要下载并安装 Logstash。可以通过以下命令轻松获取:
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.3.zipunzip logstash-6.2.3.zipcd logstash-6.2.3
接下来,安装必要的插件:
bin/logstash-plugin install logstash-input-jdbcbin/logstash-plugin install logstash-output-elasticsearch
同时,我们还需要下载 MySQL 的 JDBC 驱动:
wget https://cdn.mysql.com/Downloads/Connector-J/mysql-connector-java-5.1.46.zipunzip mysql-connector-java-5.1.46.zip
logstash-input-jdbc
)使用 logstash-input-jdbc
插件读取 MySQL 数据。该插件通过定时执行 SQL 语句来获取数据,支持增量同步。我们需要指定以下参数:
input { jdbc { jdbc_driver_library: /path/to/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar jdbc_driver_class: com.mysql.jdbc.Driver jdbc_connection_string: jdbc:mysql://:3306/rta jdbc_user: jdbc_password: schedule: "* * * * *" statement: "SELECT * FROM table WHERE update_time >= :sql_last_value" use_column_value: true tracking_column_type: "timestamp" tracking_column: "update_time" last_run_metadata_path: "syncpoint_table" }}
配置说明:
jdbc_driver_library
:MySQL JDBC 驱动路径。jdbc_driver_class
:驱动类路径。jdbc_connection_string
:MySQL 连接地址。jdbc_user
:数据库用户名。jdbc_password
:数据库密码。schedule
:定时执行 SQL 的调度规则。statement
:执行的 SQL 语句,:sql_last_value
是内置变量。use_column_value
:启用递增列同步。tracking_column_type
:递增字段类型(建议使用 timestamp
)。tracking_column
:递增字段名称。last_run_metadata_path
:记录上次同步的文件路径。logstash-output-elasticsearch
)将数据同步到 Elasticsearch:
output { elasticsearch { hosts: ["172.31.22.165", "172.31.17.241", "172.31.30.84", "172.31.18.178"] user:password: index: "table" document_id: "%{id}" }}
配置说明:
hosts
:Elasticsearch 集群地址。user
:Elasticsearch 用户名。password
:Elasticsearch 密码。index
:目标索引名称。document_id
:文档 ID,建议设置为 MySQL 表的主键。将上述配置保存为 sync_table.cfg
,然后执行以下命令:
cd logstash-6.2.3bin/logstash -f config/sync_table.cfg
要同步多个表,可以创建多个配置文件,并在 config/pipelines.yml
中配置:
- pipeline.id: table1 path.config: "config/sync_table1.cfg"- pipeline.id: table2 path.config: "config/sync_table2.cfg"
运行 bin/logstash
即可启动多表同步。
默认情况下,@timestamp
字段为当前时间。如需使用数据中的 timeslice
字段指定时间,可以添加以下过滤器:
filter { date { match => [ "timeslice", "yyyyMMddHHmm" ] timezone => "Asia/Shanghai" }}
将此部分添加到 sync_table.cfg
中,确保 @timestamp
与 timeslice
一致。
转载地址:http://ecufk.baihongyu.com/