现实中的数据接入

Xiao Qiang Lv4

作为干了多年应用开发,只是在项目上看看数据项目的我,现在已经开始转向了数据的方向,目前来看许多企业已经完成了信息化的转型,但是应用系统中产生的数据并没有被好好的利用起来,那么在我的认知里,数据化的转型应该是未来的一个长期方向。其中包括了数据的采集,清理和使用,这里我说的很简单,因为在公司里面很多的数据都是被集中化管理,几乎不存在我拿不到数据的情况。这是站在一个公司层面来理解和观察数据,与社会的公共数据级别来说就如同九牛一毛一样。其实不仅仅是数据级别,治理难度,收集难度也没有在一个层级上。最近我在做一些数据收集的工作,所以也有一些想法想法想写写。主要是因为这次的体验比较特殊,现实中接数可能更多的是库对库或者推流的模式,然而当前的项目确是以api为全部数据的接入途径。不仅数量多,而且来源也不同。下面主要从client的调用、数据落库、数据清理方案以及数据的抽数方案等方面来展开讨论。
业务背景为某智慧平台建设,除了需要内部系统数据之外,还需要收集一些公共数据来做预测和实时的监控。然而这些公共数据都是以api的形式来提供数据,因为购买的数据来自多家供应商,都是国内头部的地图厂商。来自多家厂商还不是最大的问题,提供的接口只是为了提供数据,所以数据返回数据的内容各不相同。其中最普遍的是 application/json,我天真的以为,这次的数据就是为了调用api之后简单的转换落库就完了。结果还有各种不同的 content-typeresponse的返回。以下是应对两种不同的场景的请求实现:

Client调用方案

getZipFile(String path)

  • 核心功能: 下载并尝试解压 GZIP 压缩的文件,同时兼容普通文本。
  • 应对场景: 服务器返回 GZIP 压缩的响应体:这是它的主要目标,response的头部中能够发现 Content-Encoding的值为 zip。服务器为了节省带宽,可能会将 JSON 或文本数据进行 GZIP 压缩后再发送。客户端收到后需要先解压才能读取原始内容。API 明确告知会返回压缩文件:例如,某个接口的文档里写着响应体是 GZIP 压缩的 JSON或者,那么就应该用这个方法来调用。
  • 实现关键点:
1
2
3
4
5
6
7
8
9
10
HTTP 响应
├── URL: http://et-api.amap.com/state/areaJSONPub
├── Request Headers:
│ └── Accept-Encoding: gzip? (你没发,但可能默认支持)
├── Response Headers:
│ ├── Content-Type: application/x-zip-compressed ← 名不副实,传回来的数据并不是一个真正的zip文件,不需要通过GZIP解压之后再用ZIP解压
│ └── Content-Encoding: gzip
└── 响应体:
└── GZIP 压缩的数据
└── 解压后是:一个叫 areaJSONPub 的 JSON 文本文件(纯字符串)
  • 简单来说: 当你调用一个接口,它返回的是 .gz 或 .zip 格式的文件,而不是纯文本时,用这个方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected String getZipFile(String path) throws IOException {  
ResponseEntity<byte[]> entity = restTemplate.getForEntity(path, byte[].class);
byte[] body = entity.getBody();
// 所有流都从 body 创建,避免复用已读取的流
ByteArrayInputStream bais2 = new ByteArrayInputStream(body);
try (GZIPInputStream gis = new GZIPInputStream(bais2)) {
return streamToString(gis);
}
}

private static String streamToString(InputStream is) throws IOException {
StringBuilder sb = new StringBuilder();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
String line;
while ((line = reader.readLine()) != null) {
sb.append(line);
}
}
return sb.toString();
}

那如果数据量再大一些该怎么办?这个时候我们可能就需要借助一些大数据处理的思想以及框架来处理数据

  • case1,调用一个接口来处理一个T+3的数据,接口提供的数据不是普通的 appliaction/json格式而是一个zip文件的下载,处理方式有有两种
    • 方式一,使用java程序将文件下载下来,将其解压,通过hdfs的put命令,将解压后的文件存储到hdfs中,优点是可以通过代码的方式来操作这些文件,通过代码来实现自动化;缺点是如果这样的case很多,就是在代码中出现很多的长连接,这样会极大的占用程序的IO资源,容易造成性能问题
    • 方式二,使用bash脚本,通过wget和unzip命令来下载文件,然后直接在linux系统上执行hdfs的put命令,然后再通过corn tab表达式执行T+3的定时调度。优点是简单,能够直接使用linux的资源,没有其他的网络带宽,方式比较直接;缺点是这种方式只能作为一种临时的方式提取数据的方案,代码不易维护,没有版本的管理等
    • 方式三,通过all in one的平台,编写调度任务
      除了接口数据量大的问题,还有落数据的问题以及数据清理的问题

大数据量批量落数方案

通过使用框架的批量写数方案,能够在满足数据库的性能内,将一批数据写入数据,这样能够减少服务和数据连接数,和网络的占用。以下是主流的几种批量写数方案:

  • 使用jakarta.persistence提供的EntityManager,主动按照不同的批次执行
1
2
entityManager.flush();  %% 强制提交当前的上下文中的sql到数据中执行 %%
entityManager.clear();
  • 使用jdbctemplate的 jdbcTemplate.batchUpdate,这里有一点需要注意。如果没有添加 @Transactional,每一个batch都会自动提交。但是如果添加了注解,需要手动修改类型为 REQUIRES_NEW
  • mybatis的batchInsert接口

大数据量的清理方案

在落库的过程中,会产生大量的数据,为了不影响落数的速度,在建表的时候并没有添加索引。那么这将导致在数据量大了之后,很难查询对应的数据,也同时会影响对数据库修改的一些操作。所以产生的数据需要及时的清理,那么这里有几种清理方案,如下

主动处理

数据处理方需要提供一个api来查询每次数据处理的状态,如果当前批次的数据处理成功,在每次数据清理之前调用当前接口获取数据处理的状态。每次重试3次,如果三次都失败,需要打印日志,或者将当前没有处理的数据放到某个未处理队列中,以供后续的处理。

被动处理

数据消费方需要提供webhook,在每次数据处理完成之后通过webhook来调用数据存储方来清理数据。

涉及到的技术方案

  • 通过使用jdbctemplate执行原始的sql批量删数语句删数
  • 使用存储过程删数,定义存储过程模版,再用jdbctemplate调用存储过程
  • 在存数的时候就需要考虑,按日期分表存储数据,每次只需要drop table即可

超级大数据的清理

  • 因为在删除数据的时候会对数据的索引产生间隙锁,导致插入数据的时候发生插入语句被删除的block,那么这里有两种处理方案

    • 减少每次删除的数量在sql中添加一个 limit,这样可以减少间隙锁的命中,通过下述两个sql来查询sql的运行状态
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    show processList;

    SELECT
    waiting_trx_id,
    waiting_pid,
    waiting_query,
    blocking_trx_id,
    blocking_pid,
    blocking_query
    FROM sys.innodb_lock_waits;
    • 借助一个开关,使用内存或者Redis添加一个flag,当删除任务或者etl任务开始的时候,当前的写入数据的任务将切换一个临时表来写数,当长时间的任务结束之后再继续在原表中写数

ETL方式

调度数据接口数量比较多,调度的频率也会不一样,并且数据接口返回的数据量也不一样。所以在选择框架上可以从以下几个方面来考虑

框架 是否需要编码 是否能够支持多线程 维护性 扩展性 可视化界面 适应场景 相关官网
spring自带Schedule 可以的,默认使用的是 ConcurrentTaskExecutor,可以通过bean的方式替换Executor,来开启不同的线程池 需要通过写代码来设计 需要代码设计 1、需要调度的任务较少 <br>2、一定需要编程解决的数据采集任务
Quartz Scheduler 是的,还可以支持分布式任务调度,所有任务的metadata可以存储在数据库或者内存中。前者,持久化,不会因为服务的宕机而导致数据的丢失;后者因为存储在内存中,获取任务的metadata的速度会更快 需要设计代码 需要通过代码来设计 1、可以随时的开停正在运行的任务,不用额外设计【通过api】 https://www.quartz-scheduler.org/
xxljob 是的,还可以部署集群来提高任务的执行效率 不需要 不需要 是否能够支持复杂的任务? https://gitee.com/xuxueli0323/xxl-job
powerjob 是的,还可以部署集群来提高任务的执行效率 不需要 不需要 支持更加复杂的工作流DAG https://www.yuque.com/powerjob/guidence/intro

总结

在目前的接数阶段中,主要遇到的是一些性能问题,配置问题等。如同公司的同事所说,通过api接数在现实中是比较少的,因为通过网络的传输会影响接数的效率。数据经过网络的传输都会有一定的延迟,通常一般都是使用库表、文件以及消息队列的方式来共享数据。通过这次项目,我发现在现实中不同的api的接口的性能不同,数据的接收方式不同,数据之间还存在一定的依赖的关系。对于提到的这些问题需要设计接数方案的时候在以下几个方面进行考虑:

  • 外界系统能够接受的QPS,如果太多将会影响系统的调用外部接口的成功次数,超过limit将会限流
  • 当前系统能够承载的最大的线程数量,如果超过当前系统的最大的承载数量将有可能会导致系统的IO堵塞
  • 设计方案,DAG+调度框架代码【Schedular或者quartz】,简单数据采集直接使用可视化界面的方式采集数据,复杂数据或者有依赖的数据使用代码的形式来实现数据的采集
  • 接口采集的频率以及次数,这些数据可以用来统计每天数据采集量,影响每天数据的采集配额。同时数据量的大小也会影响当前临时存储空间的大小
  • 数据的增量字段,该字段可以是自增id,也可以是create_at字段,这个数据有助于后续的ETL操作,否则ETL阶段的时候将不会有抽数的标记
  • 合理的设计索引,索引过多,或者过少都会影响插入数据或者写入数据的速度
  • 及时的清理数据,如果数据量一旦过大就会引起后续ETL抽数失效
  • 数据量过大时,数据的存储可以考虑直接存储在大数据集群,这里可能就是抽象的数据的入湖【数据湖】
  • Title: 现实中的数据接入
  • Author: Xiao Qiang
  • Created at : 2025-08-10 15:26:26
  • Updated at : 2025-08-11 08:47:31
  • Link: http://fdslk.github.io/tech/big-data/etl/java/2025/08/10/现实中的数据接入/
  • License: This work is licensed under CC BY-NC-SA 4.0.
Comments