123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- import argparse
- from pipeline.backend.pipeline import PipeLine
- from pipeline.component import DataTransform
- from pipeline.component import HeteroPoisson
- from pipeline.component import Intersection
- from pipeline.component import Reader
- from pipeline.interface import Data
- from pipeline.interface import Model
- from pipeline.utils.tools import load_job_config
def main(config="../../config.yaml", namespace=""):
    """Assemble and fit a HeteroPoisson pipeline with a validation branch.

    Two parallel reader -> data transform -> intersection chains are built.
    The first chain feeds ``train_data`` and the second feeds
    ``validate_data`` of the HeteroPoisson component.

    Args:
        config: Either a path to a job config file (loaded through
            ``load_job_config``) or an already-loaded config object that
            exposes ``parties``.
        namespace: Suffix appended to ``"experiment"`` to form the namespace
            of every input table.
    """
    # Resolve the job configuration when only a path was supplied.
    if isinstance(config, str):
        config = load_job_config(config)

    parties = config.parties
    guest, host, arbiter = parties.guest[0], parties.host[0], parties.arbiter[0]

    # Index 0 is consumed by reader_0, index 1 by reader_1. Each entry is a
    # separate dict so the readers never share a table description object.
    table_namespace = f"experiment{namespace}"
    guest_train_data = [
        {"name": "dvisits_hetero_guest", "namespace": table_namespace}
        for _ in range(2)
    ]
    host_train_data = [
        {"name": "dvisits_hetero_host", "namespace": table_namespace}
        for _ in range(2)
    ]

    pipeline = PipeLine()
    pipeline = pipeline.set_initiator(role="guest", party_id=guest)
    pipeline = pipeline.set_roles(guest=guest, host=host, arbiter=arbiter)

    # Readers: one per chain, each configured separately for guest and host.
    reader_0 = Reader(name="reader_0")
    reader_0_guest = reader_0.get_party_instance(role="guest", party_id=guest)
    reader_0_guest.component_param(table=guest_train_data[0])
    reader_0_host = reader_0.get_party_instance(role="host", party_id=host)
    reader_0_host.component_param(table=host_train_data[0])

    reader_1 = Reader(name="reader_1")
    reader_1_guest = reader_1.get_party_instance(role="guest", party_id=guest)
    reader_1_guest.component_param(table=guest_train_data[1])
    reader_1_host = reader_1.get_party_instance(role="host", party_id=host)
    reader_1_host.component_param(table=host_train_data[1])

    # Only data_transform_0 receives explicit parameters; data_transform_1 is
    # wired below to take the model output of data_transform_0.
    data_transform_0 = DataTransform(name="data_transform_0")
    data_transform_1 = DataTransform(name="data_transform_1")

    transform_0_guest = data_transform_0.get_party_instance(role="guest", party_id=guest)
    transform_0_guest.component_param(
        with_label=True,
        label_name="doctorco",
        label_type="float",
        output_format="dense",
    )
    transform_0_host = data_transform_0.get_party_instance(role="host", party_id=host)
    transform_0_host.component_param(with_label=False)

    intersection_0 = Intersection(name="intersection_0")
    intersect_1 = Intersection(name="intersection_1")

    callback_settings = {
        "callbacks": ["EarlyStopping", "PerformanceEvaluate"],
        "validation_freqs": 1,
        "early_stopping_rounds": 5,
        "metrics": [
            "mean_absolute_error",
            "root_mean_squared_error",
        ],
        "use_first_metric_only": False,
        "save_freq": 1,
    }
    poisson_settings = {
        "name": "hetero_poisson_0",
        "early_stop": "weight_diff",
        "max_iter": 20,
        "exposure_colname": "exposure",
        "optimizer": "rmsprop",
        "tol": 0.001,
        "alpha": 100.0,
        "batch_size": -1,
        "learning_rate": 0.01,
        "penalty": "L2",
        "callback_param": callback_settings,
        "init_param": {"init_method": "zeros"},
    }
    hetero_poisson_0 = HeteroPoisson(**poisson_settings)

    # Register components in dependency order.
    pipeline.add_component(reader_0)
    pipeline.add_component(reader_1)

    transform_0_input = Data(data=reader_0.output.data)
    pipeline.add_component(data_transform_0, data=transform_0_input)

    transform_1_input = Data(data=reader_1.output.data)
    transform_1_model = Model(data_transform_0.output.model)
    pipeline.add_component(data_transform_1, data=transform_1_input, model=transform_1_model)

    pipeline.add_component(intersection_0, data=Data(data=data_transform_0.output.data))
    pipeline.add_component(intersect_1, data=Data(data=data_transform_1.output.data))

    poisson_input = Data(
        train_data=intersection_0.output.data,
        validate_data=intersect_1.output.data,
    )
    pipeline.add_component(hetero_poisson_0, data=poisson_input)

    pipeline.compile()
    pipeline.fit()
if __name__ == "__main__":
    # Script entry point: an optional "-config" path replaces main()'s
    # default config location.
    arg_parser = argparse.ArgumentParser("PIPELINE DEMO")
    arg_parser.add_argument("-config", type=str, help="config file")
    cli_args = arg_parser.parse_args()

    if cli_args.config is None:
        main()
    else:
        main(cli_args.config)
|