smartpip实现按ID增量抽取


在数据抽取过程中, 如何通过目标数据库最大ID或时间等条件进行抽取, 这样可以将数据同步的粒度做得更细

标准方法

在datax的数据同步设定中加入

##incColumn = 增量字段
##incDB = 目标查询DB[默认为starrocks]

数据集方式

  • 首先我们需在smartchart的数据集开发中新建一个查询sql 例如maxid, event_day对应的参数名
select 
 max(id) as maxid, max(event_day) as event_day 
from targettablename
  • 在DAG开发中将数据集ID当做参数传递即可,假设数据集ID为455
#datax  jobname   455   -- 同步表
  • 在抽取任务设定中, 查询SQL中将数据集的查询结果做为参数传递
select .... from xxxx
where id > $maxid  and event_day > '$event_day'

自定义方法

  • 也可以自定义函数的方式来处理数据集的数据

    def get_targettablename_id():
      result = get_dataset(455)['data']
      maxid = result[1][0]
      event_day = result[1][1]
      return {'maxid':maxid, 'event_day': event_day}
    
  • 在DAG开发中将函数当做参数传递即可

#datax  jobname   get_targettablename_id   -- 同步表