{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Pipeline Upload Data Tutorial " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### install" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`Pipeline` is distributed along with [fate_client](https://pypi.org/project/fate-client/).\n", "\n", "```bash\n", "pip install fate_client\n", "```\n", "\n", "To use Pipeline, we need to first specify which `FATE Flow Service` to connect to. Once `fate_client` installed, one can find an cmd enterpoint name `pipeline`:" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Usage: pipeline [OPTIONS] COMMAND [ARGS]...\n", "\n", "Options:\n", " --help Show this message and exit.\n", "\n", "Commands:\n", " config pipeline config tool\n", " init - DESCRIPTION: Pipeline Config Command.\n" ] } ], "source": [ "!pipeline --help" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Assume we have a `FATE Flow Service` in 127.0.0.1:9380(defaults in standalone), then exec" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Pipeline configuration succeeded.\n" ] } ], "source": [ "!pipeline init --ip 127.0.0.1 --port 9380" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### upload data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ " Before start a modeling task, the data to be used should be uploaded. \n", " Typically, a party is usually a cluster which include multiple nodes. Thus, when we upload these data, the data will be allocated to those nodes." 
] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "from pipeline.backend.pipeline import PipeLine" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Make a `pipeline` instance:\n", "\n", " - initiator: \n", " * role: guest\n", " * party: 9999\n", " - roles:\n", " * guest: 9999\n", "\n", "note that only local party id is needed.\n", " " ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "pipeline_upload = PipeLine().set_initiator(role='guest', party_id=9999).set_roles(guest=9999)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Define partitions for data storage" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "partition = 4" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Define table name and namespace, which will be used in FATE job configuration" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "dense_data_guest = {\"name\": \"breast_hetero_guest\", \"namespace\": f\"experiment\"}\n", "dense_data_host = {\"name\": \"breast_hetero_host\", \"namespace\": f\"experiment\"}\n", "tag_data = {\"name\": \"breast_hetero_host\", \"namespace\": f\"experiment\"}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, we add data to be uploaded" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "import os\n", "\n", "data_base = \"/workspace/FATE/\"\n", "pipeline_upload.add_upload_data(file=os.path.join(data_base, \"examples/data/breast_hetero_guest.csv\"),\n", " table_name=dense_data_guest[\"name\"], # table name\n", " namespace=dense_data_guest[\"namespace\"], # namespace\n", " head=1, partition=partition) # data info\n", "\n", "pipeline_upload.add_upload_data(file=os.path.join(data_base, \"examples/data/breast_hetero_host.csv\"),\n", " table_name=dense_data_host[\"name\"],\n", " 
namespace=dense_data_host[\"namespace\"],\n", " head=1, partition=partition)\n", "\n", "pipeline_upload.add_upload_data(file=os.path.join(data_base, \"examples/data/breast_hetero_host.csv\"),\n", " table_name=tag_data[\"name\"],\n", " namespace=tag_data[\"namespace\"],\n", " head=1, partition=partition)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can then upload data" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ " UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\u001b[32m2021-11-15 11:26:40.541\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m123\u001b[0m - \u001b[1mJob id is 202111151126388669570\n", "\u001b[0m\n", "\u001b[32m2021-11-15 11:26:42.601\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m144\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KJob is still waiting, time elapse: 0:00:02\u001b[0m\n", "\u001b[0m\n", "\u001b[32m2021-11-15 11:26:49.260\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component upload_0, time elapse: 0:00:08\u001b[0m\n", "\u001b[32m2021-11-15 11:26:52.644\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m131\u001b[0m - \u001b[1mJob is success!!! 
Job id is 202111151126388669570\u001b[0m\n", "\u001b[32m2021-11-15 11:26:52.649\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m132\u001b[0m - \u001b[1mTotal time: 0:00:12\u001b[0m\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ " UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\u001b[32m2021-11-15 11:26:53.998\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m123\u001b[0m - \u001b[1mJob id is 202111151126526555290\n", "\u001b[0m\n", "\u001b[32m2021-11-15 11:26:54.006\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m144\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KJob is still waiting, time elapse: 0:00:00\u001b[0m\n", "\u001b[0m\n", "\u001b[32m2021-11-15 11:27:00.984\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component upload_0, time elapse: 0:00:06\u001b[0m\n", "\u001b[32m2021-11-15 11:27:02.770\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m131\u001b[0m - \u001b[1mJob is success!!! 
Job id is 202111151126526555290\u001b[0m\n", "\u001b[32m2021-11-15 11:27:02.771\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m132\u001b[0m - \u001b[1mTotal time: 0:00:08\u001b[0m\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ " UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\u001b[32m2021-11-15 11:27:03.959\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m123\u001b[0m - \u001b[1mJob id is 202111151127027776050\n", "\u001b[0m\n", "\u001b[32m2021-11-15 11:27:03.969\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m144\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KJob is still waiting, time elapse: 0:00:00\u001b[0m\n", "\u001b[0m\n", "\u001b[32m2021-11-15 11:27:11.018\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component upload_0, time elapse: 0:00:07\u001b[0m\n", "\u001b[32m2021-11-15 11:27:12.669\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m131\u001b[0m - \u001b[1mJob is success!!! 
Job id is 202111151127027776050\u001b[0m\n", "\u001b[32m2021-11-15 11:27:12.671\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m132\u001b[0m - \u001b[1mTotal time: 0:00:08\u001b[0m\n" ] } ], "source": [ "pipeline_upload.upload(drop=1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For more demos on using Pipeline to submit jobs, please refer to [pipeline demos](https://github.com/FederatedAI/FATE/tree/master/examples/pipeline/demo)." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.15" } }, "nbformat": 4, "nbformat_minor": 4 }