{ "cells": [ { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# Federated classic CTR model training on Criteo\n", "\n", "In this tutorial, we show you how to develop a horizontal federated recommendation model. We use the third-party library torch-rechub to call some classic recommendation models such as FM, DeepFM, etc., and use it to build a federated task in FATE. In terms of the dataset, we use the Criteo dataset which has been sampled and preprocessed, with a total of 50K data. For convenience, in this tutorial, all clients use the same dataset." ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Install Torch-rechub\n", "\n", "We will use torch-rechub from:https://github.com/datawhalechina/torch-rechub.\n", "To run this tutorial, you need to install the latest-version:\n", "\n", "- git clone https://github.com/datawhalechina/torch-rechub.git\n", "- cd torch-rechub\n", "- python setup.py install" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## The Criteo\n", "The Criteo dataset is an online advertising dataset released by Criteo Labs. It contains millions of records of click feedback for displayed ads, which can be used as a benchmark for click-through rate (CTR) prediction. The dataset has 40 features, with the first column being the label, where a value of 1 indicates that the ad was clicked and a value of 0 indicates that the ad was not clicked. The other features include 13 dense features and 26 sparse features.\n", "\n", "- Dowload sampled data used in this tutorial here:\n", "- \n", " https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/fate/examples/data/criteo.csv\n", " \n", " And then put the csv under /examples/data.\n", "\n", "- The origin source of the criteo dataset is: https://ailab.criteo.com/ressources/" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Construct dataset class\n", "\n", "Here we will construct a Criteo dataset that read the csv. At the same time, it will process the dataset and return \n", "information about dense&sparse features." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from pipeline.component.nn import save_to_fate " ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "%%save_to_fate dataset criteo_dataset.py\n", "from sklearn.preprocessing import MinMaxScaler, LabelEncoder\n", "from tqdm import tqdm\n", "import pandas as pd\n", "import numpy as np\n", "from federatedml.nn.dataset.base import Dataset\n", "from torch_rechub.basic.features import DenseFeature, SparseFeature\n", "\n", "class CriteoDataset(Dataset):\n", " \n", " def __init__(self):\n", " # super().__init__()\n", " super(CriteoDataset, self).__init__()\n", " self.x = None\n", " self.y = None\n", " self.sample_ids= None\n", " \n", " # load dataset and process\n", " def load(self, data_path):\n", " \n", " # convert numeric features\n", " def convert_numeric_feature(val):\n", " v = int(val)\n", " if v > 2:\n", " return int(np.log(v)**2)\n", " else:\n", " return v - 2\n", " # load data\n", " if data_path.endswith(\".gz\"): #if the raw_data is gz file:\n", " data = pd.read_csv(data_path, compression=\"gzip\")\n", " else:\n", " data = pd.read_csv(data_path)\n", " print(\"data load finished\")\n", " # print(data.info())\n", " if 'sid' in data.columns:\n", " self.sample_ids = list(data.sid)\n", " \n", " # feature_process\n", " dense_features = [f for f in data.columns.tolist() if f[0] == \"I\"]\n", " sparse_features = [f for f in data.columns.tolist() if f[0] == \"C\"]\n", "\n", " data[sparse_features] = data[sparse_features].fillna('0')\n", " data[dense_features] = data[dense_features].fillna(0)\n", " \n", " for feat in tqdm(dense_features): #discretize dense feature and as new sparse feature\n", " sparse_features.append(feat + \"_cat\")\n", " data[feat + \"_cat\"] = data[feat].apply(lambda x: convert_numeric_feature(x))\n", "\n", " sca = MinMaxScaler() #scaler dense feature\n", " data[dense_features] = sca.fit_transform(data[dense_features])\n", "\n", " for feat in tqdm(sparse_features): #encode sparse feature\n", " lbe = LabelEncoder()\n", " data[feat] = lbe.fit_transform(data[feat])\n", "\n", " def get_dense_feat(name):\n", " return DenseFeature(name), {'name': name}\n", " dense_feas = []\n", " dense_feas_dict = []\n", " for feature_name in dense_features:\n", " a,b = get_dense_feat(feature_name)\n", " dense_feas.append(a)\n", " dense_feas_dict.append(b)\n", "\n", " def get_sparse_feat(name, vocab_size,embed_dim):\n", " return SparseFeature(name, vocab_size,embed_dim), {'name': name, 'vocab_size': vocab_size,'embed_dim':embed_dim}\n", "\n", " sparse_feas=[]\n", " sparse_feas_dict=[]\n", " for feature_name in sparse_features:\n", " a,b = get_sparse_feat(name=feature_name, vocab_size=data[feature_name].nunique(), embed_dim=16)\n", " sparse_feas.append(a)\n", " sparse_feas_dict.append(b)\n", "\n", " ffm_linear_feas = []\n", " ffm_linear_feas_dict = []\n", " def get_ffm_linear_feat(name, vocab_size,embed_dim):\n", " return SparseFeature(name, vocab_size,embed_dim), {'name': name, 'vocab_size': vocab_size,'embed_dim':embed_dim}\n", " for feature in sparse_feas:\n", " a,b = get_ffm_linear_feat(name = feature.name, vocab_size=feature.vocab_size, embed_dim=1)\n", " ffm_linear_feas.append(a)\n", " ffm_linear_feas_dict.append(b)\n", "\n", " ffm_cross_feas = []\n", " ffm_cross_feas_dict = []\n", " def get_ffm_cross_feat(name, vocab_size,embed_dim):\n", " return SparseFeature(name, vocab_size,embed_dim), {'name': name, 'vocab_size': vocab_size,'embed_dim':embed_dim}\n", " for feature in sparse_feas:\n", " a,b = get_ffm_cross_feat(name = feature.name, vocab_size=feature.vocab_size*len(sparse_feas), embed_dim=10)\n", " ffm_cross_feas.append(a)\n", " ffm_cross_feas_dict.append(b)\n", " \n", " y = data[\"label\"]\n", " del data[\"label\"]\n", " x = data\n", " \n", " # ncq:\n", " self.x = x\n", " self.y = y\n", " \n", " return dense_feas, dense_feas_dict,sparse_feas,sparse_feas_dict, ffm_linear_feas,ffm_linear_feas_dict, ffm_cross_feas, ffm_cross_feas_dict\n", "\n", " def __getitem__(self, index):\n", " return {k: v[index] for k, v in self.x.items()}, self.y[index].astype(np.float32)\n", "\n", " def __len__(self):\n", " return len(self.y)\n", "\n", " def get_sample_ids(self,):\n", " return self.sample_ids" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### CTR Models\n", "Here based on Torch-rechub, we develop four kinds of CTR models: FM, FFM, DeepFM and DeepFFM" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "%%save_to_fate model fm_model.py\n", "import torch\n", "from torch_rechub.basic.layers import FM,LR,EmbeddingLayer\n", "from torch_rechub.basic.features import DenseFeature, SparseFeature\n", "class FMModel(torch.nn.Module):\n", " '''\n", " A standard FM models\n", " '''\n", " def __init__(self, dense_feas_dict, sparse_feas_dict):\n", " super(FMModel, self).__init__()\n", " dense_features = []\n", " def recover_dense_feat(dict_):\n", " return DenseFeature(name=dict_['name'])\n", " \n", " for i in dense_feas_dict:\n", " dense_features.append(recover_dense_feat(i))\n", " self.dense_features = dense_features\n", " \n", " sparse_features = []\n", " def recover_sparse_feat(dict_):\n", " return SparseFeature(dict_['name'], dict_['vocab_size'],dict_['embed_dim'])\n", " \n", " for i in sparse_feas_dict:\n", " sparse_features.append(recover_sparse_feat(i))\n", " self.sparse_features = sparse_features\n", " \n", " self.fm_features = self.sparse_features\n", " self.fm_dims = sum([fea.embed_dim for fea in self.fm_features])\n", " self.linear = LR(self.fm_dims) # 1-odrder interaction\n", " self.fm = FM(reduce_sum=True) # 2-odrder interaction\n", " self.embedding = EmbeddingLayer(self.fm_features)\n", "\n", " def forward(self, x):\n", " input_fm = self.embedding(x, self.fm_features, squeeze_dim=False) #[batch_size, num_fields, embed_dim]input_fm: torch.Size([100, 39, 16])\n", " # print('input_fm:',input_fm.shape) # input_fm: torch.Size([100, 39, 16]) (batch_size, num_features, embed_dim)\n", " y_linear = self.linear(input_fm.flatten(start_dim=1))\n", " y_fm = self.fm(input_fm)\n", " # print('y_fm.shape:',y_fm.shape) # y_fm.shape: torch.Size([100, 1])\n", "\n", " y = y_linear + y_fm\n", " return torch.sigmoid(y.squeeze(1))" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "%%save_to_fate model deepfm_model.py\n", "import torch\n", "from torch_rechub.basic.layers import FM,LR,EmbeddingLayer,MLP\n", "from torch_rechub.basic.features import DenseFeature, SparseFeature\n", "\n", "class DeepFMModel(torch.nn.Module):\n", " \"\"\"Deep Factorization Machine Model\n", " Args:\n", " deep_features (list): the list of `Feature Class`, training by the deep part module.\n", " fm_features (list): the list of `Feature Class`, training by the fm part module.\n", " mlp_params (dict): the params of the last MLP module, keys include:`{\"dims\":list, \"activation\":str, \"dropout\":float, \"output_layer\":bool`}\n", " \"\"\"\n", "\n", " def __init__(self, dense_feas_dict, sparse_feas_dict, mlp_params):\n", " super(DeepFMModel, self).__init__()\n", " dense_features = []\n", " def recover_dense_feat(dict_):\n", " return DenseFeature(name=dict_['name'])\n", " \n", " for i in dense_feas_dict:\n", " dense_features.append(recover_dense_feat(i))\n", " self.dense_features = dense_features\n", " \n", " sparse_features = []\n", " def recover_sparse_feat(dict_):\n", " return SparseFeature(dict_['name'], dict_['vocab_size'],dict_['embed_dim'])\n", " \n", " for i in sparse_feas_dict:\n", " sparse_features.append(recover_sparse_feat(i))\n", " self.deep_features = dense_features\n", " self.fm_features = sparse_features\n", " \n", " self.deep_dims = sum([fea.embed_dim for fea in dense_features])\n", " self.fm_dims = sum([fea.embed_dim for fea in sparse_features])\n", " self.linear = LR(self.fm_dims) # 1-odrder interaction\n", " self.fm = FM(reduce_sum=True) # 2-odrder interaction\n", " self.embedding = EmbeddingLayer(dense_features + sparse_features)\n", " self.mlp = MLP(self.deep_dims, **mlp_params)\n", "\n", " def forward(self, x):\n", " input_deep = self.embedding(x, self.deep_features, squeeze_dim=True) #[batch_size, deep_dims]\n", " input_fm = self.embedding(x, self.fm_features, squeeze_dim=False) #[batch_size, num_fields, embed_dim]\n", "\n", " y_linear = self.linear(input_fm.flatten(start_dim=1))\n", " y_fm = self.fm(input_fm)\n", " y_deep = self.mlp(input_deep) #[batch_size, 1]\n", " y = y_linear + y_fm + y_deep\n", " return torch.sigmoid(y.squeeze(1))" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "%%save_to_fate model ffm_model.py\n", "import torch\n", "from torch_rechub.basic.layers import LR,EmbeddingLayer,FFM,FM\n", "import torch\n", "from torch_rechub.basic.features import DenseFeature, SparseFeature\n", "\n", "class FFMModel(torch.nn.Module):\n", " \n", " def __init__(self, liner_feas_dict, cross_feas_dict):\n", " super(FFMModel, self).__init__()\n", " linear_features = []\n", " def recover_ffm_linear_feat(dict_):\n", " return SparseFeature(dict_['name'], dict_['vocab_size'],dict_['embed_dim'])\n", "\n", " for i in liner_feas_dict:\n", " linear_features.append(recover_ffm_linear_feat(i))\n", "\n", " cross_features = []\n", " def recover_ffm_cross_feat(dict_):\n", " return SparseFeature(dict_['name'], dict_['vocab_size'],dict_['embed_dim'])\n", "\n", " for i in cross_feas_dict:\n", " cross_features.append(recover_ffm_cross_feat(i)) \n", " self.linear_features = linear_features\n", " self.cross_features = cross_features \n", " self.num_fields = len(cross_features)\n", " self.num_field_cross = self.num_fields * (self.num_fields - 1) // 2\n", " # print('num_fields:',self.num_fields) #39\n", " self.ffm = FFM(num_fields=self.num_fields, reduce_sum=False) \n", " self.linear_embedding = EmbeddingLayer(linear_features)\n", " self.ffm_embedding = EmbeddingLayer(cross_features)\n", " self.b =torch.nn.Parameter(torch.zeros(1))\n", " fields_offset = torch.arange(0, self.num_fields, dtype=torch.long)\n", " self.register_buffer('fields_offset', fields_offset)\n", " self.fm = FM(reduce_sum=True)\n", "\n", " def forward(self, x):\n", " y_linear = self.linear_embedding(x, self.linear_features, squeeze_dim=True).sum(1, keepdim=True) #[batch_size, 1] \n", " # output shape [batch_size, num_field, num_field, emb_dim]\n", " x_ffm = {fea.name: x[fea.name].unsqueeze(1) * self.num_fields + self.fields_offset for fea in self.cross_features} \n", " input_ffm = self.ffm_embedding(x_ffm, self.cross_features, squeeze_dim=False) \n", " # print('input_ffm.shape:',input_ffm.shape) # torch.Size([100, 39, 39, 10])\n", " em = self.ffm(input_ffm) \n", " # print('output_ffm.shape:',em.shape) \n", " # torch.Size([100, 741, 10])(batch_size, num_fields*(num_fields-1)/2,embed_dim) \n", " y_ffm = self.fm(em)\n", " # compute scores from the ffm part of the model, output shape [batch_size, 1]\n", " # print('y_linear.shape:',y_linear.shape) # torch.Size([100,1])\n", " y = y_linear + y_ffm\n", " return torch.sigmoid(y.squeeze(1)+ self.b)" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "%%save_to_fate model deepffm_model.py\n", "import torch \n", "from torch import nn\n", "from torch_rechub.basic.features import SparseFeature\n", "from torch_rechub.models.ranking import DeepFFM\n", "\n", "class DeepFFMModel(nn.Module):\n", " def __init__(self,ffm_linear_feas_dict, ffm_cross_feas_dict):\n", " super().__init__()\n", " linear_features = []\n", " cross_features = []\n", " \n", " def recover_sparse_feat(dict_):\n", " return SparseFeature(dict_['name'], dict_['vocab_size'],dict_['embed_dim'])\n", " \n", " for i in ffm_linear_feas_dict:\n", " linear_features.append(recover_sparse_feat(i))\n", " for i in ffm_cross_feas_dict:\n", " cross_features.append(recover_sparse_feat(i))\n", " self.model = DeepFFM(linear_features=linear_features, cross_features=cross_features, embed_dim=10, mlp_params={\"dims\": [1600, 1600], \"dropout\": 0.5, \"activation\": \"relu\"})\n", " \n", " def forward(self, x):\n", " return self.model(x)\n", " " ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Local Testing\n", "Test this set (dataset + model) locally to see if it can run, and omit the federated aggregation part during local validation." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "ipcl_python failed to import\n", "ipcl_python failed to import\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "data load finished\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "100%|██████████| 26/26 [00:13<00:00, 1.87it/s]\n", "100%|██████████| 52/52 [00:01<00:00, 40.50it/s]\n" ] } ], "source": [ "import torch\n", "from federatedml.nn.homo.trainer.fedavg_trainer import FedAVGTrainer\n", "\n", "ds = CriteoDataset()\n", "path = '../../../../examples/data/criteo.csv'\n", "\n", "dense_feas, dense_feas_dict, sparse_feas, sparse_feas_dict, ffm_linear_feas, \\\n", " ffm_linear_feas_dict, ffm_cross_feas, ffm_cross_feas_dict = ds.load(path)\n", "\n", "fm_model = FMModel(dense_feas_dict=dense_feas_dict,sparse_feas_dict=sparse_feas_dict)\n", "\n", "ffm_model = FFMModel(liner_feas_dict=ffm_linear_feas_dict, cross_feas_dict=ffm_cross_feas_dict)\n", "\n", "dfm_model = DeepFMModel(dense_feas_dict=dense_feas_dict,sparse_feas_dict=sparse_feas_dict, mlp_params={\"dims\": [256, 128], \"dropout\": 0.2, \"activation\": \"relu\"})\n", "\n", "dffm_model = DeepFFMModel(ffm_linear_feas_dict=ffm_linear_feas_dict,ffm_cross_feas_dict=ffm_cross_feas_dict)\n", "\n", "model = fm_model" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "epoch is 0\n", "100%|██████████| 1954/1954 [11:24<00:00, 2.86it/s]\n", "epoch loss is 1.3575431838684082\n" ] } ], "source": [ "# here we test one model: the FM Model\n", "trainer = FedAVGTrainer(epochs=1, batch_size=256)\n", "trainer.local_mode()\n", "trainer.set_model(model)\n", "trainer.train(ds, None, optimizer=torch.optim.Adam(model.parameters(), lr=0.01), loss=torch.nn.BCELoss())" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Submit a Pipeline to run a federation task" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "data load finished\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "100%|██████████| 26/26 [00:13<00:00, 1.87it/s]\n", "100%|██████████| 52/52 [00:01<00:00, 43.13it/s]\n", "\u001b[32m2022-12-27 10:52:35.371\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m83\u001b[0m - \u001b[1mJob id is 202212271052347495390\n", "\u001b[0m\n", "\u001b[32m2022-12-27 10:52:35.523\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m98\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KJob is still waiting, time elapse: 0:00:00\u001b[0m\n", "\u001b[32m2022-12-27 10:52:36.535\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m98\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KJob is still waiting, time elapse: 0:00:01\u001b[0m\n", "\u001b[0mm2022-12-27 10:52:38.601\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m125\u001b[0m - \u001b[1m\n", "\u001b[32m2022-12-27 10:52:38.603\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component reader_0, time elapse: 0:00:03\u001b[0m\n", "\u001b[32m2022-12-27 10:52:39.632\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component reader_0, time elapse: 0:00:04\u001b[0m\n", "\u001b[32m2022-12-27 10:52:40.768\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component reader_0, time elapse: 0:00:05\u001b[0m\n", "\u001b[32m2022-12-27 10:52:41.822\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component reader_0, time elapse: 0:00:06\u001b[0m\n", "\u001b[32m2022-12-27 10:52:42.864\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component reader_0, time elapse: 0:00:07\u001b[0m\n", "\u001b[32m2022-12-27 10:52:43.893\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component reader_0, time elapse: 0:00:08\u001b[0m\n", "\u001b[0mm2022-12-27 10:52:46.066\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m125\u001b[0m - \u001b[1m\n", "\u001b[32m2022-12-27 10:52:46.067\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:10\u001b[0m\n", "\u001b[32m2022-12-27 10:52:47.095\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:11\u001b[0m\n", "\u001b[32m2022-12-27 10:52:48.134\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:12\u001b[0m\n", "\u001b[32m2022-12-27 10:52:49.166\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:13\u001b[0m\n", "\u001b[32m2022-12-27 10:52:50.251\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:14\u001b[0m\n", "\u001b[32m2022-12-27 10:52:51.279\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:15\u001b[0m\n", "\u001b[32m2022-12-27 10:52:52.323\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:16\u001b[0m\n", "\u001b[32m2022-12-27 10:52:53.354\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:17\u001b[0m\n", "\u001b[32m2022-12-27 10:52:54.438\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:19\u001b[0m\n", "\u001b[32m2022-12-27 10:52:55.471\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:20\u001b[0m\n", "\u001b[32m2022-12-27 10:52:56.500\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:21\u001b[0m\n", "\u001b[32m2022-12-27 10:52:57.534\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:22\u001b[0m\n", "\u001b[32m2022-12-27 10:52:58.597\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:23\u001b[0m\n", "\u001b[32m2022-12-27 10:52:59.648\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:24\u001b[0m\n", "\u001b[32m2022-12-27 10:53:00.723\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:25\u001b[0m\n", "\u001b[32m2022-12-27 10:53:01.773\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:26\u001b[0m\n", "\u001b[32m2022-12-27 10:53:02.796\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:27\u001b[0m\n", "\u001b[32m2022-12-27 10:53:03.828\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:28\u001b[0m\n", "\u001b[32m2022-12-27 10:53:04.855\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:29\u001b[0m\n", "\u001b[32m2022-12-27 10:53:05.883\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:30\u001b[0m\n", "\u001b[32m2022-12-27 10:53:06.919\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:31\u001b[0m\n", "\u001b[32m2022-12-27 10:53:07.954\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:32\u001b[0m\n", "\u001b[32m2022-12-27 10:53:08.994\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:33\u001b[0m\n", "\u001b[32m2022-12-27 10:53:10.020\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:34\u001b[0m\n", "\u001b[32m2022-12-27 10:53:11.055\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:35\u001b[0m\n", "\u001b[32m2022-12-27 10:53:12.087\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:36\u001b[0m\n", "\u001b[32m2022-12-27 10:53:13.113\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:37\u001b[0m\n", "\u001b[32m2022-12-27 10:53:14.147\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:38\u001b[0m\n", "\u001b[32m2022-12-27 10:53:15.174\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:39\u001b[0m\n", "\u001b[32m2022-12-27 10:53:16.213\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:40\u001b[0m\n", "\u001b[32m2022-12-27 10:53:17.250\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:41\u001b[0m\n", "\u001b[32m2022-12-27 10:53:18.278\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:42\u001b[0m\n", "\u001b[32m2022-12-27 10:53:19.312\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:43\u001b[0m\n", "\u001b[32m2022-12-27 10:53:20.353\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:44\u001b[0m\n", "\u001b[32m2022-12-27 10:53:21.378\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:46\u001b[0m\n", "\u001b[32m2022-12-27 10:53:22.422\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:47\u001b[0m\n", "\u001b[32m2022-12-27 10:53:23.445\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:48\u001b[0m\n", "\u001b[32m2022-12-27 10:53:24.496\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:49\u001b[0m\n", "\u001b[32m2022-12-27 10:53:25.537\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:50\u001b[0m\n", "\u001b[32m2022-12-27 10:53:26.564\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:51\u001b[0m\n", "\u001b[32m2022-12-27 10:53:27.928\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:52\u001b[0m\n", "\u001b[32m2022-12-27 10:53:28.953\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:53\u001b[0m\n", "\u001b[32m2022-12-27 10:53:30.031\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:54\u001b[0m\n", "\u001b[32m2022-12-27 10:53:31.163\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:55\u001b[0m\n", "\u001b[32m2022-12-27 10:53:32.275\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:56\u001b[0m\n", "\u001b[32m2022-12-27 10:53:33.393\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:58\u001b[0m\n", "\u001b[32m2022-12-27 10:53:34.495\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:00:59\u001b[0m\n", "\u001b[32m2022-12-27 10:53:35.601\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:00\u001b[0m\n", "\u001b[32m2022-12-27 10:53:36.694\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:01\u001b[0m\n", "\u001b[32m2022-12-27 10:53:37.813\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:02\u001b[0m\n", "\u001b[32m2022-12-27 10:53:38.924\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:03\u001b[0m\n", "\u001b[32m2022-12-27 10:53:40.041\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:04\u001b[0m\n", "\u001b[32m2022-12-27 10:53:41.127\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:05\u001b[0m\n", "\u001b[32m2022-12-27 10:53:42.253\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:06\u001b[0m\n", "\u001b[32m2022-12-27 10:53:43.335\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:07\u001b[0m\n", "\u001b[32m2022-12-27 10:53:44.421\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:09\u001b[0m\n", "\u001b[32m2022-12-27 10:53:45.519\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:10\u001b[0m\n", "\u001b[32m2022-12-27 10:53:46.607\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:11\u001b[0m\n", "\u001b[32m2022-12-27 10:53:47.737\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:12\u001b[0m\n", "\u001b[32m2022-12-27 10:53:48.822\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:13\u001b[0m\n", "\u001b[32m2022-12-27 10:53:49.924\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:14\u001b[0m\n", "\u001b[32m2022-12-27 10:53:51.003\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:15\u001b[0m\n", "\u001b[32m2022-12-27 10:53:52.108\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:16\u001b[0m\n", "\u001b[32m2022-12-27 10:53:53.171\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:17\u001b[0m\n", "\u001b[32m2022-12-27 10:53:54.271\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:18\u001b[0m\n", "\u001b[32m2022-12-27 10:53:55.325\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:19\u001b[0m\n", "\u001b[32m2022-12-27 10:53:56.423\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:21\u001b[0m\n", "\u001b[32m2022-12-27 10:53:57.519\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:22\u001b[0m\n", "\u001b[32m2022-12-27 10:53:58.600\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:23\u001b[0m\n", "\u001b[32m2022-12-27 10:53:59.659\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:24\u001b[0m\n", "\u001b[32m2022-12-27 10:54:00.743\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:25\u001b[0m\n", "\u001b[32m2022-12-27 10:54:01.833\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:26\u001b[0m\n", "\u001b[32m2022-12-27 10:54:02.946\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:27\u001b[0m\n", "\u001b[32m2022-12-27 10:54:04.034\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:28\u001b[0m\n", "\u001b[32m2022-12-27 10:54:05.112\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:29\u001b[0m\n", "\u001b[32m2022-12-27 10:54:06.230\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:30\u001b[0m\n", "\u001b[32m2022-12-27 10:54:07.290\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:31\u001b[0m\n", "\u001b[32m2022-12-27 10:54:08.417\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:33\u001b[0m\n", "\u001b[32m2022-12-27 10:54:09.600\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:34\u001b[0m\n", "\u001b[32m2022-12-27 10:54:10.699\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:35\u001b[0m\n", "\u001b[32m2022-12-27 10:54:11.811\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:36\u001b[0m\n", "\u001b[32m2022-12-27 10:54:12.931\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:37\u001b[0m\n", "\u001b[32m2022-12-27 10:54:14.383\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:39\u001b[0m\n", "\u001b[32m2022-12-27 10:54:15.464\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:40\u001b[0m\n", "\u001b[32m2022-12-27 10:54:16.560\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:41\u001b[0m\n", "\u001b[32m2022-12-27 10:54:17.628\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:42\u001b[0m\n", "\u001b[32m2022-12-27 10:54:18.701\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:43\u001b[0m\n", "\u001b[32m2022-12-27 10:54:19.728\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:44\u001b[0m\n", "\u001b[32m2022-12-27 10:54:20.762\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component nn_0, time elapse: 0:01:45\u001b[0m\n", "\u001b[0mm2022-12-27 10:54:22.827\u001b[0m | \u001b[1mINFO \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m102\u001b[0m - \u001b[1mJob is canceled, time elapse: 0:01:47\n" ] } ], "source": [ "from federatedml.nn.dataset.criteo_dataset import CriteoDataset\n", "import numpy as np\n", "import pandas as pd\n", "from pipeline.interface import Data, Model\n", "from pipeline.component import Reader, Evaluation, DataTransform\n", "from pipeline.backend.pipeline import PipeLine\n", "from pipeline.component import HomoNN\n", "from pipeline import fate_torch_hook\n", "from torch import nn\n", "import torch as t\n", "import os\n", "from pipeline.component.homo_nn import DatasetParam, TrainerParam \n", "\n", "fate_torch_hook(t)\n", "\n", "fate_project_path = os.path.abspath('../../../../')\n", "data_path = 'examples/data/criteo.csv'\n", "host = 10000\n", "guest = 9999\n", "arbiter = 10000\n", "pipeline = PipeLine().set_initiator(role='guest', party_id=guest).set_roles(guest=guest, host=host, arbiter=arbiter)\n", "\n", "data = {\"name\": \"criteo\", \"namespace\": \"experiment\"}\n", "pipeline.bind_table(name=data['name'],\n", " namespace=data['namespace'], path=fate_project_path + '/' + data_path)\n", "\n", "# reader\n", "reader_0 = Reader(name=\"reader_0\")\n", "reader_0.get_party_instance(\n", " role='guest', party_id=guest).component_param(table=data)\n", "reader_0.get_party_instance(\n", " role='host', party_id=host).component_param(table=data)\n", "\n", "# compute model parameter for our CTR models\n", "ds = CriteoDataset()\n", "dense_feas, dense_feas_dict, sparse_feas, sparse_feas_dict, ffm_linear_feas, ffm_linear_feas_dict, \\\n", " ffm_cross_feas, ffm_cross_feas_dict = ds.load(fate_project_path + '/' + data_path)\n", "\n", "model = t.nn.Sequential(\n", " t.nn.CustModel(module_name='fm_model', class_name='FMModel', dense_feas_dict=dense_feas_dict, sparse_feas_dict=sparse_feas_dict)\n", ")\n", "\n", "nn_component = HomoNN(name='nn_0',\n", " model=model, \n", " loss=t.nn.BCELoss(),\n", " optimizer=t.optim.Adam(\n", " model.parameters(), lr=0.001, weight_decay=0.001),\n", " dataset=DatasetParam(dataset_name='criteo_dataset'),\n", " trainer=TrainerParam(trainer_name='fedavg_trainer', epochs=1, batch_size=256, validation_freqs=1,\n", " data_loader_worker=6, shuffle=True),\n", " torch_seed=100 \n", " )\n", "\n", "pipeline.add_component(reader_0)\n", "pipeline.add_component(nn_component, data=Data(train_data=reader_0.output.data))\n", "pipeline.add_component(Evaluation(name='eval_0', eval_type='binary'), data=Data(data=nn_component.output.data))\n", "pipeline.compile()\n", "pipeline.fit()" ] } ], "metadata": { "kernelspec": { "display_name": "venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.13" }, "orig_nbformat": 4, "vscode": { "interpreter": { "hash": "d29574a2ab71ec988cdcd4d29c58400bd2037cad632b9528d973466f7fb6f853" } } }, "nbformat": 4, "nbformat_minor": 2 }