Oceanus in Practice: Developing a MySQL CDC to Elasticsearch SQL Job from 0 to 1

Wuyuntao · 2022-06-24


Real time is the future. I have recently been working with the Tencent Cloud Oceanus real-time computing service; below is a hands-on walkthrough of a MySQL -> Flink -> Elasticsearch pipeline. Hope it helps!

1. Environment Setup

1.1 Create the Oceanus cluster

In the Oceanus console, create a cluster on the [Cluster Management] -> [Create Cluster] page: choose the region, availability zone, VPC, logging, and storage components, and set the initial password.

If you have not used the VPC, logging, or storage components before, create them first.

The VPC and subnet must be the same ones used by the MySQL and ES clusters below; otherwise the networks have to be connected manually (for example, via a peering connection).

The created cluster looks like this:

(Screenshot: Oceanus cluster)

1.2 Create the MySQL cluster

From the Tencent Cloud homepage, go to [Products] -> [Database] -> [Cloud Database MySQL] and purchase a MySQL cluster.

In the MySQL console, find the newly created MySQL cluster and modify the following parameter on the [Database Management] -> [Parameter Settings] page:

 binlog_row_image=FULL
(Screenshot: MySQL parameter configuration)
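As an optional sanity check not shown in the original console flow: the mysql-cdc connector reads row-based binlogs, so alongside binlog_row_image=FULL, binlog_format should report ROW (the TencentDB default). Both can be confirmed from any MySQL client:

-- Verify the binlog settings the CDC connector depends on
SHOW VARIABLES LIKE 'binlog_format';     -- expected: ROW
SHOW VARIABLES LIKE 'binlog_row_image';  -- expected: FULL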

1.3 Create a table in the MySQL database

Execute the following SQL, or create the table through the console's visual interface.

-- Take a student score table as an example
CREATE TABLE `cdc_source4es` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'student number',
  `score` int(11) NOT NULL COMMENT 'score',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8 COMMENT='create for student score';
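If you want some data in place before wiring up the job, you can optionally seed a few rows; the values below are illustrative and not from the original article:

-- Seed sample rows for testing (ids auto-increment from 7 per the DDL above)
INSERT INTO `cdc_source4es` (`score`) VALUES (85), (92), (78);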

1.4 Create the Elasticsearch cluster

From the Tencent Cloud homepage, go to [Products] -> [Big Data] -> [Elasticsearch] and purchase an ES cluster. For simplicity, choose the same region and availability zone as the Oceanus cluster, and select the same VPC as above.

For this walkthrough, an ES 6 cluster was created. Viewed in the ES console, it looks like this:

(Screenshot: ES cluster)

After creation, you can inspect the ES cluster with Kibana. For example, run the following command in the Dev Tools panel:

# List the cluster nodes
GET _cat/nodes
# The returned node information looks normal:
172.28.1.1 43 99 1 0.06 0.06 0.12 dilm - 1627027760001130832
172.28.1.2 65 99 3 0.03 0.12 0.13 dilm - 1627027760001130732
172.28.1.3 29 99 3 0.08 0.08 0.12 dilm * 1627027760001130632
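Optionally, a cluster-level health check (a standard ES API, not part of the original walkthrough) confirms the cluster is usable:

# Check overall cluster health; a healthy cluster reports "status": "green"
GET _cluster/health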

Note: ES does not require creating a table-like entity in advance; the index is created automatically on the first write.

With that, the environment is ready.

2. Job creation

2.1 Create a SQL job

In the Oceanus console, go to [Job Management] -> [Create Job] -> SQL job, and create the job on the cluster created above. Then, on the job's [Development & Debugging] -> [Job Parameters] page, add the necessary connectors, such as the mysql-cdc connector and the elasticsearch6/7 connector.

Note: the ES connector version must match the component version of the purchased ES cluster.

(Screenshot: SQL job)

2.2 Create the source table

MySQL is used as the data source here, and subsequent changes are continuously synchronized to ES.

-- mysql-cdc connector
CREATE TABLE `mysql_source` (
  `id` int,
  `score` int,
  PRIMARY KEY (`id`) NOT ENFORCED   -- if the table being synchronized defines a primary key, it must be declared here as well
) WITH (
  'connector' = 'mysql-cdc',        -- must be 'mysql-cdc'
  'hostname' = '172.28.28.213',     -- database IP
  'port' = '3306',                  -- database access port
  'username' = 'youruser',          -- database user (needs SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, and RELOAD privileges)
  'password' = 'yourpassword',      -- database password
  'database-name' = 'test',         -- database to synchronize
  'table-name' = 'cdc_source4es'    -- table to synchronize
);
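The comment on 'username' lists the privileges the CDC account needs. A minimal provisioning sketch, assuming a placeholder account 'youruser'@'%' (tighten the host mask for production):

-- Run on the MySQL side as an administrator
CREATE USER 'youruser'@'%' IDENTIFIED BY 'yourpassword';
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, RELOAD
  ON *.* TO 'youruser'@'%';
FLUSH PRIVILEGES;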

2.3 Create the sink table

The sink requires no advance initialization on the ES side; data can be written directly.

-- Caution! If Elasticsearch username/password authentication is enabled, only the old Flink 1.10 syntax can currently be used. If authentication is not required, the new Flink 1.11 syntax can be used instead.
-- See https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector
CREATE TABLE es_old_sink (
  `id` INT,
  `score` INT
) WITH (
  'connector.type' = 'elasticsearch',              -- output to Elasticsearch
  'connector.version' = '6',                       -- Elasticsearch version, e.g. '6' or '7'; must match the built-in connector version selected in the job parameters
  'connector.hosts' = 'http://172.28.1.175:9200',  -- Elasticsearch connection address
  'connector.index' = 'connector-test-index',      -- Elasticsearch index name
  'connector.document-type' = '_doc',              -- Elasticsearch document type
  'connector.username' = 'elastic',                -- optional: Elasticsearch username
  'connector.password' = 'yourpassword',           -- optional: Elasticsearch password
  'update-mode' = 'upsert',                        -- 'append' mode (no primary key) or 'upsert' mode (with primary key)
  'connector.key-delimiter' = '$',                 -- optional: delimiter for composite primary keys (default '_', e.g. key1_key2_key3)
  'connector.key-null-literal' = 'n/a',            -- string substituted for null primary key values (default 'null')
  'connector.failure-handler' = 'retry-rejected',  -- optional failure handling: 'fail' (throw an exception), 'ignore' (ignore any errors), or 'retry-rejected' (retry rejected requests)
  'connector.flush-on-checkpoint' = 'true',        -- optional: flush buffered bulk writes at checkpoint time (default true)
  'connector.bulk-flush.max-actions' = '42',       -- optional: maximum number of records per bulk request
  'connector.bulk-flush.max-size' = '42 mb',       -- optional: maximum accumulated size per bulk request (only 'mb' is supported)
  'connector.bulk-flush.interval' = '60000',       -- optional: interval between bulk writes (ms)
  'connector.connection-max-retry-timeout' = '1000',  -- maximum timeout per request (ms)
  --'connector.connection-path-prefix' = '/v1'     -- optional: path prefix appended to every request
  'format.type' = 'json'                           -- output data format; currently only 'json' is supported
);
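For reference, here is a minimal sketch of the same sink in the newer Flink 1.11 syntax, which per the note above is only usable when ES authentication is disabled; the table name es_new_sink is hypothetical:

-- Flink 1.11 style (no authentication options)
CREATE TABLE es_new_sink (
  `id` INT,
  `score` INT,
  PRIMARY KEY (`id`) NOT ENFORCED        -- a declared primary key puts the sink in upsert mode
) WITH (
  'connector' = 'elasticsearch-6',       -- new-style connector identifier for ES 6
  'hosts' = 'http://172.28.1.175:9200',  -- same ES address as above
  'index' = 'connector-test-index',
  'document-type' = '_doc'               -- still required for ES 6
);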

2.4 Operator logic

Only a simple data insert is performed here; no complex computation is involved.

INSERT INTO es_old_sink SELECT id, score FROM mysql_source;
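Purely for illustration (not part of the original job), the same statement can carry a transformation inline, for example capping scores at 100 before they reach ES:

-- Hypothetical variant: clean up scores on the fly
INSERT INTO es_old_sink
SELECT id, CASE WHEN score > 100 THEN 100 ELSE score END AS score
FROM mysql_source;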

3. Validation and Summary

In Kibana's Dev Tools, query the data in ES to check whether it was inserted successfully.

# Query all the data under the index
GET connector-test-index/_search
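To confirm that changes also propagate, modify a row on the MySQL side and re-run the search above; the id values below are placeholders for existing rows:

-- Run on the MySQL side; the upsert sink should reflect both changes in ES
UPDATE `cdc_source4es` SET `score` = 100 WHERE `id` = 7;
DELETE FROM `cdc_source4es` WHERE `id` = 8;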
Copyright: Wuyuntao. Please include the original link when reprinting: https://en.javamana.com/2022/175/20210804203431095p.html