visit
# SeaTunnel batch job: copy the `base_region` table from the MySQL database
# `test` into the table of the same name in the MySQL database `dw`.
env {
  # You can set flink configuration here
  execution.parallelism = 2
  job.mode = "BATCH"
}

source {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/test"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "user"
    password = "password"
    # Select exactly the columns the sink binds, in the same order as its
    # placeholders; `select *` would silently misalign if the table gained
    # or reordered columns.
    query = "select id, region_name from base_region"
  }
}

transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/transform/sql
}

sink {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/dw"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "user"
    password = "password"
    # Placeholders are filled positionally from the source row: (id, region_name).
    query = "insert into base_region(id,region_name) values(?,?)"
  }
}
./bin/seatunnel.sh --config ./config/mysql2mysql_batch.conf

Single Table to Multiple Tables
# SeaTunnel batch job: read `t_user` from the MySQL database `test`, split the
# rows by gender with two Sql transforms, and write each subset to its own
# table (`t_user_nan`, `t_user_nv`) in the MySQL database `dw`.
env {
  execution.parallelism = 2
  job.mode = "BATCH"
}

source {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/test"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "user"
    password = "password"
    # Name under which the transforms below reference this source's rows.
    result_table_name = "t_user"
    # Only the four columns consumed by the transforms are fetched.
    query = "select id,name,birth,gender from t_user;"
  }
}

transform {
  # Rows whose gender is '男' (male) -> t_user_nan.
  Sql {
    source_table_name = "t_user"
    result_table_name = "t_user_nan"
    query = "select id,name,birth,gender from t_user where gender ='男';"
  }
  # Rows whose gender is '女' (female) -> t_user_nv.
  Sql {
    source_table_name = "t_user"
    result_table_name = "t_user_nv"
    query = "select id,name,birth,gender from t_user where gender ='女';"
  }
}

sink {
  # One sink per transform output; `source_table_name` selects which
  # transform result this sink consumes.
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/dw"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "user"
    password = "password"
    source_table_name = "t_user_nan"
    query = "insert into t_user_nan(id,name,birth,gender) values(?,?,?,?)"
  }
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/dw"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "user"
    password = "password"
    source_table_name = "t_user_nv"
    query = "insert into t_user_nv(id,name,birth,gender) values(?,?,?,?)"
  }
}
./bin/seatunnel.sh --config ./config/mysql2mysql_1n.conf
-- dw source table 1: per-device CPU usage samples for switches.
CREATE TABLE IF NOT EXISTS ads_device_switch_performance (
    `event_time`  TIMESTAMP    COMMENT 'Business Time',
    `device_id`   VARCHAR(32)  COMMENT 'Equipment id',
    `device_type` VARCHAR(32)  COMMENT 'Equipment Type',
    `device_name` VARCHAR(128) COMMENT 'Equipment Name',
    `cpu_usage`   INT          COMMENT 'CPU Usage Rate'
);

-- Sample rows. The column list is explicit so the statement keeps working
-- if columns are later added to or reordered in the table.
INSERT INTO `ads_device_switch_performance`
    (`event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage`)
VALUES
    ('2024-01-15 14:25:11', '2001', '2', 'Exchanger 1', 49),
    ('2024-01-17 22:25:40', '2002', '1', 'Exchanger 2', 65);
-- dw source table 2: per-device CPU usage samples for routers.
CREATE TABLE IF NOT EXISTS ads_device_router_performance (
    `event_time`  TIMESTAMP    COMMENT 'Business Time',
    `device_id`   VARCHAR(32)  COMMENT 'Equipment id',
    `device_type` VARCHAR(32)  COMMENT 'Equipment Type',
    `device_name` VARCHAR(128) COMMENT 'Equipment Name',
    `cpu_usage`   INT          COMMENT 'CPU Usage Rate'
);

-- Sample rows. The column list is explicit so the statement keeps working
-- if columns are later added to or reordered in the table.
INSERT INTO `ads_device_router_performance`
    (`event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage`)
VALUES
    ('2024-01-17 21:23:22', '1001', '1', 'Router 1', 35),
    ('2024-01-16 17:23:53', '1002', '2', 'Router 2', 46);
-- ----------------------------------------------------------------------------
-- olap target table: receives the rows of both dw source tables.
-- (MySQL only treats `--` as a comment when it is followed by whitespace,
-- hence the space after the first two dashes of the separator line.)
-- NOTE(review): `event_time` is VARCHAR(32) here but TIMESTAMP in the source
-- tables — confirm the string representation is intended.
CREATE TABLE IF NOT EXISTS `device_performance` (
    `id`          INT          NOT NULL AUTO_INCREMENT COMMENT 'Table Primary Key',
    `event_time`  VARCHAR(32)  NOT NULL COMMENT 'Business Time',
    `device_id`   VARCHAR(32)  COMMENT 'Equipment id',
    `device_type` VARCHAR(32)  COMMENT 'Equipment Type',
    `device_name` VARCHAR(128) NOT NULL COMMENT 'Equipment Name',
    `cpu_usage`   FLOAT        NOT NULL COMMENT 'CPU Usage Rate Unit %',
    `create_time` DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
    `update_time` DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Update time',
    PRIMARY KEY (`id`)
) COMMENT='Equipment status';
./bin/seatunnel.sh --config ./syn_job/mysql2mysql_n1_batch.conf
# SeaTunnel batch job (multiple tables -> single table): read the switch and
# router performance tables from the MySQL database `dw`, add create/update
# timestamps, and write both streams into the single `device_performance`
# table in the MySQL database `olap`.
env {
  job.mode = "BATCH"
  job.name = "device_performance"
}

source {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "user"
    password = "password"
    result_table_name = "switch_src"
    query = "SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_switch_performance;"
  }
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "user"
    password = "password"
    result_table_name = "router_src"
    query = "SELECT `event_time`, `device_id`, `device_type`, `device_name`, `cpu_usage` FROM ads_device_router_performance;"
  }
}

transform {
  # Each transform appends create_time / update_time so that the output row
  # has the seven non-key columns of the target table, in table order.
  Sql {
    source_table_name = "switch_src"
    result_table_name = "switch_dst"
    query = "SELECT event_time , device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM switch_src;"
  }
  Sql {
    source_table_name = "router_src"
    result_table_name = "router_dst"
    query = "SELECT event_time, device_id, device_type, device_name, cpu_usage, NOW() AS create_time, NOW() AS update_time FROM router_src;"
  }
}

sink {
  # Both sinks write to the same target table `device_performance`.
  # The AUTO_INCREMENT key `id` is omitted from the column list so MySQL
  # generates it; the seven placeholders are bound positionally from the
  # transform output.
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "user"
    password = "password"
    source_table_name = "switch_dst"
    query = "INSERT INTO device_performance (event_time, device_id, device_type, device_name, cpu_usage, create_time, update_time) VALUES (?, ?, ?, ?, ?, ?, ?);"
  }
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/olap?allowMultiQueries=true&characterEncoding=utf-8"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "user"
    password = "password"
    source_table_name = "router_dst"
    query = "INSERT INTO device_performance (event_time, device_id, device_type, device_name, cpu_usage, create_time, update_time) VALUES (?, ?, ?, ?, ?, ?, ?);"
  }
}