Machine learning training pipeline: read the dataset --- min-max scaling --- binarization --- dataset splitting --- Bayes training --- evaluation of the training results
Reading the dataset
read_dataset.py
# -*- coding:utf-8 -*-
import os
import json

def read_dataset(input_url):
    import idsw.hdfs as ih
    import pandas as pd
    hdfs = ih.client()
    with hdfs.open(input_url) as f:
        if input_url.endswith('csv'):
            df = pd.read_csv(f, encoding='utf-8')
            # Preview the first 5 rows of the dataset
            print(df.head(5))
    # Pass the output path to downstream jobs via Azkaban's
    # JOB_OUTPUT_PROP_FILE temporary file (JSON format)
    print('azkaban-env:', os.environ['JOB_OUTPUT_PROP_FILE'])
    job01_output_prop_file = os.environ['JOB_OUTPUT_PROP_FILE']
    output_param = {'JOB01_OUTPUT_PATH': '/tmp/weather_patient.csv'}
    with open(job01_output_prop_file, 'w') as f:
        json.dump(output_param, f)
    print('write finished')

if __name__ == "__main__":
    read_dataset('/tmp/weather_patient.csv')
Note: when Azkaban executes a job, it creates a temporary file for that job and exposes its path through the environment variable JOB_OUTPUT_PROP_FILE. To pass parameters from one job to the next, write them (as JSON) into this temporary file.
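This write-parameters pattern repeats in every job below. A minimal helper that each script could share might look like the following sketch (write_output_params is a hypothetical name, not part of the original scripts):

# -*- coding:utf-8 -*-
# Hypothetical helper: write key/value pairs into the temporary file that
# Azkaban exposes through JOB_OUTPUT_PROP_FILE; downstream jobs can then
# reference each key as a ${KEY} substitution in their .job files.
import os
import json

def write_output_params(params):
    prop_file = os.environ['JOB_OUTPUT_PROP_FILE']
    with open(prop_file, 'w') as f:
        json.dump(params, f)

# e.g. write_output_params({'JOB01_OUTPUT_PATH': '/tmp/weather_patient.csv'})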
Azkaban job file read_dataset.job (the .job properties format, matching mmScaler.job below):
type=command
command.1=python read_dataset.py
Min-max scaling
Scaling script mmScaler.py
# -*- coding:utf-8 -*-
import os
import json
import sys

def mmScaler(input_url, output_url, columnsScaler=None):
    import idsw.hdfs as ih
    import pandas as pd
    hdfs = ih.client()
    with hdfs.open(input_url) as f:
        if input_url.endswith('csv'):
            df = pd.read_csv(f, encoding='utf-8')
    from sklearn.preprocessing import MinMaxScaler
    assert columnsScaler is not None, "please select at least one column"
    # Rescale the selected columns to the [0, 1] range
    df[columnsScaler] = MinMaxScaler().fit_transform(df[columnsScaler])
    with hdfs.open(output_url, "wb") as writer:
        df.to_csv(writer, encoding='utf-8')
    return output_url

if __name__ == "__main__":
    mmScaler(str(sys.argv[1]), '/tmp/01/mmScaler.csv',
             ['AVERAGE_TEMPERATURE', 'PM2_5', 'SO2', 'NO2',
              'atmospheric_pressure', 'humidity'])
    # Publish this job's output path for downstream jobs
    print('azkaban-env:', os.environ['JOB_OUTPUT_PROP_FILE'])
    job02_output_prop_file = os.environ['JOB_OUTPUT_PROP_FILE']
    output_param = {'JOB02_OUTPUT_PATH': '/tmp/01/mmScaler.csv'}
    with open(job02_output_prop_file, 'w') as f:
        json.dump(output_param, f)
    print('write finished')
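MinMaxScaler maps each selected column to [0, 1] via x' = (x - x_min) / (x_max - x_min), computed per column. A standalone sketch on toy data (illustrative values only):

import pandas as pd
from sklearn.preprocessing import MinMaxScaler

# Toy frame: two columns with different ranges
df = pd.DataFrame({'PM2_5': [30.0, 80.0, 130.0],
                   'SO2': [5.0, 10.0, 15.0]})
# Each column is rescaled independently: (x - min) / (max - min)
df[['PM2_5', 'SO2']] = MinMaxScaler().fit_transform(df[['PM2_5', 'SO2']])
print(df)  # both columns become 0.0, 0.5, 1.0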
Azkaban job file mmScaler.job:
type=command
dependencies=read_dataset
command.1=python mmScaler.py "${JOB01_OUTPUT_PATH}"
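Here dependencies=read_dataset makes this job wait for read_dataset to finish, and Azkaban substitutes ${JOB01_OUTPUT_PATH} with the value that read_dataset wrote into its JOB_OUTPUT_PROP_FILE.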
Binarization
Binarization script binarize.py
# -*- coding:utf-8 -*-
import os
import json
import sys

def binarize(input_url, output_url, columnsBinarize=None, threshold=200):
    import idsw.hdfs as ih
    import pandas as pd
    hdfs = ih.client()
    with hdfs.open(input_url) as f:
        if input_url.endswith('csv'):
            df = pd.read_csv(f, encoding='utf-8')
    from sklearn.preprocessing import Binarizer
    assert columnsBinarize is not None, "please select at least one column"
    # Values strictly greater than the threshold become 1, the rest 0
    df[columnsBinarize] = Binarizer(threshold=threshold).fit_transform(df[columnsBinarize])
    with hdfs.open(output_url, "wb") as writer:
        df.to_csv(writer, encoding='utf-8')
    return output_url

if __name__ == "__main__":
    binarize(str(sys.argv[1]), '/tmp/01/binarize.csv',
             ['AVERAGE_TEMPERATURE', 'PM2_5', 'SO2', 'NO2',
              'atmospheric_pressure', 'humidity'], 200)
    # Publish this job's output path for downstream jobs
    print('azkaban-env:', os.environ['JOB_OUTPUT_PROP_FILE'])
    job03_output_prop_file = os.environ['JOB_OUTPUT_PROP_FILE']
    output_param = {'JOB03_OUTPUT_PATH': '/tmp/01/binarize.csv'}
    with open(job03_output_prop_file, 'w') as f:
        json.dump(output_param, f)
    print('write finished')
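Binarizer sets every value strictly greater than the threshold to 1 and everything else to 0. Note that if this job consumes the scaled output of mmScaler, the selected columns already lie in [0, 1], so a threshold of 200 would zero them all; the threshold must match the scale of the input data. A standalone sketch:

import pandas as pd
from sklearn.preprocessing import Binarizer

df = pd.DataFrame({'PM2_5': [150.0, 200.0, 250.0]})
# Strictly greater than the threshold -> 1, otherwise -> 0
df[['PM2_5']] = Binarizer(threshold=200).fit_transform(df[['PM2_5']])
print(df)  # 0.0, 0.0, 1.0

The section stops before the corresponding job file; following the pattern of mmScaler.job, a binarize.job would presumably look like this (hypothetical reconstruction, not from the original):

type=command
dependencies=mmScaler
command.1=python binarize.py "${JOB02_OUTPUT_PATH}"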