# Federated classic CTR model training on Criteo

In this tutorial, we show you how to develop a horizontal federated recommendation model. We use the third-party library torch-rechub to build some classic recommendation models, such as FM and DeepFM, and use them to build a federated task in FATE. For the dataset, we use a sampled and preprocessed version of the Criteo dataset, with a total of 50K records. For convenience, all clients use the same dataset in this tutorial.

## Install Torch-rechub

We will use torch-rechub from: https://github.com/datawhalechina/torch-rechub.
To run this tutorial, you need to install the latest version:

- git clone https://github.com/datawhalechina/torch-rechub.git
- cd torch-rechub
- python setup.py install

## The Criteo Dataset
The Criteo dataset is an online advertising dataset released by Criteo Labs. It contains millions of records of click feedback for displayed ads, which can be used as a benchmark for click-through rate (CTR) prediction. The dataset has 40 columns: the first column is the label, where a value of 1 indicates that the ad was clicked and a value of 0 indicates that the ad was not clicked. The remaining columns are 13 dense features and 26 sparse features.

- Download the sampled data used in this tutorial here:

 https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/fate/examples/data/criteo.csv
 
 Then put the CSV file under /examples/data.

- The original source of the Criteo dataset is: https://ailab.criteo.com/ressources/

### Construct dataset class

Here we construct a Criteo dataset class that reads the CSV file. It also preprocesses the data and returns
information about the dense and sparse features.

In [1]:
from pipeline.component.nn import save_to_fate 

In [1]:
%%save_to_fate dataset criteo_dataset.py
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
from tqdm import tqdm
import pandas as pd
import numpy as np
from federatedml.nn.dataset.base import Dataset
from torch_rechub.basic.features import DenseFeature, SparseFeature

class CriteoDataset(Dataset):
    """Criteo CTR dataset that reads a CSV file and prepares it for torch-rechub models.

    After ``load`` has run, ``self.x`` holds the processed feature columns,
    ``self.y`` holds the ``label`` column, and ``self.sample_ids`` holds the
    values of the ``sid`` column when the file has one (otherwise ``None``).
    """

    def __init__(self):
        super(CriteoDataset, self).__init__()
        self.x = None
        self.y = None
        self.sample_ids = None

    def load(self, data_path):
        """Read the CSV at ``data_path``, preprocess it and describe its features.

        Columns whose name starts with ``I`` are treated as dense features and
        columns whose name starts with ``C`` as sparse features. Every dense
        column additionally gets a discretized ``<name>_cat`` copy that is
        handled as a sparse feature.

        Args:
            data_path (str): path of the CSV file; a path ending in ``.gz`` is
                read with gzip compression.

        Returns:
            tuple: ``(dense_feas, dense_feas_dict, sparse_feas,
            sparse_feas_dict, ffm_linear_feas, ffm_linear_feas_dict,
            ffm_cross_feas, ffm_cross_feas_dict)``. Each ``*_feas`` list holds
            torch-rechub feature objects and the matching ``*_dict`` list holds
            the plain-dict description of the same features.
        """

        def convert_numeric_feature(val):
            # Bucket a numeric value: values above 2 become int(log(v)^2),
            # everything else is shifted down by 2.
            v = int(val)
            if v > 2:
                return int(np.log(v) ** 2)
            return v - 2

        def sparse_feature_pair(name, vocab_size, embed_dim):
            # Build a SparseFeature together with the dict that describes it.
            spec = {'name': name, 'vocab_size': vocab_size, 'embed_dim': embed_dim}
            return SparseFeature(name, vocab_size, embed_dim), spec

        # load data
        if data_path.endswith(".gz"):
            data = pd.read_csv(data_path, compression="gzip")
        else:
            data = pd.read_csv(data_path)
        print("data load finished")
        if 'sid' in data.columns:
            self.sample_ids = list(data.sid)

        # split the columns by naming convention
        columns = data.columns.tolist()
        dense_features = [f for f in columns if f.startswith("I")]
        sparse_features = [f for f in columns if f.startswith("C")]

        data[sparse_features] = data[sparse_features].fillna('0')
        data[dense_features] = data[dense_features].fillna(0)

        # Discretize each dense feature into a new sparse feature. This must
        # happen before the dense columns are rescaled below.
        for feat in tqdm(dense_features):
            sparse_features.append(feat + "_cat")
            data[feat + "_cat"] = data[feat].apply(convert_numeric_feature)

        # scale the dense features with a min-max scaler
        sca = MinMaxScaler()
        data[dense_features] = sca.fit_transform(data[dense_features])

        # label-encode every sparse feature (including the new *_cat columns)
        for feat in tqdm(sparse_features):
            lbe = LabelEncoder()
            data[feat] = lbe.fit_transform(data[feat])

        dense_feas = [DenseFeature(name) for name in dense_features]
        dense_feas_dict = [{'name': name} for name in dense_features]

        sparse_feas = []
        sparse_feas_dict = []
        for name in sparse_features:
            fea, spec = sparse_feature_pair(
                name=name, vocab_size=data[name].nunique(), embed_dim=16)
            sparse_feas.append(fea)
            sparse_feas_dict.append(spec)

        # FFM linear part: same vocabulary as the sparse feature, embedding size 1.
        ffm_linear_feas = []
        ffm_linear_feas_dict = []
        for feature in sparse_feas:
            fea, spec = sparse_feature_pair(
                name=feature.name, vocab_size=feature.vocab_size, embed_dim=1)
            ffm_linear_feas.append(fea)
            ffm_linear_feas_dict.append(spec)

        # FFM cross part: vocabulary multiplied by the number of sparse
        # features, embedding size 10.
        ffm_cross_feas = []
        ffm_cross_feas_dict = []
        num_sparse = len(sparse_feas)
        for feature in sparse_feas:
            fea, spec = sparse_feature_pair(
                name=feature.name, vocab_size=feature.vocab_size * num_sparse, embed_dim=10)
            ffm_cross_feas.append(fea)
            ffm_cross_feas_dict.append(spec)

        y = data["label"]
        del data["label"]

        self.x = data
        self.y = y

        return (dense_feas, dense_feas_dict, sparse_feas, sparse_feas_dict,
                ffm_linear_feas, ffm_linear_feas_dict, ffm_cross_feas, ffm_cross_feas_dict)

    def __getitem__(self, index):
        """Return ``({column name: value}, label as float32)`` for the row at position ``index``."""
        # Use positional access so the lookup does not depend on the index labels.
        features = {name: column.iloc[index] for name, column in self.x.items()}
        return features, self.y.iloc[index].astype(np.float32)

    def __len__(self):
        """Return the number of loaded samples."""
        return len(self.y)

    def get_sample_ids(self,):
        """Return the ``sid`` values collected by ``load``, or ``None`` if there were none."""
        return self.sample_ids

### CTR Models
Here, based on torch-rechub, we develop four kinds of CTR models: FM, FFM, DeepFM and DeepFFM.

In [3]:
%%save_to_fate model fm_model.py
import torch
from torch_rechub.basic.layers import FM,LR,EmbeddingLayer
from torch_rechub.basic.features import DenseFeature, SparseFeature
class FMModel(torch.nn.Module):
    """Factorization Machine CTR model assembled from torch-rechub layers.

    The constructor receives plain-dict feature descriptions and rebuilds the
    torch-rechub feature objects from them.

    Args:
        dense_feas_dict (list): dicts with a ``name`` key, one per dense feature.
        sparse_feas_dict (list): dicts with ``name``, ``vocab_size`` and
            ``embed_dim`` keys, one per sparse feature.
    """

    def __init__(self, dense_feas_dict, sparse_feas_dict):
        super(FMModel, self).__init__()

        # Rebuild the feature objects from their dict descriptions.
        self.dense_features = [DenseFeature(name=spec['name']) for spec in dense_feas_dict]
        self.sparse_features = [
            SparseFeature(spec['name'], spec['vocab_size'], spec['embed_dim'])
            for spec in sparse_feas_dict
        ]

        # Only the sparse features feed the FM part.
        self.fm_features = self.sparse_features
        self.fm_dims = sum(fea.embed_dim for fea in self.fm_features)
        self.linear = LR(self.fm_dims)  # 1st-order interaction
        self.fm = FM(reduce_sum=True)  # 2nd-order interaction
        self.embedding = EmbeddingLayer(self.fm_features)

    def forward(self, x):
        """Return the sigmoid of the summed linear and FM outputs for batch ``x``."""
        # Original author's note: [batch_size, num_fields, embed_dim],
        # e.g. torch.Size([100, 39, 16]).
        fm_input = self.embedding(x, self.fm_features, squeeze_dim=False)
        first_order = self.linear(fm_input.flatten(start_dim=1))
        # Original author's note: y_fm has shape torch.Size([100, 1]).
        second_order = self.fm(fm_input)

        logits = first_order + second_order
        return torch.sigmoid(logits.squeeze(1))

In [4]:
%%save_to_fate model deepfm_model.py
import torch
from torch_rechub.basic.layers import FM,LR,EmbeddingLayer,MLP
from torch_rechub.basic.features import DenseFeature, SparseFeature

class DeepFMModel(torch.nn.Module):
    """Deep Factorization Machine model assembled from torch-rechub layers.

    The dense features feed the MLP (deep) part and the sparse features feed
    the linear and FM parts.

    Args:
        dense_feas_dict (list): dicts with a ``name`` key, one per dense feature.
        sparse_feas_dict (list): dicts with ``name``, ``vocab_size`` and
            ``embed_dim`` keys, one per sparse feature.
        mlp_params (dict): keyword arguments forwarded to the torch-rechub
            ``MLP``, e.g. ``{"dims": list, "activation": str, "dropout": float}``.
    """

    def __init__(self, dense_feas_dict, sparse_feas_dict, mlp_params):
        super(DeepFMModel, self).__init__()

        # Rebuild the feature objects from their dict descriptions.
        dense_features = [DenseFeature(name=spec['name']) for spec in dense_feas_dict]
        sparse_features = [
            SparseFeature(spec['name'], spec['vocab_size'], spec['embed_dim'])
            for spec in sparse_feas_dict
        ]

        self.dense_features = dense_features
        self.deep_features = dense_features
        self.fm_features = sparse_features

        self.deep_dims = sum(fea.embed_dim for fea in dense_features)
        self.fm_dims = sum(fea.embed_dim for fea in sparse_features)
        self.linear = LR(self.fm_dims)  # 1st-order interaction
        self.fm = FM(reduce_sum=True)  # 2nd-order interaction
        self.embedding = EmbeddingLayer(dense_features + sparse_features)
        self.mlp = MLP(self.deep_dims, **mlp_params)

    def forward(self, x):
        """Return the sigmoid of the summed linear, FM and MLP outputs for batch ``x``."""
        # Shapes below are the original author's notes.
        deep_input = self.embedding(x, self.deep_features, squeeze_dim=True)  # [batch_size, deep_dims]
        fm_input = self.embedding(x, self.fm_features, squeeze_dim=False)  # [batch_size, num_fields, embed_dim]

        first_order = self.linear(fm_input.flatten(start_dim=1))
        second_order = self.fm(fm_input)
        deep_out = self.mlp(deep_input)  # [batch_size, 1]

        logits = first_order + second_order + deep_out
        return torch.sigmoid(logits.squeeze(1))

In [5]:
%%save_to_fate model ffm_model.py
import torch
from torch_rechub.basic.layers import LR,EmbeddingLayer,FFM,FM
import torch
from torch_rechub.basic.features import DenseFeature, SparseFeature

class FFMModel(torch.nn.Module):
    """Field-aware Factorization Machine model assembled from torch-rechub layers.

    Args:
        liner_feas_dict (list): dicts with ``name``, ``vocab_size`` and
            ``embed_dim`` keys describing the features of the linear part.
        cross_feas_dict (list): dicts with the same keys describing the
            features of the field-aware cross part.
    """

    def __init__(self, liner_feas_dict, cross_feas_dict):
        super(FFMModel, self).__init__()

        def to_sparse_feature(spec):
            # Rebuild a SparseFeature from its dict description.
            return SparseFeature(spec['name'], spec['vocab_size'], spec['embed_dim'])

        self.linear_features = [to_sparse_feature(spec) for spec in liner_feas_dict]
        self.cross_features = [to_sparse_feature(spec) for spec in cross_feas_dict]

        self.num_fields = len(self.cross_features)
        # number of unordered field pairs
        self.num_field_cross = self.num_fields * (self.num_fields - 1) // 2

        self.ffm = FFM(num_fields=self.num_fields, reduce_sum=False)
        self.linear_embedding = EmbeddingLayer(self.linear_features)
        self.ffm_embedding = EmbeddingLayer(self.cross_features)
        self.b = torch.nn.Parameter(torch.zeros(1))

        # One offset per field, stored as a buffer rather than a parameter.
        offsets = torch.arange(0, self.num_fields, dtype=torch.long)
        self.register_buffer('fields_offset', offsets)
        self.fm = FM(reduce_sum=True)

    def forward(self, x):
        """Return the sigmoid of the linear plus FFM score, with bias ``b`` added, for batch ``x``."""
        # Shapes in the comments below are the original author's notes.
        linear_emb = self.linear_embedding(x, self.linear_features, squeeze_dim=True)
        y_linear = linear_emb.sum(1, keepdim=True)  # [batch_size, 1]

        # Expand each feature id into one id per field:
        # id * num_fields + field offset.
        field_ids = {}
        for fea in self.cross_features:
            field_ids[fea.name] = x[fea.name].unsqueeze(1) * self.num_fields + self.fields_offset

        # [batch_size, num_field, num_field, emb_dim], e.g. torch.Size([100, 39, 39, 10])
        ffm_input = self.ffm_embedding(field_ids, self.cross_features, squeeze_dim=False)
        # [batch_size, num_fields*(num_fields-1)/2, embed_dim], e.g. torch.Size([100, 741, 10])
        crossed = self.ffm(ffm_input)
        # score of the ffm part, [batch_size, 1]
        y_ffm = self.fm(crossed)

        logits = y_linear + y_ffm
        return torch.sigmoid(logits.squeeze(1) + self.b)

In [6]:
%%save_to_fate model deepffm_model.py
import torch 
from torch import nn
from torch_rechub.basic.features import SparseFeature
from torch_rechub.models.ranking import DeepFFM

class DeepFFMModel(nn.Module):
    """Thin wrapper that builds a torch-rechub ``DeepFFM`` from dict feature descriptions.

    Args:
        ffm_linear_feas_dict (list): dicts with ``name``, ``vocab_size`` and
            ``embed_dim`` keys describing the linear features.
        ffm_cross_feas_dict (list): dicts with the same keys describing the
            cross features.
    """

    def __init__(self, ffm_linear_feas_dict, ffm_cross_feas_dict):
        super().__init__()

        def to_sparse_feature(spec):
            # Rebuild a SparseFeature from its dict description.
            return SparseFeature(spec['name'], spec['vocab_size'], spec['embed_dim'])

        linear_features = [to_sparse_feature(spec) for spec in ffm_linear_feas_dict]
        cross_features = [to_sparse_feature(spec) for spec in ffm_cross_feas_dict]

        # The embedding size and MLP settings are fixed here.
        self.model = DeepFFM(
            linear_features=linear_features,
            cross_features=cross_features,
            embed_dim=10,
            mlp_params={"dims": [1600, 1600], "dropout": 0.5, "activation": "relu"},
        )

    def forward(self, x):
        """Delegate the forward pass to the wrapped ``DeepFFM`` model."""
        return self.model(x)
 

## Local Testing
Test this set (dataset + model) locally to see if it can run, and omit the federated aggregation part during local validation.

In [7]:
import torch
from federatedml.nn.homo.trainer.fedavg_trainer import FedAVGTrainer

# Load the dataset once; load() also returns the feature descriptions that
# the model constructors below need.
path = '../../../../examples/data/criteo.csv'
ds = CriteoDataset()

(
    dense_feas, dense_feas_dict,
    sparse_feas, sparse_feas_dict,
    ffm_linear_feas, ffm_linear_feas_dict,
    ffm_cross_feas, ffm_cross_feas_dict,
) = ds.load(path)

# Build one instance of each of the four CTR models.
fm_model = FMModel(
    dense_feas_dict=dense_feas_dict,
    sparse_feas_dict=sparse_feas_dict,
)

ffm_model = FFMModel(
    liner_feas_dict=ffm_linear_feas_dict,
    cross_feas_dict=ffm_cross_feas_dict,
)

dfm_model = DeepFMModel(
    dense_feas_dict=dense_feas_dict,
    sparse_feas_dict=sparse_feas_dict,
    mlp_params={"dims": [256, 128], "dropout": 0.2, "activation": "relu"},
)

dffm_model = DeepFFMModel(
    ffm_linear_feas_dict=ffm_linear_feas_dict,
    ffm_cross_feas_dict=ffm_cross_feas_dict,
)

# The model that the local training cell uses.
model = fm_model

ipcl_python failed to import
ipcl_python failed to import


data load finished


100%|██████████| 26/26 [00:13<00:00, 1.87it/s]
100%|██████████| 52/52 [00:01<00:00, 40.50it/s]


In [8]:
# Here we test a single model (the FM model): one local training epoch with
# the trainer switched to local mode.
trainer = FedAVGTrainer(epochs=1, batch_size=256)
trainer.local_mode()
trainer.set_model(model)

adam = torch.optim.Adam(model.parameters(), lr=0.01)
bce = torch.nn.BCELoss()
trainer.train(ds, None, optimizer=adam, loss=bce)

epoch is 0
100%|██████████| 1954/1954 [11:24<00:00, 2.86it/s]
epoch loss is 1.3575431838684082


## Submit a Pipeline to run a federation task

In [7]:
from federatedml.nn.dataset.criteo_dataset import CriteoDataset
import numpy as np
import pandas as pd
from pipeline.interface import Data, Model
from pipeline.component import Reader, Evaluation, DataTransform
from pipeline.backend.pipeline import PipeLine
from pipeline.component import HomoNN
from pipeline import fate_torch_hook
from torch import nn
import torch as t
import os
from pipeline.component.homo_nn import DatasetParam, TrainerParam 

# Apply FATE's torch hook to the imported torch module before building the
# model (presumably this is what provides t.nn.CustModel used below -- confirm
# against the FATE docs).
fate_torch_hook(t)

# Location of the sampled Criteo CSV inside the FATE project.
fate_project_path = os.path.abspath('../../../../')
data_path = 'examples/data/criteo.csv'
full_data_path = fate_project_path + '/' + data_path

# Party ids; in this example the host and the arbiter share an id.
guest = 9999
host = 10000
arbiter = 10000

pipeline = (
    PipeLine()
    .set_initiator(role='guest', party_id=guest)
    .set_roles(guest=guest, host=host, arbiter=arbiter)
)

# Bind the CSV path to a table name/namespace that the reader refers to.
data = {"name": "criteo", "namespace": "experiment"}
pipeline.bind_table(
    name=data['name'],
    namespace=data['namespace'],
    path=full_data_path,
)

# reader: guest and host both read the same table
reader_0 = Reader(name="reader_0")
reader_0.get_party_instance(role='guest', party_id=guest).component_param(table=data)
reader_0.get_party_instance(role='host', party_id=host).component_param(table=data)

# Load the dataset locally to compute the feature descriptions that
# parameterize our CTR models.
ds = CriteoDataset()
(
    dense_feas, dense_feas_dict,
    sparse_feas, sparse_feas_dict,
    ffm_linear_feas, ffm_linear_feas_dict,
    ffm_cross_feas, ffm_cross_feas_dict,
) = ds.load(full_data_path)

# Reference the saved FMModel by module and class name.
model = t.nn.Sequential(
    t.nn.CustModel(
        module_name='fm_model',
        class_name='FMModel',
        dense_feas_dict=dense_feas_dict,
        sparse_feas_dict=sparse_feas_dict,
    )
)

# HomoNN component settings, created in the same order as in the original call.
loss_fn = t.nn.BCELoss()
optimizer = t.optim.Adam(model.parameters(), lr=0.001, weight_decay=0.001)
dataset_param = DatasetParam(dataset_name='criteo_dataset')
trainer_param = TrainerParam(
    trainer_name='fedavg_trainer',
    epochs=1,
    batch_size=256,
    validation_freqs=1,
    data_loader_worker=6,
    shuffle=True,
)

nn_component = HomoNN(
    name='nn_0',
    model=model,
    loss=loss_fn,
    optimizer=optimizer,
    dataset=dataset_param,
    trainer=trainer_param,
    torch_seed=100,
)

# Assemble the pipeline: reader -> HomoNN -> binary evaluation.
pipeline.add_component(reader_0)
pipeline.add_component(nn_component, data=Data(train_data=reader_0.output.data))
eval_0 = Evaluation(name='eval_0', eval_type='binary')
pipeline.add_component(eval_0, data=Data(data=nn_component.output.data))

pipeline.compile()
pipeline.fit()

data load finished


100%|██████████| 26/26 [00:13<00:00, 1.87it/s]
100%|██████████| 52/52 [00:01<00:00, 43.13it/s]
[32m2022-12-27 10:52:35.371[0m | [1mINFO [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m83[0m - [1mJob id is 202212271052347495390
[0m
[32m2022-12-27 10:52:35.523[0m | [1mINFO [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:00[0m
[32m2022-12-27 10:52:36.535[0m | [1mINFO [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m98[0m - [1m[80D[1A[KJob is still waiting, time elapse: 0:00:01[0m
[0mm2022-12-27 10:52:38.601[0m | [1mINFO [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m125[0m - [1m
[32m2022-12-27 10:52:38.603[0m | [1mINFO [0m | [36mpipeline.utils.invoker.job_submitter[0m:[36mmonitor_job_status[0m:[36m127[0m - [1m[80D[1A[KRunning component re