分类目录归档:数据管道

smartpip API数据接入


定义接入标准function

假设API的返回格式为:
{'data':[{'a':1,'b':2},..]}
def _get_api_data(param):
    import json, requests
    param = json.loads(param)
    res = requests.post(url=url, json=param).json()['data']
    res = json.dumps(res).encode()
    return res

在api设定中

##apiConn=get_api_data
##param={ "p1": xxx,"

Read more

向量数据库


也许你最近可能听过这样的新闻,某向量数据库的初创公司刚写好 PPT,就获得了几千万的投资,某公司的开源的向量数据库因其代码的简陋而登上了 Hackernews 等等。在过去几个月时间中, AI 应用的发展如火如荼,带动了 AI 应用技术栈上下游的火爆,而向量数据库就是其中最热门的之一。

GPT 的缺陷

过去几个月的时间,我们正处于人工智能的革命中,其中最耀眼的莫过于 GPT-3.5/4 的横空出世,而 GPT-3.5/4 带给我们无限震撼的同时,其天然的缺陷和诸多的限制也让开发者头痛不已,例如其输入端上下文(tokens)大小的限制困扰着很多的开发者和消费者,像 gpt-3.5-turbo 模

Read more

smartpip任务中实现获取执行参数


airflow支持基于时间的任务回跑, 这样我们就需要使用到airflow自带的参数, 如 "execution_date"

airflow能向任务传递的参数如下:

{'conf': , 'dag': , 'dag_run':, 
'ds': '2022-07-13',
'ds_nodash': '20220713', 
'execution_date': DateTime(2022, 7, 13, 2, 4, 33, 244294, tzinfo=Timezone('+00:00')),
'inlets': [], 
'macros': ,
'next_ds': '2022-07-13',

Read more

SQL数据分析常用函数


SQL 有很多可用于计数和计算的内建函数。常用的函数有聚合函数、日期和时间函数、转换函数、窗口函数、字符串函数等。

聚合函数

名称 功能 备注
AVG 平均值
COUNT 非空值的个数
FIRST 第一个记录的值
LAST 最后一个记录的值
MAX() 最大值
MIN 最小值
SUM 总和

日期函数

名称 功能 备注
CURDATE 当前日期
CURTIME 当前时间
NOW 当前日期和时间
UNIX_TIMESTAMP 返回 UNIX 时间戳
DATE_ADD 将两个日期相加
DATE_FORMA

Read more

smartpip实现按ID增量抽取


在数据抽取过程中, 如何通过目标数据库最大ID或时间等条件进行抽取, 这样可以将数据同步的粒度做得更细

标准方法

在datax的数据同步设定中加入

##incColumn = 增量字段
##incDB = 目标查询DB[默认为starrocks]

数据集方式

  • 首先我们需在smartchart的数据集开发中新建一个查询sql 例如maxid, event_day对应的参数名
select 
 max(id) as maxid, max(event_day) as event_day 
from targettablename
  • 在DAG开发中将数据集ID当做参数传递即可,假设数据集ID为

Read more

smartpip实现自定义邮件


你可能需要自定义邮件内容, 甚至可能需要动态获取数据进行发送

固定内容发送

msg = ['报表刷新成功', '<h1>刷新成功!!</h1>']
maillist = 'xxx@smartchart.cn'

#send_mail reportmail msg maillist

动态获取数据发送

maillist = 'xxx@smartchart.cn,abc@xxx.cn'
def fun_msg():
    result = get_dataset(123)['data']
  

Read more

smartpip实现循环抽取任务


比如数据库中存在一些表, 这些表名后缀是按天命名的, 现在需要自动抽取汇总到同一个表中, 这样就需要用到循环抽取 smartpip中已有datax组件, 但只能进行单一表格抽取, 不能增加逻辑 所以我们需要使用到smartpip的diy组件功能 首先我们需要新一个datax抽取任务,在这个任务中我们传递参数ZYM, 比如:

#datax job1  ZYM

之后再新一建一个diy任务, 实现循环抽取:

ZYM = '202001'
def fun_job2():
    job = os.path.join(ETL_FILE_PATH , '项目名/job1.sql')
    report

Read more

为什么建设一下传统的数据中台那么贵


为什么建设一下传统的数据中台那么贵, 我给大家分析一下:

  • 比如建设一个基于Hadoop生态的大数据平台, 你可能会选择CDH, 商业版本大概是7W一台服务器每年, 至少要6台服务器吧, 一年投入差不多要40W
  • 你还需要一个BI工具, 一般商业BI工具一年30-50W左右
  • 要做数据开发, 总得要数据开发平台, 任务调度平台这些吧, 一般价格在20-30W左右 然后基本上线开始做BI了, 这时还算不上数据中台, 也是就是个大数据仓库
  • 后来你会发现BI太慢了, 这个时候又需要购买OLAP加速引擎, 比如Kyligence, 一年50W左右
  • 有了数据, 总得提供数据服务吧, 还得采购低代码的数据服务

Read more

smartpip使用方法sample


一个完整的sample

//获取变量,以下为python语法(option)
P_DAYS = 12
MSG = '------------------------------------------------------------'
report_time = datetime.datetime.now()- datetime.timedelta(days=int(P_DAYS))
P_START_ZYM= report_time.strftime('%Y%m')

//以下为JOB专署语法
#link  lastdag 30 3600        --

Read more

smartpip不常用驱动使用方法


sqoop
填写以下内容到你的SQL文件, 比如命名为: mysqoop.sql
/*
conn =  zspl        -- 连接串, 找管理员要, 也可自定义, (必填)
sourceTable = tablename   -- 源系统表名,(必填)
columns =             -- 抽取原表的字段  a,b,c (可省略)
where =                -- 抽取时的条件  a>1 (可省略)
seq =                    -- 分隔符, 默认 \t (可省略)
query =                -- 查询语名,

Read more