PostgreSQL 数据同步到ES 搭建操作

安装python 和dev 开发包

[root@rtm2 Packages]# rpm -ivh python-devel-2.7.5-58.el7.x86_64.rpm
准备中...       ################################# [100%]
正在升级/安装...
 1:python-devel-2.7.5-58.el7  ################################# [100%]
[root@rtm2 Packages]# ls

安装 multicorn

[root@rtm2 multicorn-1.3.5]# make
Python version is 2.7
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/errors.o src/errors.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/python.o src/python.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/query.o src/query.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -I/usr/include/python2.7 -I/usr/include/python2.7 -I. -I./ -I/opt/pgsql-10/include/server -I/opt/pgsql-10/include/internal -D_GNU_SOURCE -c -o src/multicorn.o src/multicorn.c
gcc -Wall -Wmissing-prototypes -Wpointer-arith -Wdeclaration-after-statement -Wendif-labels -Wmissing-format-attribute -Wformat-security -fno-strict-aliasing -fwrapv -fexcess-precision=standard -O2 -fPIC -shared -o multicorn.so src/errors.o src/python.o src/query.o src/multicorn.o -L/opt/pgsql-10/lib -Wl,--as-needed -Wl,-rpath,'/opt/pgsql-10/lib',--enable-new-dtags -lpthread -ldl -lutil -lm -lpython2.7 -lpthread -ldl -lutil -lm -lpython2.7 -Xlinker -export-dynamic
.//preflight-check.sh
cp sql/multicorn.sql sql/multicorn--1.3.5.sql
[root@rtm2 multicorn-1.3.5]# make install
Python version is 2.7
...

安装pg-es-fdw-master

[root@rtm2 multicorn-1.3.5]# cd ../pg-es-fdw-master
[root@rtm2 pg-es-fdw-master]# ls
demo.sh dite LICENSE README.md setup.py
[root@rtm2 pg-es-fdw-master]# python setup.py build
running build
running build_py
creating build
creating build/lib
creating build/lib/dite
copying dite/__init__.py -> build/lib/dite
[root@rtm2 pg-es-fdw-master]# python setup.py install
running install
running bdist_egg
running egg_info
creating dite.egg-info
writing dite.egg-info/PKG-INFO

安装插件 multicorn

[postgres@rtm2 ~]$ psql
psql (10.3)
Type "help" for help.
postgres=# select * from pg_extension;
 extname | extowner | extnamespace | extrelocatable | extversion | extconfig | extcondition
---------+----------+--------------+----------------+------------+-----------+--------------
 plpgsql |  10 |   11 | f    | 1.0  |   |
(1 row)
postgres=# CREATE EXTENSION multicorn;
CREATE EXTENSION
postgres=# psql
postgres=# select * from pg_extension;
 extname | extowner | extnamespace | extrelocatable | extversion | extconfig | extcondition
-----------+----------+--------------+----------------+------------+-----------+--------------
 plpgsql |  10 |   11 | f    | 1.0  |   |
 multicorn |  10 |   2200 | t    | 1.3.5  |   |
(2 rows)
postgres=# CREATE SERVER multicorn_es FOREIGN DATA WRAPPER multicorn OPTIONS(wrapper 'dite.ElasticsearchFDW');
CREATE SERVER
postgres=#

es

[root@rtm2 config]# vi elasticsearch.yml
node.name: "es-node1"
network.host: 192.168.31.121
discovery.zen.ping.unicast.hosts: ["192.168.31.121"]
[root@rtm2 config]# vi /etc/sysctl.conf
vm.max_map_count=262144
sysctl -p
[root@rtm2 config]# vi /etc/security/limits.conf
# End of file
 root soft nofile 65536
root hard nofile 65536
root soft nproc 4096
root hard nproc 4096
~

启动es

[root@rtm2 bin]# ls
elasticsearch  elasticsearch.in.bat elasticsearch-service-mgr.exe elasticsearch-service-x86.exe plugin.bat
elasticsearch.bat elasticsearch.in.sh elasticsearch-service-x64.exe plugin       service.bat
[root@rtm2 bin]# ./bin/elasticsearch
test=# CREATE FOREIGN TABLE pp_es (id bigint,age bigint) SERVER multicorn_es OPTIONS (host
test(# '192.168.31.121', port '9200', node 'es-node1', index 'pp');
CREATE FOREIGN TABLE
test=#

创建触发器和外部表

test=# CREATE OR REPLACE FUNCTION index_pp() RETURNS trigger AS $def$
test$# BEGIN
test$# INSERT INTO pp_es (id, age) VALUES
test$# (NEW.id, NEW.age);
test$# RETURN NEW;
test$# END;
test$# $def$ LANGUAGE plpgsql;
CREATE FUNCTION
test=# CREATE TRIGGER es_insert_pp AFTER INSERT ON pp FOR EACH ROW EXECUTE PROCEDURE index_pp();
CREATE TRIGGER
test=#

新增数据测试

test=# insert into pp (id,age) values (1,11);
INSERT 0 1
test=# select * from pp;
 id | age
----+-----
 1 | 11
(1 row)
test=#

检查es数据

[root@rtm2 ~]# curl 'http://192.168.31.121:9200/es-node1/_search?q=*:*&pretty'
{
 "took" : 104,
 "timed_out" : false,
 "_shards" : {
 "total" : 5,
 "successful" : 5,
 "failed" : 0
 },
 "hits" : {
 "total" : 2,
 "max_score" : 1.0,
 "hits" : [ {
  "_index" : "es-node1",
  "_type" : "pp",
  "_id" : "1",
  "_score" : 1.0,
  "_source":{"age": "11"}
 }, {
  "_index" : "es-node1",
  "_type" : "pp",
  "_id" : "2",
  "_score" : 1.0,
  "_source":{"age": "22"}
 } ]
 }
}
[root@rtm2 ~]#

创建更新触发器

test=# CREATE OR REPLACE FUNCTION updadeIndex_pp() RETURNS trigger AS $def$
BEGIN
UPDATE pp_es SET
id = NEW.id,
age = NEW.age
where id =NEW.id;
RETURN NEW;
END;
$def$ LANGUAGE plpgsql;
CREATE FUNCTION
test=# ^C
test=#
test=# CREATE TRIGGER es_update_pp AFTER UPDATE OF id, age ON pp FOR EACH ROW WHEN (OLD.* IS DISTINCT
test(# FROM NEW.*)EXECUTE PROCEDURE updadeIndex_pp();
CREATE TRIGGER
test=#

更新表数据

test=# select * from pp;
 id | age
----+-----
 1 | 11
 2 | 22
 3 | 22
(3 rows)
test=# update pp a set a.age = 33 where a.id = 3;
ERROR: column "a" of relation "pp" does not exist
LINE 1: update pp a set a.age = 33 where a.id = 3;
      ^
test=# update pp set age = 33 where id = 3;
UPDATE 1
test=# select * from pp;
 id | age
----+-----
 1 | 11
 2 | 22
 3 | 33
(3 rows)
test=#

es查询变更

[root@rtm2 ~]# curl 'http://192.168.31.121:9200/es-node1/_search?q=*:*&pretty'
{
 "took" : 4,
 "timed_out" : false,
 "_shards" : {
 "total" : 5,
 "successful" : 5,
 "failed" : 0
 },
 "hits" : {
 "total" : 3,
 "max_score" : 1.0,
 "hits" : [ {
  "_index" : "es-node1",
  "_type" : "pp",
  "_id" : "1",
  "_score" : 1.0,
  "_source":{"age": "11"}
 }, {
  "_index" : "es-node1",
  "_type" : "pp",
  "_id" : "2",
  "_score" : 1.0,
  "_source":{"age": "22"}
 }, {
  "_index" : "es-node1",
  "_type" : "pp",
  "_id" : "3",
  "_score" : 1.0,
  "_source":{"age": "33"}
 } ]
 }
}
[root@rtm2 ~]# 

补充:logstash同步pgsql数据到Elasticsearch

一、对于logstash的配置我就不在多说,主要是三部分,input、filter、output的配置

二、配置步骤

1、input配置

input {
 stdin {
 }
 jdbc {
  jdbc_connection_string => "jdbc:postgresql://127.0.0.1:5432/world"
  jdbc_user => "postgres"
  jdbc_password => "zhang123"
  jdbc_driver_library => "D:\logstash-6.4.0\bin\pgsql\postgresql-42.2.5.jar"
  jdbc_driver_class => "org.postgresql.Driver"
  jdbc_paging_enabled => "true"
  jdbc_page_size => "300000"
  use_column_value => "true"
  tracking_column => "id"
  statement_filepath => "D:\logstash-6.4.0\bin\pgsql\jdbc.sql"
 schedule => "* * * * *"
 type => "jdbc"
 jdbc_default_timezone =>"Asia/Shanghai"
 }
}

2、filter配置

filter {
 json {
  source => "message"
  remove_field => ["message"]
 }
}

3、output 配置,就是elasticsearch的基本配置

output {
 elasticsearch {
  hosts => ["localhost:9200"]
  index => "test_out"
 template => "D:\logstash-6.4.0\bin\pgsql\es-template.json"
 template_name => "t-statistic-out-logstash"
 template_overwrite => true
 document_type => "out"
  document_id => "%{id}"
 }
 stdout {
  codec => json_lines
 }
}

以上就是整个logstash 的jdbc.conf

4、es-template.json的配置

{
 "template" : "t-statistis-out-template",
 "order":1,
 "settings": {
   "index": {
    "refresh_interval": "5s"
   }
  },
 "mappings": {
   "_default_": {
 "_all" : {"enabled":false},
    "dynamic_templates": [
     {
    "message_field" : {
    "match" : "message",
    "match_mapping_type" : "string",
    "mapping" : { "type" : "string", "index" : "not_analyzed" }
    }
   }, {
    "string_fields" : {
    "match" : "*",
    "match_mapping_type" : "string",
    "mapping" : { "type" : "string", "index" : "not_analyzed" }
    }
   }
    ],
    "properties": {
     "@timestamp": {
      "type": "date"
     },
     "@version": {
      "type": "keyword"
     },
  "id": {
      "type": "keyword"
     },
  "name": {
      "type": "keyword"
     },
  "pp": {
      "type": "keyword"
     }
    }
   }
  },
  "aliases": {}

}

最后就是就是下载好pgsql的连接驱动,这个官网可以下载;配置好自己的数据库表格的数据

启动命令:进入到logstash的bin目录下,自己的logstash配置都是放在bin的pgsql这个目录下面(这个自己随意创建位置都可以)

logstash.bat -f ./pgsql/jdbc.conf

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。如有错误或未考虑完全的地方,望不吝赐教。

(0)

相关推荐

  • PostgreSQL upsert(插入更新)数据的操作详解

    本文介绍如何使用PostgreSQL upsert特性插入或当被插入数据已存在则更新数据. 1. 介绍PostgreSQL upsert 在关系型数据库中,upsert是一个组合词,即当往表中插入记录,如果该记录已存在则更新,否则插入新记录.为了使用该特性需要使用INSERT ON CONFLICT语句: INSERT INTO table_name(column_list) VALUES(value_list) ON CONFLICT target action; 该语法中target可以是下

  • 使用Postgresql 实现快速插入测试数据

    1.创建常规的企业信息表 create table t_centerprises( objectid bigint not null, /*唯一编号(6位行政区号+6位sn)*/ divid uuid not null, /*行政区唯一代码*/ name text not null, /*企业名称*/ address text not null, /*企业地址*/ post text, /*企业邮编*/ contacts text, /*联系人*/ tel text, /*联系电话*/ fax

  • PostgreSql 导入导出sql文件格式的表数据实例

    PostgreSql默认导出的文件格式是.backup,而我们很多数据库导入数据库脚本时是.sql文件格式的,PostgreSql作为国内的新潮,会不会不支持导出.sql文件格式吗?答案是当然不会.下面我们借助Pg Admin III工具来导出.sql的脚本的. 一.导出 首先,你当然得有一张完整的数据表啦,并且保证里面是有数据的: 有了表和数据之后,选中该表: 选中右键 --> 备份 我们可以看到文件名默认以.backup格式的方式进行数据备份的. 首先将自定义格式 --> 无格式 点开之后

  • postgresql 实现取出分组中最大的几条数据

    看代码吧~ WITH Name AS ( SELECT * FROM ( SELECT xzqdm, SUBSTRING (zldwdm, 1, 9) xzdm, COUNT (*) sl FROM sddltb_qc WHERE xzqdm IN ('130432', '210604') GROUP BY xzqdm, SUBSTRING (zldwdm, 1, 9) ) AS A ORDER BY xzqdm, xzdm, sl ) SELECT xzqdm, xzdm, sl FROM (

  • PostgreSQL用户、数据库及表的管理、操作与授权方式

    摘要 PostgreSQL的常用命令 1.登录数据库 /* 切换到数据库用户 */ su - postgres /* 登录 */ psql 登录成功显示如下: bash-4.2$ psql psql (9.3.17) Type "help" for help. postgres=> 2.切换数据库 /* 登录指定数据库 */ psql -U user -d dbname /* 列举数据库 */ \l /* 切换数据库 */ \c dbname 3.用户管理 /* 创建用户 */

  • 解决postgreSql远程连接数据库超时的问题

    首先在cmd中ping 这个ip如果发现可以ping通就可以考虑是 远程数据库开启了防火墙.或者数据库设置该ip不能访问. 防火墙问题:可以考虑直接关闭防火墙,或者设置防火墙开放5432端口 然后到postgresql安装目录下data中修改pg_hba.conf文件,配置用户的访问权限,拉到底部 host all all 127.0.0.1/32 trust host all all 192.168.1.0/24 md5 #表示允许网段192.168.1.0上的所有主机使用所有合法的数据库用户

  • postgresql限制某个用户仅连接某一个数据库的操作

    创建数据库bbb且owner为用户b: postgres9.6@[local]:5432 postgres# create database bbb owner b; CREATE DATABASE Time: 259.099 ms 默认情况下使用用户c也可以连接数据库bbb: postgres9.6@[local]:5432 postgres# \c bbb c You are now connected to database "bbb" as user "c"

  • PostgreSQL 数据同步到ES 搭建操作

    安装python 和dev 开发包 [root@rtm2 Packages]# rpm -ivh python-devel-2.7.5-58.el7.x86_64.rpm 准备中... ################################# [100%] 正在升级/安装... 1:python-devel-2.7.5-58.el7 ################################# [100%] [root@rtm2 Packages]# ls 安装 multicor

  • 详解Mysql如何实现数据同步到Elasticsearch

    目录 一.同步原理 二.logstash-input-jdbc 三.go-mysql-elasticsearch 四.elasticsearch-jdbc 五.logstash-input-jdbc实现同步 六.go-mysql-elasticsearch实现同步 七.elasticsearch-jdbc实现同步 一.同步原理 基于Mysql的binlog日志订阅:binlog日志是Mysql用来记录数据实时的变化 Mysql数据同步到ES中分为两种,分别是全量同步和增量同步 全量同步表示第一次

  • Python操作ES的方式及与Mysql数据同步过程示例

    目录 Python操作Elasticsearch的两种方式 mysql和Elasticsearch同步数据 haystack的使用 Redis补充 Python操作Elasticsearch的两种方式 # 官方提供的:Elasticsearch # pip install elasticsearch # GUI:pyhon能做图形化界面编程吗? -Tkinter -pyqt # 使用(查询是重点) # pip3 install elasticsearch https://github.com/e

  • MySQL 到Oracle 实时数据同步

    目录 第一步:配置MySQL 连接 第二步:配置 Oracle连接 第四步:进行数据校验 其他数据库的同步操作 摘要:很多 DBA 同学经常会遇到要从一个数据库实时同步到另一个数据库的问题,同构数据还相对容易,遇上异构数据.表多.数据量大等情况就难以同步.本文亲测了一种方式,可以非常方便地完成 MySQL 数据实时同步到Oracle,跟大家分享一下,希望对你有帮助. 本次 MySQL 数据实时同步到 Oracle大概只花了几分钟就完成.使用的工具是Tapdata Cloud ,这个工具是永久免费

  • MySQL 到 ClickHouse 实时数据同步实操

    摘要: 很多 DBA 同学经常会遇到要从一个数据库实时同步到另一个数据库的问题,同构数据还相对容易,遇上异构数据.表多.数据量大等情况就难以同步.我自己亲测了一种方式,可以非常方便地完成 MySQL 数据实时同步到ClickHouse,跟大家分享一下,希望对你有帮助. MySQL 到 ClickHouse 实时数据同步实操分享 本次 MySQL 数据实时同步到ClickHouse大概只花了几分钟就完成.使用的工具是Tapdata Cloud ,这个工具是永久免费的. @[TOC](MySQL 到

  • MySQL 到Oracle 实时数据同步

    目录 第一步:配置MySQL 连接 第二步:配置 Oracle连接 第四步:进行数据校验 其他数据库的同步操作 摘要:很多 DBA 同学经常会遇到要从一个数据库实时同步到另一个数据库的问题,同构数据还相对容易,遇上异构数据.表多.数据量大等情况就难以同步.本文亲测了一种方式,可以非常方便地完成 MySQL 数据实时同步到Oracle,跟大家分享一下,希望对你有帮助. 本次 MySQL 数据实时同步到 Oracle大概只花了几分钟就完成.使用的工具是Tapdata Cloud ,这个工具是永久免费

  • linux下实现web数据同步的四种方式(性能比较)

    实现web数据同步的四种方式 ======================================= 1.nfs实现web数据共享2.rsync +inotify实现web数据同步3.rsync+sersync更快更节约资源实现web数据同步4.unison+inotify实现web数据双向同步 ======================================= 一.nfs实现web数据共享 nfs能实现数据同步是通过NAS(网络附加存储),在服务器上共享一个文件,且服务器需

  • 用python简单实现mysql数据同步到ElasticSearch的教程

    之前博客有用logstash-input-jdbc同步mysql数据到ElasticSearch,但是由于同步时间最少是一分钟一次,无法满足线上业务,所以只能自己实现一个,但是时间比较紧,所以简单实现一个 思路: 网上有很多思路用什么mysql的binlog功能什么的,但是我对mysql了解实在有限,所以用一个很呆板的办法查询mysql得到数据,再插入es,因为数据量不大,而且10秒间隔同步一次,效率还可以,为了避免服务器之间的时间差和mysql更新和查询产生的时间差,所以在查询更新时间条件时是

  • 使用mongoshake实现mongodb数据同步的操作方法

    目录 前言 使用场景 搭建步骤 环境准备 一.搭建mongodb复制集 二.mongoshake配置 前言 MongoShake是阿里云以Golang语言编写的通用平台型服务工具,它通过读取MongoDB的Oplog操作日志来复制MongoDB的数据以实现特定需求. MongoShake还提供了日志数据的订阅和消费功能,可通过SDK.Kafka.MetaQ等方式的灵活对接,适用于日志订阅.数据中心同步.Cache异步淘汰等场景. 官方地址:https://github.com/alibaba/M

  • MySQL高性能实现Canal数据同步神器

    目录 简介 配置MySQL centos7 安装 canal java客户端 简介 Canal是阿里巴巴基于Java开源的数据同步工具.平时业务场景使用比较多的如下: 同步数据到ES.Redis缓存中. 数据同步. 业务需要监听数据. 图片来源阿里巴巴github仓库:https://github.com/alibaba/canal 配置MySQL 跟配置主从复制类似. 数据执行如下命令: -- 给canal单独创建用户 用户名:canal 密码:123456 create user 'cana

随机推荐