建表语句Doris:
建表语句MySQL:
数据库配置:
构建JdbcTemplate和DataSourceTransactionManager:
Servuce层使用@Transactional管理事务:
写入层:
测试结果:
建表语句Doris:
建表语句MySQL:
数据库配置:
构建JdbcTemplate和DataSourceTransactionManager:
Servuce层使用@Transactional管理事务:
写入层:
测试结果:
这个问题是因为doris和mysql对autocommit的行为不一致:https://dev.mysql.com/doc/refman/8.4/en/innodb-autocommit-commit-rollback.html
mysql关闭autocommit,需要手动commit或rollback;
doris没支持autocommit,显式事务需要发起begin命令才能开启。
涉及到的代码:
建表语句:
CREATE TABLE `dw`.`test_cuda` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`value` BIGINT NOT NULL,
`time` int
) UNIQUE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 2
PROPERTIES (
"replication_allocation" = "tag.location.default: 3"
);
CREATE TABLE `dw`.`test_cuda` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`value` BIGINT NOT NULL,
`time` int,
PRIMARY KEY (id)
);
配置文件:
spring:
db:
driver: com.mysql.jdbc.Driver
url: "jdbc:mysql://192.168.28.38:19030,192.168.28.250:19030,192.168.21.13:19030/dw?useUnicode=true&characterEncoding=utf8&useTimezone=true&serverTimezone=Asia/Shanghai&useSSL=false&allowPublicKeyRetrieval=true"
#url: "jdbc:mysql://192.168.21.17:3306/dw?useUnicode=true&characterEncoding=utf8&useTimezone=true&serverTimezone=Asia/Shanghai&useSSL=false&allowPublicKeyRetrieval=true"
http-url: "http://192.168.28.38:18030/api/dw/%s/_stream_load"
username: root
password: ***
jdbc:
import com.alibaba.druid.pool.DruidDataSourceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.sql.DataSource;
import java.util.*;
@Component
public final class MySqlConnection {// 拒绝继承
public static final Logger logger = LoggerFactory.getLogger(MySqlConnection.class);
@Value("${spring.db.driver}")
private String driver;
@Value("${spring.db.url}")
private String url;
@Value("${spring.db.username}")
private String username;
@Value("${spring.db.password}")
private String password;
@Value("${spring.db.http-url}")
private String httpUrl;
public DataSource dataSource;
private JdbcTemplate jdbcTemplate;
private DataSourceTransactionManager dataSourceTransactionManager;
@PostConstruct //在构造函数执行完之后执行
private final void getDataSource(){
Properties properties = new Properties();
properties.put("driverClassName",driver);
properties.put("url",url);
properties.put("username",username);
properties.put("password",password);
properties.put("maxActive","60");
properties.put("initialSize","2");
properties.put("maxWait","120000");
properties.put("testOnBorrow","false");
properties.put("testOnReturn","false");
properties.put("minEvictableIdleTimeMillis","300000");
properties.put("timeBetweenEvictionRunsMillis","60000");
try {
dataSource = DruidDataSourceFactory.createDataSource(properties);
jdbcTemplate = new JdbcTemplate(dataSource);
dataSourceTransactionManager = new DataSourceTransactionManager(dataSource);
} catch (Exception e) {
logger.error("error info: ",e);
}
}
@Bean
public JdbcTemplate getJdbcTemplate(){
return jdbcTemplate;
}
@Bean
public DataSourceTransactionManager getDataSourceTransactionManager(){
return dataSourceTransactionManager;
}
}
Service层:
import dw.data.api.analyse.dao.ExeTasDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
@Service
@EnableScheduling
@Transactional(rollbackFor = Exception.class)
public class ExeTaskService {
private Logger logger = LoggerFactory.getLogger(ExeTaskService.class);
int a = 0;
int b = 1000000;
@Resource
ExeTasDao exeTasDao;
@Scheduled(cron ="*/5 * * * * ?")
public void exe() throws Exception {
Integer v = new Long(System.currentTimeMillis()/1000).intValue();
a++;
exeTasDao.add(a,v);
if(a>5 && a<10){
throw new Exception("遇到7,8,9错误制造回滚...");
}
a++;
exeTasDao.add(a,v);
}
@Scheduled(cron ="*/30 * * * * ?")
public void exe2(){
Integer v = new Long(System.currentTimeMillis()/1000).intValue();
b++;
exeTasDao.add(b,v);
}
}
dao层
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import javax.annotation.Resource;
@Repository
public class ExeTasDao {
private Logger logger = LoggerFactory.getLogger(ExeTasDao.class);
@Resource
JdbcTemplate jdbcTemplate;
public void add(int a, Integer v) {
logger.info("a={} v={} ", a, v);
String sql2 = "insert into test_cuda(`value`,`time`) values (?,?)";
Object[] args2 = {a,v};
jdbcTemplate.update(sql2,args2);
}
}
pom文件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>dw.data.doris</groupId>
<artifactId>dw-data-doris</artifactId>
<packaging>jar</packaging>
<version>1.0.0</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.16</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-freemarker</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>5.3.25</version>
</dependency>
<!-- slf4j start-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
</dependency>
<!-- slf4j end-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.32</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.14</version>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.41</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.9.13</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.101.Final</version>
</dependency>
</dependencies>
</project>