#
#  Copyright 2019 The FATE Authors. All Rights Reserved.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
import argparse

from pipeline.backend.pipeline import PipeLine
from pipeline.component import Intersection
from pipeline.component import Union
from pipeline.component import Reader
from pipeline.interface import Data
from pipeline.utils.tools import load_job_config


def main(config="../../config.yaml", namespace=""):
    # obtain config
    if isinstance(config, str):
        config = load_job_config(config)
    parties = config.parties
    guest = parties.guest[0]
    host = parties.host[0]

    # specify input data name & namespace in database
    guest_train_data = {"name": "breast_hetero_guest", "namespace": f"experiment{namespace}"}
    host_train_data = {"name": "breast_hetero_host", "namespace": f"experiment{namespace}"}
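    # note: these tables are expected to already exist in FATE storage, i.e. the
    # breast_hetero example data must be uploaded by each party before running this job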

    # initialize pipeline
    pipeline = PipeLine()
    # set job initiator
    pipeline.set_initiator(role='guest', party_id=guest)
    # set participants information
    pipeline.set_roles(guest=guest, host=host)

    # define Reader component to read in data
    reader_0 = Reader(name="reader_0")
    # configure Reader for guest
    reader_0.get_party_instance(role='guest', party_id=guest).component_param(table=guest_train_data)
    # configure Reader for host
    reader_0.get_party_instance(role='host', party_id=host).component_param(table=host_train_data)
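    # get_party_instance() scopes parameters to a single party, so guest and host each
    # bind their own table to the same reader_0 component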

    param = {
        "intersect_method": "raw",
        "sync_intersect_ids": True,
        "only_output_key": True,
        "raw_params": {
            "use_hash": True,
            "hash_method": "sha256",
            "salt": "12345",
            "base64": True,
            "join_role": "host"
        }
    }
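    # with the "raw" method, ids are salted ("12345"), hashed with sha256, base64-encoded
    # and exchanged directly between the parties; sync_intersect_ids shares the matched
    # ids with both sides, only_output_key drops feature columns from the output, and
    # join_role selects which role performs the id join (the host here)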

    # define Intersection components
    intersections = []
    for i in range(200):
        intersection_tmp = Intersection(name="intersection_" + str(i), **param)
        intersections.append(intersection_tmp)
    union_0 = Union(name="union_0")
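    # 200 identically configured intersections are created (each component name must be
    # unique within the job); union_0 will merge their outputs back into a single table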

    # add components to pipeline, in order of task execution
    pipeline.add_component(reader_0)
    # every intersection component takes reader_0's output as its data input
    for intersection_tmp in intersections:
        pipeline.add_component(intersection_tmp, data=Data(data=reader_0.output.data))
    # collect the data outputs of all intersection components and feed them into union_0
    intersection_outputs = [intersection_tmp.output.data for intersection_tmp in intersections]
    pipeline.add_component(union_0, data=Data(data=intersection_outputs))

    # compile pipeline once finished adding modules, this step will form conf and dsl files for running job
    pipeline.compile()

    # fit model
    pipeline.fit()
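    # once the job completes, component output can typically be inspected through the
    # pipeline API, assuming the standard accessors are available in this FATE version, e.g.
    #     print(pipeline.get_component("union_0").get_output_data())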


if __name__ == "__main__":
    parser = argparse.ArgumentParser("PIPELINE DEMO")
    parser.add_argument("-config", type=str,
                        help="config file")
    args = parser.parse_args()
    if args.config is not None:
        main(args.config)
    else:
        main()
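# example invocation (the script filename is illustrative):
#     python pipeline-multi-intersect-demo.py -config ../../config.yaml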