forked from didi/dlflow
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathFeatureFlow.conf
More file actions
169 lines (166 loc) · 5.93 KB
/
Copy pathFeatureFlow.conf
File metadata and controls
169 lines (166 loc) · 5.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
// HOCON (Human-Optimized Config Object Notation)
// https://github.com/lightbend/config/blob/master/HOCON.md
# 全局配置项,用于做default配置,如果设置 override=true 会覆盖子项的配置
settings: {
# 是否用全局配置覆盖同名的子配置,false的情况退化为default配置
enableOverride: false,
# 是否开启checkpoint功能,开启会存储到HDFS
checkpoint: {
enable: true
overwrite: false, # 强制覆盖已有的checkpoint
clear: true, # 运行结束后是否清除checkpoint
emptyCheck: true, # 存入checkpoint的文件是否做非空校验,若空会抛出异常
},
parallelism = 1000 # dataframe执行时的并行度,请根据数据量设置合适的值
}
# 所有上游依赖的数据源,支持降级,用于生成占位符的kv字段,后面用作 substitution
dependencies: {
# hive类型的依赖,对依赖的hive表地址进行检测后,最终是以 "日期字符串" 格式返回(格式由 formatter 决定)
hive: [
{
key: HIVE_COMPACT_DATE # 该
formatter: yyyyMMdd
offsetDays: -1, # 是否在特征日期基础上进行日级别的偏移,正数是往特征日期后便宜,负数是往前便宜
degradeDays: 3, # 是否需要降级,最多降级多少天?必须 >= 0
# 数据源检查的地址模板,会根据执行时刻的:特征日期 + offsetDays,确定真实的日期,并对地址模板进行替换
pathTemplate: "/YOUR_HDFS_PREFIX/warehouse.db/hive/YOUR_TABLE/${YYYY}/${MM}/${DD}",
# 用哪种checker检查上游,可选项同 settings 部分
checker: {
method: capacity,
threshold: 1000.1 # 单位是MB
}
}
],
# hdfs类型的依赖,最终是以 pathTemplate 按检查通过的日期替换后的地址字符串
hdfs: [
{
key: UPSTREAM_PARQUET
offsetDays: 0,
degradeDays: 3,
pathTemplate: "/YOUR_HDFS_PREFIX/warehouse.db/hive/YOUR_TABLE/${YYYY}/${MM}/${DD}"
checker: {
method: _SUCCESS
}
}
],
# date类型的依赖,最终是以 "日期字符串" 格式返回(格式由 formatter 决定)
date: [
{
key: YESTERDAY,
formatter: yyyyMMdd,
offsetDays: -1
}
],
# 自定定义的 K-V pair
static: [
{
key: AGE,
value: "20"
}
]
}
# 具体的特征处理流程配置
steps: [
{
name: "loadParquet",
desc: "直接读取parquet地址的特征做合并",
# 目前支持4种OP(具体说明详见每一周):UDF_Assembler, SQL_Assembler, SQL_Assembler, MissingValueCounter
# UDF_Assembler,通过继承接口自定义的特征提取器。执行UDF代码提取完特征后,按指定的 joinKeys 合并到原数据上
opType: UDF_Assembler,
# 该 Step 是否设置checkpoint,若设置会覆盖 settings 中的默认checkpoint配置
checkpoint: {
enable: false
},
params: {
# 合并特征时候基于的主键,执行的是2个 dataframe 的 left_join,主键需要在2个里面都存在
joinKeys: [uid],
# 若要自己定义UDF,请实现接口: com.didi.dm.dmflow.feature.extractor.IFeatureUDF
clazz: com.didi.dm.dmflow.feature.extractor.udf.HdfsReader,
# optional, 用于判定提取到的特征是否存在重复,会根据 primaryKeys 判重
uniqueCheck: true,
primaryKeys: [
uid
],
# 这里是个Map[String, Any],具体字段的约束请继承:com.didi.dm.dmflow.feature.flow.param.BaseMapParams
extraParams: {
path: "${UPSTREAM_PARQUET}",
format: parquet,
},
# 这里对应 dependencies 部分的 key,将上游解析出的 value 传入 extraParams 中做占位符替换
substitutions: [UPSTREAM_PARQUET]
}
},
{
name: "loadSQL",
desc: "执行SQL文件获取特征进行合并",
# SQL_Assembler,通过SQL模板读取特征的提取器,按指定的 joinKeys 合并到原数据上
opType: SQL_Assembler,
checkpoint: {
enable: false
},
params: {
joinKeys: [uid],
sqlFile: "my_feature.sql",
// 有子分区目录的hive表需要开启这个参数
enableSubdir: false,
uniqueCheck: true,
primaryKeys: [
uid
],
# 这里对应 dependencies 部分的 key,将上游解析出的 value 传入 sqlFile 中做占位符替换
substitutions: [HIVE_COMPACT_DATE, AGE]
}
},
{
name: "count feature",
desc: "缺失特征值的统计量作为特征",
# MissingValueCounter,计算所有特征中缺失值的个数及占比,同时还会分字段类型分布统计,将得到的统计量作为新的特征
opType: MissingValueCounter,
checkpoint: {
enable: true,
clear: false,
emptyCheck: false
},
params: {
# 统计的时候哪些字段不参与统计,需要列入:主键,label,以及其他不属于特征的字段
excludeCols: [uid, label]
},
},
{
name: "fill missing value",
desc: "填充缺失值",
# MissingValueFiller,按指定的配置项对特征中的缺失值进行填充
opType: MissingValueFiller,
checkpoint: {
enable: false
},
params: {
# 不纳入填充的字段
excludeCols: [uid, label],
# 倒排,key是填充值,value是需要填充的字段
int: { # int 类型的字段如何填充
# 特点的列按特定的值填充,key是填充的值,value是需要填充的列
-999: [occupied_days, order_distance],
# 除上述字段以为的所有int字段,按0填充
default: 0
},
long: { # long 类型的字段如何填充
-999: [airport_fast_cnt_90d],
default: 0
},
float: {
default: 0.0
},
double: {
default: 0.0
},
decimal: {
default: 0.0
},
str: {
"FakeMissingString": [latest_topic, gender]
default: "NULL"
}
},
}
]