{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Pipeline Tutorial with HeteroSecureBoost" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Install" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`Pipeline` is distributed along with [fate_client](https://pypi.org/project/fate-client/).\n", "\n", "```bash\n", "pip install fate_client\n", "```\n", "\n", "To use Pipeline, we first need to specify which `FATE Flow Service` to connect to. Once `fate_client` is installed, a command-line entry point named `pipeline` is available:" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Usage: pipeline [OPTIONS] COMMAND [ARGS]...\n", "\n", "Options:\n", " --help Show this message and exit.\n", "\n", "Commands:\n", " config pipeline config tool\n", " init - DESCRIPTION: Pipeline Config Command.\n" ] } ], "source": [ "!pipeline --help" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Assume we have a `FATE Flow Service` running at 127.0.0.1:9380 (the default in standalone mode), then run:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Pipeline configuration succeeded.\n" ] } ], "source": [ "!pipeline init --ip 127.0.0.1 --port 9380" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Hetero SecureBoost Example" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before starting a modeling task, the data to be used should be uploaded. Please refer to this [guide](https://github.com/FederatedAI/FATE/blob/master/doc/tutorial/pipeline/pipeline_tutorial_upload.ipynb)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `pipeline` package provides components to compose a `FATE pipeline`."
] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "from pipeline.backend.pipeline import PipeLine\n", "from pipeline.component import Reader, DataTransform, Intersection, HeteroSecureBoost, Evaluation\n", "from pipeline.interface import Data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Make a `pipeline` instance:\n", "\n", " - initiator: \n", " * role: guest\n", " * party: 9999\n", " - roles:\n", " * guest: 9999\n", " * host: 10000\n", " " ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "pipeline = PipeLine() \\\n", " .set_initiator(role='guest', party_id=9999) \\\n", " .set_roles(guest=9999, host=10000)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Define a `Reader` to load data" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "reader_0 = Reader(name=\"reader_0\")\n", "# set guest parameter\n", "reader_0.get_party_instance(role='guest', party_id=9999).component_param(\n", " table={\"name\": \"breast_hetero_guest\", \"namespace\": \"experiment\"})\n", "# set host parameter\n", "reader_0.get_party_instance(role='host', party_id=10000).component_param(\n", " table={\"name\": \"breast_hetero_host\", \"namespace\": \"experiment\"})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Add a `DataTransform` component to parse raw data into Data Instance" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "data_transform_0 = DataTransform(name=\"data_transform_0\")\n", "# set guest parameter\n", "data_transform_0.get_party_instance(role='guest', party_id=9999).component_param(\n", " with_label=True)\n", "# set host parameter\n", "data_transform_0.get_party_instance(role='host', party_id=[10000]).component_param(\n", " with_label=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Add an `Intersection` component to perform PSI for the hetero scenario" ] }, { "cell_type":
"code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "intersect_0 = Intersection(name=\"intersect_0\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, we define the `HeteroSecureBoost` component. The following parameters will be set for all parties involved." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "hetero_secureboost_0 = HeteroSecureBoost(name=\"hetero_secureboost_0\",\n", " num_trees=5,\n", " bin_num=16,\n", " task_type=\"classification\",\n", " objective_param={\"objective\": \"cross_entropy\"},\n", " encrypt_param={\"method\": \"paillier\"},\n", " tree_param={\"max_depth\": 3})\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To show the evaluation result, an `Evaluation` component is needed." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "evaluation_0 = Evaluation(name=\"evaluation_0\", eval_type=\"binary\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Add components to the pipeline, in order of execution:\n", "\n", " - data_transform_0 consumes reader_0's output data\n", " - intersect_0 consumes data_transform_0's output data\n", " - hetero_secureboost_0 consumes intersect_0's output data\n", " - evaluation_0 consumes hetero_secureboost_0's prediction result on training data\n", "\n", "Then compile our pipeline to make it ready for submission."
] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "pipeline.add_component(reader_0)\n", "pipeline.add_component(data_transform_0, data=Data(data=reader_0.output.data))\n", "pipeline.add_component(intersect_0, data=Data(data=data_transform_0.output.data))\n", "pipeline.add_component(hetero_secureboost_0, data=Data(train_data=intersect_0.output.data))\n", "pipeline.add_component(evaluation_0, data=Data(data=hetero_secureboost_0.output.data))\n", "pipeline.compile();\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, submit(fit) our pipeline:" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "\u001b[32m2021-12-31 03:24:22.633\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m123\u001b[0m - \u001b[1mJob id is 202112310324182459270\n", "\u001b[0m\n", "\u001b[32m2021-12-31 03:24:23.152\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m144\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KJob is still waiting, time elapse: 0:00:00\u001b[0m\n", "\u001b[0mm2021-12-31 03:24:23.671\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n", "\u001b[32m2021-12-31 03:24:27.861\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component reader_0, time elapse: 0:00:05\u001b[0m\n", "\u001b[0mm2021-12-31 03:24:30.533\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n", "\u001b[32m2021-12-31 
03:24:34.732\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component data_transform_0, time elapse: 0:00:12\u001b[0m\n", "\u001b[0mm2021-12-31 03:24:37.333\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n", "\u001b[32m2021-12-31 03:24:43.185\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component intersect_0, time elapse: 0:00:20\u001b[0m\n", "\u001b[0mm2021-12-31 03:24:46.315\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n", "\u001b[32m2021-12-31 03:25:18.915\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component hetero_secureboost_0, time elapse: 0:00:56\u001b[0m\n", "\u001b[0mm2021-12-31 03:25:22.043\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n", "\u001b[32m2021-12-31 03:25:27.867\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component evaluation_0, time elapse: 0:01:05\u001b[0m\n", "\u001b[32m2021-12-31 03:25:31.005\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m131\u001b[0m - \u001b[1mJob is success!!! 
Job id is 202112310324182459270\u001b[0m\n", "\u001b[32m2021-12-31 03:25:31.007\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m132\u001b[0m - \u001b[1mTotal time: 0:01:08\u001b[0m\n" ] } ], "source": [ "pipeline.fit()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once training is done, the trained model can be used for prediction. Optionally, save the trained pipeline for future use." ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "pipeline.dump(\"pipeline_saved.pkl\");" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, deploy the needed components from the training pipeline" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "pipeline = PipeLine.load_model_from_file('pipeline_saved.pkl')\n", "pipeline.deploy_component([pipeline.data_transform_0, pipeline.intersect_0, pipeline.hetero_secureboost_0]);" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Define a new `Reader` component for reading prediction data" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "reader_1 = Reader(name=\"reader_1\")\n", "reader_1.get_party_instance(role=\"guest\", party_id=9999).component_param(table={\"name\": \"breast_hetero_guest\", \"namespace\": \"experiment\"})\n", "reader_1.get_party_instance(role=\"host\", party_id=10000).component_param(table={\"name\": \"breast_hetero_host\", \"namespace\": \"experiment\"})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Optionally, define a new `Evaluation` component."
] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "evaluation_0 = Evaluation(name=\"evaluation_0\", eval_type=\"binary\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Add components to the prediction pipeline in order of execution:" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "predict_pipeline = PipeLine()\n", "predict_pipeline.add_component(reader_1)\\\n", " .add_component(pipeline, \n", " data=Data(predict_input={pipeline.data_transform_0.input.data: reader_1.output.data}))\\\n", " .add_component(evaluation_0, data=Data(data=pipeline.hetero_secureboost_0.output.data));\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Then, run the prediction job:" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "\u001b[32m2021-12-31 03:25:35.541\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m123\u001b[0m - \u001b[1mJob id is 202112310325328444510\n", "\u001b[0m\n", "\u001b[32m2021-12-31 03:25:47.384\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m144\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KJob is still waiting, time elapse: 0:00:11\u001b[0m\n", "\u001b[0mm2021-12-31 03:25:47.903\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n", "\u001b[32m2021-12-31 03:25:52.078\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component reader_1, time elapse: 0:00:16\u001b[0m\n", "\u001b[0mm2021-12-31 03:25:54.161\u001b[0m | \u001b[1mINFO
\u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n", "\u001b[32m2021-12-31 03:25:58.545\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component data_transform_0, time elapse: 0:00:23\u001b[0m\n", "\u001b[0mm2021-12-31 03:26:01.167\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n", "\u001b[32m2021-12-31 03:26:07.502\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component intersect_0, time elapse: 0:00:31\u001b[0m\n", "\u001b[0mm2021-12-31 03:26:10.137\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n", "\u001b[32m2021-12-31 03:26:18.580\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component hetero_secureboost_0, time elapse: 0:00:43\u001b[0m\n", "\u001b[0mm2021-12-31 03:26:21.245\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n", "\u001b[32m2021-12-31 03:26:25.480\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component evaluation_0, time elapse: 0:00:49\u001b[0m\n", "\u001b[32m2021-12-31 03:26:28.136\u001b[0m | 
\u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m131\u001b[0m - \u001b[1mJob is success!!! Job id is 202112310325328444510\u001b[0m\n", "\u001b[32m2021-12-31 03:26:28.143\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m132\u001b[0m - \u001b[1mTotal time: 0:00:52\u001b[0m\n" ] } ], "source": [ "predict_pipeline.predict()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For more demos on using Pipeline to submit jobs, please refer to the [pipeline demos](https://github.com/FederatedAI/FATE/tree/master/examples/pipeline/demo)." ] } ], "metadata": { "interpreter": { "hash": "ad4309918fa4cd1705b305e369b2f64d901b1851e9144aef7b9b07ea3efcb1bb" }, "kernelspec": { "display_name": "Python 3.6.15 64-bit ('py36': venv)", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.15" } }, "nbformat": 4, "nbformat_minor": 4 }