{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Pipeline Match ID Tutorial" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Starting with version 1.7, FATE distinguishes between sample id (sid) and match id. A sid is unique to each sample entry, while a match id identifies the source a sample comes from, so several entries may share one match id. This adaptation allows FATE to perform private set intersection on samples with repeated match ids. Users may choose to create sids by appending a uuid to the original sample entries at upload time; the `DataTransform` module then extracts the true match id for later use. This tutorial walks through a full upload-and-training process to demonstrate how to add sids and train with them." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### install" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`Pipeline` is distributed along with [fate_client](https://pypi.org/project/fate-client/).\n", "\n", "```bash\n", "pip install fate_client\n", "```\n", "\n", "To use Pipeline, we first need to specify which `FATE Flow Service` to connect to. 
Once `fate_client` is installed, a command-line entry point named `pipeline` becomes available:" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Usage: pipeline [OPTIONS] COMMAND [ARGS]...\n", "\n", "Options:\n", " --help Show this message and exit.\n", "\n", "Commands:\n", " config pipeline config tool\n", " init - DESCRIPTION: Pipeline Config Command.\n" ] } ], "source": [ "!pipeline --help" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Assuming a `FATE Flow Service` is running at 127.0.0.1:9380 (the default in standalone mode), run:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Pipeline configuration succeeded.\n" ] } ], "source": [ "!pipeline init --ip 127.0.0.1 --port 9380" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### upload data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ " Before starting a modeling task, the data to be used must be uploaded. \n", " Typically, a party is a cluster that includes multiple nodes. Thus, when we upload the data, it is distributed across those nodes." 
] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "import os\n", "\n", "from pipeline.backend.pipeline import PipeLine" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Make a `pipeline` instance:\n", "\n", " - initiator: \n", " * role: guest\n", " * party: 9999\n", " - roles:\n", " * guest: 9999\n", "\n", "Note that only the local party id is needed.\n", " " ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "pipeline_upload = PipeLine().set_initiator(role='guest', party_id=9999).set_roles(guest=9999)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Define the number of partitions for data storage" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "partition = 4" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Define the table name and namespace, which will be used in the FATE job configuration" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "dense_data_guest = {\"name\": \"breast_hetero_guest\", \"namespace\": \"experiment\"}\n", "dense_data_host = {\"name\": \"breast_hetero_host\", \"namespace\": \"experiment\"}\n", "tag_data = {\"name\": \"breast_hetero_host\", \"namespace\": \"experiment\"}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, we add the data to be uploaded. To create a uuid as sid, turn on the `extend_sid` option. Alternatively, set `auto_increasing_sid` to make the extended sids start at 0." 
] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "data_base = \"/workspace/FATE/\"\n", "pipeline_upload.add_upload_data(file=os.path.join(data_base, \"examples/data/breast_hetero_guest.csv\"),\n", " table_name=dense_data_guest[\"name\"], # table name\n", " namespace=dense_data_guest[\"namespace\"], # namespace\n", " head=1, partition=partition, # data info\n", " extend_sid=True, # extend sid \n", " auto_increasing_sid=False)\n", "\n", "pipeline_upload.add_upload_data(file=os.path.join(data_base, \"examples/data/breast_hetero_host.csv\"),\n", " table_name=dense_data_host[\"name\"],\n", " namespace=dense_data_host[\"namespace\"],\n", " head=1, partition=partition,\n", " extend_sid=True,\n", " auto_increasing_sid=False) " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can then upload data" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\u001b[32m2021-12-31 03:27:14.912\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m123\u001b[0m - \u001b[1mJob id is 202112310327142307260\n", "\u001b[0m\n", "\u001b[32m2021-12-31 03:27:14.921\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m144\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KJob is still waiting, time elapse: 0:00:00\u001b[0m\n", "\u001b[0mm2021-12-31 03:27:15.452\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n", "\u001b[32m2021-12-31 03:27:19.088\u001b[0m | \u001b[1mINFO \u001b[0m | 
\u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component upload_0, time elapse: 0:00:04\u001b[0m\n", "\u001b[32m2021-12-31 03:27:20.675\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m131\u001b[0m - \u001b[1mJob is success!!! Job id is 202112310327142307260\u001b[0m\n", "\u001b[32m2021-12-31 03:27:20.676\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m132\u001b[0m - \u001b[1mTotal time: 0:00:05\u001b[0m\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ " UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\u001b[32m2021-12-31 03:27:21.404\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m123\u001b[0m - \u001b[1mJob id is 202112310327206816320\n", "\u001b[0m\n", "\u001b[32m2021-12-31 03:27:23.987\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m144\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KJob is still waiting, time elapse: 0:00:02\u001b[0m\n", "\u001b[0mm2021-12-31 03:27:24.505\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n", "\u001b[32m2021-12-31 03:27:28.142\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component upload_0, time elapse: 0:00:06\u001b[0m\n", 
"\u001b[32m2021-12-31 03:27:29.690\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m131\u001b[0m - \u001b[1mJob is success!!! Job id is 202112310327206816320\u001b[0m\n", "\u001b[32m2021-12-31 03:27:29.691\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m132\u001b[0m - \u001b[1mTotal time: 0:00:08\u001b[0m\n" ] } ], "source": [ "pipeline_upload.upload(drop=1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "After uploading, we can start modeling. Here we build a Hetero SecureBoost model the same way as in [this demo](https://github.com/FederatedAI/FATE/blob/master/doc/tutorial/pipeline/pipeline_tutorial_hetero_sbt.ipynb), but note how the specification of the `DataTransform` module needs to be adjusted to correctly load the match id." ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "from pipeline.backend.pipeline import PipeLine\n", "from pipeline.component import Reader, DataTransform, Intersection, HeteroSecureBoost, Evaluation\n", "from pipeline.interface import Data\n", "\n", "pipeline = PipeLine() \\\n", " .set_initiator(role='guest', party_id=9999) \\\n", " .set_roles(guest=9999, host=10000)\n", "\n", "reader_0 = Reader(name=\"reader_0\")\n", "# set guest parameter\n", "reader_0.get_party_instance(role='guest', party_id=9999).component_param(\n", " table={\"name\": \"breast_hetero_guest\", \"namespace\": \"experiment\"})\n", "# set host parameter\n", "reader_0.get_party_instance(role='host', party_id=10000).component_param(\n", " table={\"name\": \"breast_hetero_host\", \"namespace\": \"experiment\"})\n", "\n", "# set with match id\n", "data_transform_0 = DataTransform(name=\"data_transform_0\", with_match_id=True)\n", "# set guest parameter\n", "data_transform_0.get_party_instance(role='guest', 
party_id=9999).component_param(\n", " with_label=True)\n", "data_transform_0.get_party_instance(role='host', party_id=[10000]).component_param(\n", " with_label=False)\n", "\n", "intersect_0 = Intersection(name=\"intersect_0\")\n", "\n", "hetero_secureboost_0 = HeteroSecureBoost(name=\"hetero_secureboost_0\",\n", " num_trees=5,\n", " bin_num=16,\n", " task_type=\"classification\",\n", " objective_param={\"objective\": \"cross_entropy\"},\n", " encrypt_param={\"method\": \"paillier\"},\n", " tree_param={\"max_depth\": 3})\n", "\n", "evaluation_0 = Evaluation(name=\"evaluation_0\", eval_type=\"binary\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Add components to the pipeline, in order of execution:\n", "\n", " - data_transform_0 consumes reader_0's output data\n", " - intersect_0 consumes data_transform_0's output data\n", " - hetero_secureboost_0 consumes intersect_0's output data\n", " - evaluation_0 consumes hetero_secureboost_0's prediction result on training data\n", "\n", "Then compile the pipeline to make it ready for submission." 
] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "pipeline.add_component(reader_0)\n", "pipeline.add_component(data_transform_0, data=Data(data=reader_0.output.data))\n", "pipeline.add_component(intersect_0, data=Data(data=data_transform_0.output.data))\n", "pipeline.add_component(hetero_secureboost_0, data=Data(train_data=intersect_0.output.data))\n", "pipeline.add_component(evaluation_0, data=Data(data=hetero_secureboost_0.output.data))\n", "pipeline.compile();" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, submit(fit) our pipeline:" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "\u001b[32m2021-12-31 03:27:32.837\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m123\u001b[0m - \u001b[1mJob id is 202112310327304051900\n", "\u001b[0m\n", "\u001b[32m2021-12-31 03:27:34.379\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m144\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KJob is still waiting, time elapse: 0:00:01\u001b[0m\n", "\u001b[0mm2021-12-31 03:27:35.420\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n", "\u001b[32m2021-12-31 03:27:39.091\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component reader_0, time elapse: 0:00:06\u001b[0m\n", "\u001b[0mm2021-12-31 03:27:41.739\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n", "\u001b[32m2021-12-31 
03:27:46.111\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component data_transform_0, time elapse: 0:00:13\u001b[0m\n", "\u001b[0mm2021-12-31 03:27:48.749\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n", "\u001b[32m2021-12-31 03:27:55.018\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component intersect_0, time elapse: 0:00:22\u001b[0m\n", "\u001b[0mm2021-12-31 03:27:57.632\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n", "\u001b[32m2021-12-31 03:28:26.239\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component hetero_secureboost_0, time elapse: 0:00:53\u001b[0m\n", "\u001b[0mm2021-12-31 03:28:28.840\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n", "\u001b[32m2021-12-31 03:28:33.008\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component evaluation_0, time elapse: 0:01:00\u001b[0m\n", "\u001b[32m2021-12-31 03:28:35.597\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m131\u001b[0m - \u001b[1mJob is success!!! 
Job id is 202112310327304051900\u001b[0m\n", "\u001b[32m2021-12-31 03:28:35.599\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m132\u001b[0m - \u001b[1mTotal time: 0:01:02\u001b[0m\n" ] } ], "source": [ "pipeline.fit()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Check the data output on FATEBoard, or download the component output data, to see that each data instance now has a uuid as its sid." ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{\n", " \"data\": [\n", " \"sid,inst_id,label,x0,x1,x2,x3,x4,x5,x6,x7,x8,x9\",\n", " \"8e46168869e911ec87ab8eff97a1caf50,133,1,0.254879,-1.046633,0.209656,0.074214,-0.441366,-0.377645,-0.485934,0.347072,-0.28757,-0.733474\",\n", " \"8e46168869e911ec87ab8eff97a1caf51,273,1,-1.142928,-0.781198,-1.166747,-0.923578,0.62823,-1.021418,-1.111867,-0.959523,-0.096672,-0.121683\"\n", " ],\n", " \"meta\": [\n", " \"sid\",\n", " \"inst_id\",\n", " \"label\",\n", " \"x0\",\n", " \"x1\",\n", " \"x2\",\n", " \"x3\",\n", " \"x4\",\n", " \"x5\",\n", " \"x6\",\n", " \"x7\",\n", " \"x8\",\n", " \"x9\"\n", " ]\n", "}\n" ] } ], "source": [ "import json\n", "print(json.dumps(pipeline.get_component(\"data_transform_0\").get_output_data(limits=3), indent=4))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For more demos on using pipeline to submit jobs, please refer to the [pipeline demos](https://github.com/FederatedAI/FATE/tree/master/examples/pipeline/demo). Other pipeline examples using data with match id are included [here](https://github.com/FederatedAI/FATE/tree/master/examples/pipeline/match_id_test)." 
] } ], "metadata": { "interpreter": { "hash": "ad4309918fa4cd1705b305e369b2f64d901b1851e9144aef7b9b07ea3efcb1bb" }, "kernelspec": { "display_name": "Python 3.6.15 64-bit ('py36': venv)", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.15" } }, "nbformat": 4, "nbformat_minor": 4 }