机器学习训练流程:读取数据集---标量化处理---二值化处理---拆分数据集---贝叶斯训练---评估训练结果

读取数据集

read_dataset.py

#-*- coding:utf-8 -*- 
import os
import json

def read_dataset(input_url):
    """Read a CSV dataset from HDFS, preview it, and publish its path
    for downstream Azkaban jobs.

    :param input_url: HDFS path of the input file; must end with 'csv'.
    :raises ValueError: if the input file is not a CSV file.
    """
    import idsw.hdfs as ih
    import pandas as pd

    # The original code only assigned `df` inside the csv branch, so a
    # non-csv input crashed later with NameError; fail fast instead.
    if not input_url.endswith('csv'):
        raise ValueError('only csv input is supported: %s' % input_url)

    hdfs = ih.client()
    with hdfs.open(input_url) as f:
        df = pd.read_csv(f, encoding='utf-8')

    # Preview the first 5 rows of the dataset.
    print(df.head(5))

    # Azkaban exposes JOB_OUTPUT_PROP_FILE so a job can pass parameters
    # to downstream jobs by writing them into this temporary file.
    print('azkaban-env:', os.environ['JOB_OUTPUT_PROP_FILE'])
    job01_output_prop_file = os.environ['JOB_OUTPUT_PROP_FILE']
    output_param = {'JOB01_OUTPUT_PATH': '/tmp/weather_patient.csv'}
    with open(job01_output_prop_file, 'w') as f:
        json.dump(output_param, f)
        print('write finished')

if __name__ == "__main__":
    # Entry point: load the raw weather/patient dataset from HDFS.
    read_dataset('/tmp/weather_patient.csv')

注意,在azkaban执行job时,会生成临时的文件,同时有相关的环境变量JOB_OUTPUT_PROP_FILE

如果需要进行不同job之间参数传递,可以将参数写入job临时文件。

azkaban的job文件:read_dataset.job

type=command
command.1=python read_dataset.py

标量化处理

标量化处理文件mmScaler.py

#-*- coding:utf-8 -*- 
import os
import json
import sys
def mmScaler(input_url, output_url, columnsScaler=None):
    """Min-max scale the selected columns of a CSV dataset on HDFS.

    :param input_url: HDFS path of the input CSV file.
    :param output_url: HDFS path the scaled CSV is written to.
    :param columnsScaler: list of column names to scale; required.
    :return: output_url, so callers can chain the result path.
    :raises ValueError: if no columns are given or the input is not CSV.
    """
    import idsw.hdfs as ih
    import pandas as pd
    from sklearn.preprocessing import MinMaxScaler

    # Use explicit exceptions instead of `assert`: asserts are stripped
    # under `python -O`, silently skipping input validation.
    if not columnsScaler:
        raise ValueError("please select at least one column")
    # The original left `df` undefined for non-csv input (NameError).
    if not input_url.endswith('csv'):
        raise ValueError('only csv input is supported: %s' % input_url)

    hdfs = ih.client()
    with hdfs.open(input_url) as f:
        df = pd.read_csv(f, encoding='utf-8')

    # Rescale each selected column to the [0, 1] range.
    df[columnsScaler] = MinMaxScaler().fit_transform(df[columnsScaler])

    with hdfs.open(output_url, "wb") as writer:
        df.to_csv(writer, encoding='utf-8')

    return output_url

if __name__ == "__main__":
    # Scale the numeric weather columns of the upstream job's output.
    input_path = str(sys.argv[1])
    scaled_path = '/tmp/01/mmScaler.csv'
    numeric_columns = ['AVERAGE_TEMPERATURE', 'PM2_5', 'SO2', 'NO2',
                       'atmospheric_pressure', 'humidity']
    mmScaler(input_path, scaled_path, numeric_columns)

    # Publish the result path for the downstream Azkaban job.
    print('azkaban-env:', os.environ['JOB_OUTPUT_PROP_FILE'])
    job02_output_prop_file = os.environ['JOB_OUTPUT_PROP_FILE']
    with open(job02_output_prop_file, 'w') as f:
        json.dump({'JOB02_OUTPUT_PATH': scaled_path}, f)
        print('write finished')

azkaban的job文件mmScaler.job

type=command
dependencies=read_dataset
command.1=python mmScaler.py "${JOB01_OUTPUT_PATH}"

二值化处理

二值化处理的文件binarize.py

#-*- coding:utf-8 -*- 
import os
import json
import sys
def binarize(input_url, output_url, columnsBinarize=None, threshold=200):
    """Binarize the selected columns of a CSV dataset on HDFS.

    Values strictly greater than ``threshold`` become 1, others 0.

    :param input_url: HDFS path of the input CSV file.
    :param output_url: HDFS path the binarized CSV is written to.
    :param columnsBinarize: list of column names to binarize; required.
    :param threshold: binarization cut-off (default 200).
    :return: output_url, so callers can chain the result path.
    :raises ValueError: if no columns are given or the input is not CSV.
    """
    import idsw.hdfs as ih
    import pandas as pd
    from sklearn.preprocessing import Binarizer

    # Use explicit exceptions instead of `assert`: asserts are stripped
    # under `python -O`, silently skipping input validation.
    if not columnsBinarize:
        raise ValueError("please select at least one column")
    # The original left `df` undefined for non-csv input (NameError).
    if not input_url.endswith('csv'):
        raise ValueError('only csv input is supported: %s' % input_url)

    hdfs = ih.client()
    with hdfs.open(input_url) as f:
        df = pd.read_csv(f, encoding='utf-8')

    df[columnsBinarize] = Binarizer(threshold=threshold).fit_transform(df[columnsBinarize])

    with hdfs.open(output_url, "wb") as writer:
        df.to_csv(writer, encoding='utf-8')

    return output_url

if __name__ == "__main__":
    # Binarize the scaled columns of the upstream job's output.
    input_path = str(sys.argv[1])
    binarized_path = '/tmp/01/binarize.csv'
    feature_columns = ['AVERAGE_TEMPERATURE', 'PM2_5', 'SO2', 'NO2',
                       'atmospheric_pressure', 'humidity']
    binarize(input_path, binarized_path, feature_columns, 200)

    # Publish the result path for the downstream Azkaban job.
    print('azkaban-env:', os.environ['JOB_OUTPUT_PROP_FILE'])
    job03_output_prop_file = os.environ['JOB_OUTPUT_PROP_FILE']
    with open(job03_output_prop_file, 'w') as f:
        json.dump({'JOB03_OUTPUT_PATH': binarized_path}, f)
        print('write finished')

results matching ""

    No results matching ""