## Pipeline Match ID Tutorial 

Starting with version 1.7, FATE distinguishes between sample id (sid) and match id. A sid is unique to each sample entry, while a match id identifies the source that a sample comes from. This adaptation allows FATE to perform private set intersection on samples with repeated match ids. Users may choose to create sids by appending a uuid to the original sample entries at upload time; the `DataTransform` module then extracts the true match id for later use. This tutorial walks through a full upload-and-training process to demonstrate how to add sids and train with them.
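
To make the distinction concrete, here is a minimal sketch in plain Python. It is not FATE code: `attach_sid`, `shared_match_ids` and `group_by_match_id` are hypothetical helpers, the intersection is an ordinary (non-private) set intersection, and how FATE itself handles repeated match ids internally is not shown here.

```python
import uuid
from collections import defaultdict


def attach_sid(match_ids):
    """Pair every record with a fresh, unique sid; the match id is kept as-is."""
    return [(uuid.uuid1().hex + str(i), match_id) for i, match_id in enumerate(match_ids)]


def shared_match_ids(guest_records, host_records):
    """Plain (non-private) intersection on match id, for illustration only."""
    return {m for _, m in guest_records} & {m for _, m in host_records}


def group_by_match_id(records, keep):
    """Collect the sids that belong to each kept match id."""
    grouped = defaultdict(list)
    for sid, match_id in records:
        if match_id in keep:
            grouped[match_id].append(sid)
    return dict(grouped)


guest = attach_sid(["133", "273", "133"])  # match id "133" appears twice
host = attach_sid(["133", "175"])

common = shared_match_ids(guest, host)
print(common)                            # {'133'}
print(group_by_match_id(guest, common))  # two distinct sids listed under "133"
```

Because every record carries its own sid, both guest entries with match id "133" survive as separate samples, even though they share a match id.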

### install

`Pipeline` is distributed along with [fate_client](https://pypi.org/project/fate-client/).

```bash
pip install fate_client
```

To use Pipeline, we first need to specify which `FATE Flow Service` to connect to. Once `fate_client` is installed, a command-line entry point named `pipeline` becomes available:

In [1]:
!pipeline --help

Usage: pipeline [OPTIONS] COMMAND [ARGS]...

Options:
 --help Show this message and exit.

Commands:
 config pipeline config tool
 init - DESCRIPTION: Pipeline Config Command.


Assuming a `FATE Flow Service` is running at 127.0.0.1:9380 (the default in standalone deployment), execute:

In [2]:
!pipeline init --ip 127.0.0.1 --port 9380

Pipeline configuration succeeded.
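
If later jobs fail to submit, it can help to confirm that something is actually listening at the configured address. The following is a generic TCP check using only the Python standard library; `service_reachable` is a hypothetical helper and not part of `fate_client`, and a successful connection only shows that the port is open, not that the service behind it is healthy.

```python
import socket


def service_reachable(ip, port, timeout=2.0):
    """Return True if a TCP connection to ip:port can be opened within `timeout` seconds."""
    try:
        with socket.create_connection((ip, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False


# Address taken from the `pipeline init` call above.
print(service_reachable("127.0.0.1", 9380))
```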


### upload data

 Before starting a modeling task, the data to be used must be uploaded.
 Typically, a party is a cluster that includes multiple nodes, so the uploaded data is distributed across those nodes.

In [3]:
from pipeline.backend.pipeline import PipeLine

Make a `pipeline` instance:

- initiator:
    * role: guest
    * party: 9999
- roles:
    * guest: 9999

Note that only the local party id is needed.
 

In [4]:
pipeline_upload = PipeLine().set_initiator(role='guest', party_id=9999).set_roles(guest=9999)

Define the number of partitions for data storage:

In [5]:
partition = 4
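
As a rough intuition for what a partition count means, the sketch below spreads keys over `partition` buckets with a stable hash. This is an illustrative assumption only: `assign_partition` is a hypothetical helper, and the actual placement strategy is decided by FATE's storage engine, not by this code.

```python
import zlib


def assign_partition(key, num_partitions):
    """Map a key to a bucket index with a stable hash (illustrative only)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


partition = 4
keys = [f"sample_{i}" for i in range(10)]

# Every key lands in exactly one of the `partition` buckets.
buckets = {p: [] for p in range(partition)}
for key in keys:
    buckets[assign_partition(key, partition)].append(key)

for p, members in buckets.items():
    print(p, members)
```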

Define the table names and namespace, which will be used in the FATE job configuration:

In [6]:
dense_data_guest = {"name": "breast_hetero_guest", "namespace": "experiment"}
dense_data_host = {"name": "breast_hetero_host", "namespace": "experiment"}
tag_data = {"name": "breast_hetero_host", "namespace": "experiment"}

Now we add the data to be uploaded. To generate a uuid-based sid for each entry, turn on the `extend_sid` option. Alternatively, also set `auto_increasing_sid` so that the extended sid starts at 0.

In [7]:
import os

data_base = "/workspace/FATE/"
pipeline_upload.add_upload_data(file=os.path.join(data_base, "examples/data/breast_hetero_guest.csv"),
 table_name=dense_data_guest["name"], # table name
 namespace=dense_data_guest["namespace"], # namespace
 head=1, partition=partition, # data info
 extend_sid=True, # extend sid 
 auto_increasing_sid=False)

pipeline_upload.add_upload_data(file=os.path.join(data_base, "examples/data/breast_hetero_host.csv"),
 table_name=dense_data_host["name"],
 namespace=dense_data_host["namespace"],
 head=1, partition=partition,
 extend_sid=True,
 auto_increasing_sid=False) 
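
The effect of the two options can be pictured with the sketch below. It is a simplified stand-in written for this tutorial, not FATE's upload code: the helper name `extend_sid` and the exact sid layout (a uuid hex string followed by the row index) are assumptions, chosen to resemble the sids that appear in the component output at the end of this tutorial.

```python
import uuid


def extend_sid(rows, auto_increasing_sid=False):
    """Prepend a generated sid to each row (hypothetical helper, not FATE code)."""
    if auto_increasing_sid:
        # sid is simply the row index, starting at 0
        return [[str(i)] + row for i, row in enumerate(rows)]
    # sid is a shared uuid prefix plus the row index, so it stays unique per row
    prefix = uuid.uuid1().hex
    return [[prefix + str(i)] + row for i, row in enumerate(rows)]


rows = [["133", "1"], ["273", "1"]]  # original id, label

print(extend_sid(rows))                            # uuid-based sid, original id kept
print(extend_sid(rows, auto_increasing_sid=True))  # [['0', '133', '1'], ['1', '273', '1']]
```

In both modes the original id column is left untouched, which is what lets it serve as the match id later on.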

We can then upload the data:

In [8]:
pipeline_upload.upload(drop=1)

 UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%


2021-12-31 03:27:14.912 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:123 - Job id is 202112310327142307260
2021-12-31 03:27:14.921 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:144 - Job is still waiting, time elapse: 0:00:00
2021-12-31 03:27:15.452 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:173 -
2021-12-31 03:27:19.088 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:177 - Running component upload_0, time elapse: 0:00:04
2021-12-31 03:27:20.675 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:131 - Job is success!!! Job id is 202112310327142307260
2021-12-31 03:27:20.676 | INFO | pipeline

 UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%


2021-12-31 03:27:21.404 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:123 - Job id is 202112310327206816320
2021-12-31 03:27:23.987 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:144 - Job is still waiting, time elapse: 0:00:02
2021-12-31 03:27:24.505 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:173 -
2021-12-31 03:27:28.142 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:177 - Running component upload_0, time elapse: 0:00:06
2021-12-31 03:27:29.690 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:131 - Job is success!!! Job id is 202112310327206816320
2021-12-31 03:27:29.691 | INFO | pipeline

After uploading, we can start modeling. Here we build a Hetero SecureBoost model the same way as in [this demo](https://github.com/FederatedAI/FATE/blob/master/doc/tutorial/pipeline/pipeline_tutorial_hetero_sbt.ipynb), but note how the specification of the `DataTransform` module needs to be adjusted to correctly load the match id.

In [9]:
from pipeline.backend.pipeline import PipeLine
from pipeline.component import Reader, DataTransform, Intersection, HeteroSecureBoost, Evaluation
from pipeline.interface import Data

pipeline = PipeLine() \
 .set_initiator(role='guest', party_id=9999) \
 .set_roles(guest=9999, host=10000)

reader_0 = Reader(name="reader_0")
# set guest parameter
reader_0.get_party_instance(role='guest', party_id=9999).component_param(
 table={"name": "breast_hetero_guest", "namespace": "experiment"})
# set host parameter
reader_0.get_party_instance(role='host', party_id=10000).component_param(
 table={"name": "breast_hetero_host", "namespace": "experiment"})

# set with match id
data_transform_0 = DataTransform(name="data_transform_0", with_match_id=True)
# set guest parameter
data_transform_0.get_party_instance(role='guest', party_id=9999).component_param(
 with_label=True)
# set host parameter
data_transform_0.get_party_instance(role='host', party_id=[10000]).component_param(
 with_label=False)

intersect_0 = Intersection(name="intersect_0")

hetero_secureboost_0 = HeteroSecureBoost(name="hetero_secureboost_0",
 num_trees=5,
 bin_num=16,
 task_type="classification",
 objective_param={"objective": "cross_entropy"},
 encrypt_param={"method": "paillier"},
 tree_param={"max_depth": 3})

evaluation_0 = Evaluation(name="evaluation_0", eval_type="binary")

Add components to the pipeline in order of execution:

 - data_transform_0 consumes reader_0's output data
 - intersect_0 consumes data_transform_0's output data
 - hetero_secureboost_0 consumes intersect_0's output data
 - evaluation_0 consumes hetero_secureboost_0's prediction result on the training data

Then compile the pipeline to make it ready for submission.

In [10]:
pipeline.add_component(reader_0)
pipeline.add_component(data_transform_0, data=Data(data=reader_0.output.data))
pipeline.add_component(intersect_0, data=Data(data=data_transform_0.output.data))
pipeline.add_component(hetero_secureboost_0, data=Data(train_data=intersect_0.output.data))
pipeline.add_component(evaluation_0, data=Data(data=hetero_secureboost_0.output.data))
pipeline.compile();
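
The data dependencies wired up above form a simple chain. As a sanity check that is independent of FATE, the same chain can be written as a plain dependency map and ordered with the standard library's `graphlib` (Python 3.9+). The `dependencies` dictionary is a hand-written restatement of the list above, not something read back from the `pipeline` object.

```python
from graphlib import TopologicalSorter  # available from Python 3.9

# component -> set of components whose output it consumes
dependencies = {
    "reader_0": set(),
    "data_transform_0": {"reader_0"},
    "intersect_0": {"data_transform_0"},
    "hetero_secureboost_0": {"intersect_0"},
    "evaluation_0": {"hetero_secureboost_0"},
}

execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Because each component has exactly one upstream component, there is only one valid order, and it matches the order of the `add_component` calls.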

Now, submit (fit) our pipeline:

In [11]:
pipeline.fit()

2021-12-31 03:27:32.837 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:123 - Job id is 202112310327304051900
2021-12-31 03:27:34.379 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:144 - Job is still waiting, time elapse: 0:00:01
2021-12-31 03:27:35.420 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:173 -
2021-12-31 03:27:39.091 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:177 - Running component reader_0, time elapse: 0:00:06
2021-12-31 03:27:41.739 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:173 -
2021-12-31 03:27:46.111 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_sta

Check the data output on FATEBoard, or download the component output data, to see that each data instance now has a uuid as its sid.

In [12]:
import json
print(json.dumps(pipeline.get_component("data_transform_0").get_output_data(limits=3), indent=4))

{
    "data": [
        "sid,inst_id,label,x0,x1,x2,x3,x4,x5,x6,x7,x8,x9",
        "8e46168869e911ec87ab8eff97a1caf50,133,1,0.254879,-1.046633,0.209656,0.074214,-0.441366,-0.377645,-0.485934,0.347072,-0.28757,-0.733474",
        "8e46168869e911ec87ab8eff97a1caf51,273,1,-1.142928,-0.781198,-1.166747,-0.923578,0.62823,-1.021418,-1.111867,-0.959523,-0.096672,-0.121683"
    ],
    "meta": [
        "sid",
        "inst_id",
        "label",
        "x0",
        "x1",
        "x2",
        "x3",
        "x4",
        "x5",
        "x6",
        "x7",
        "x8",
        "x9"
    ]
}
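
To inspect such output programmatically, the rows can be parsed with the standard `csv` module. The sketch below hard-codes the two rows printed above rather than calling FATE, so the `output` dictionary is only a copy of that result; with a live job it would come from `get_output_data` instead.

```python
import csv

# Copy of the component output shown above (header line plus two data rows).
output = {
    "data": [
        "sid,inst_id,label,x0,x1,x2,x3,x4,x5,x6,x7,x8,x9",
        "8e46168869e911ec87ab8eff97a1caf50,133,1,0.254879,-1.046633,0.209656,0.074214,-0.441366,-0.377645,-0.485934,0.347072,-0.28757,-0.733474",
        "8e46168869e911ec87ab8eff97a1caf51,273,1,-1.142928,-0.781198,-1.166747,-0.923578,0.62823,-1.021418,-1.111867,-0.959523,-0.096672,-0.121683",
    ]
}

# DictReader treats the first line as the header and each later line as a record.
rows = list(csv.DictReader(output["data"]))

for row in rows:
    print(row["sid"], "->", row["inst_id"])

# Every sid should be unique, while inst_id holds the original match id.
assert len({row["sid"] for row in rows}) == len(rows)
```

Here the `sid` column holds the generated identifier, and `inst_id` holds the id from the original CSV, which acts as the match id.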


For more demos on using pipeline to submit jobs, please refer to the [pipeline demos](https://github.com/FederatedAI/FATE/tree/master/examples/pipeline/demo). Other pipeline examples that use data with match id are available [here](https://github.com/FederatedAI/FATE/tree/master/examples/pipeline/match_id_test).