
Multi-Party Job & Task Scheduling

1. Description

Mainly describes how to submit a federated learning job using FATE Flow and observe its execution.

2. Job submission

  • Build a federated learning job and submit it to the scheduling system for execution
  • Two configuration files are required: job dsl and job conf
  • job dsl configures the components to run: the component list and their input/output relationships
  • job conf configures component execution parameters and system runtime parameters

{{snippet('cli/job.md', '### submit')}}

3. Job DSL configuration description

The DSL configuration file is in JSON format; in fact, the whole configuration file is one JSON object (dict).

3.1 Component List

Description: the first level of this dict is components, which indicates the modules that will be used by this job.

Example:

{
  "components" : {
          ...
      }
}

Each individual module is defined under "components", e.g.

"data_transform_0": {
      "module": "DataTransform",
      "input": {
          "data": {
              "data": [
                  "reader_0.train_data"
              ]
          }
      },
      "output": {
          "data": ["train"],
          "model": ["model"]
      }
  }

All data needs to be fetched from the data store via the Reader module; note that this module only has output and no input:

"reader_0": {
      "module": "Reader",
      "output": {
          "data": ["train"]
      }
}
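
Putting the two fragments together, a minimal complete DSL that wires a Reader into DataTransform could look as follows (a sketch only; the output name train follows the reader_0 example above, and upstream outputs are referenced in the format component_name.output_name):

{
    "components": {
        "reader_0": {
            "module": "Reader",
            "output": {
                "data": ["train"]
            }
        },
        "data_transform_0": {
            "module": "DataTransform",
            "input": {
                "data": {
                    "data": ["reader_0.train"]
                }
            },
            "output": {
                "data": ["train"],
                "model": ["model"]
            }
        }
    }
}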

3.2 Modules

Description: specifies the module this component uses; for all available module names, refer to the algorithm component documentation.

Example:

"hetero_feature_binning_1": {
    "module": "HeteroFeatureBinning",
     ...
}

3.3 Inputs

Description: upstream input, divided into two input types: data and model.

data input

Description: upstream data input, divided into four input types:

> 1. data: generally used in the data-transform module, feature-engineering
>    modules, or the evaluation module.
> 2. train_data: generally used in the homo_lr, hetero_lr, and secure_boost
>    modules. If the train_data field is present, the task will be recognized as a fit task.
> 3. validate_data: if the train_data field is present, this field is optional;
>    if kept, the data it points to will be used as the validation set.
> 4. test_data: used as prediction data; if provided, model input must also be provided.
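
For illustration, a sketch of a training component that consumes both train_data and validate_data from an upstream Data Split component (the component names hetero_lr_0 and data_split_0 are hypothetical):

"hetero_lr_0": {
    "module": "HeteroLR",
    "input": {
        "data": {
            "train_data": ["data_split_0.train_data"],
            "validate_data": ["data_split_0.validate_data"]
        }
    },
    "output": {
        "data": ["train"],
        "model": ["model"]
    }
}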

model input

Description: upstream model input, divided into two input types.

1. model: used for model input between components of the same type. For example, hetero_binning_0 will fit the model, and then
    hetero_binning_1 will use the output model of hetero_binning_0 for predict or
    transform. Code example:
        "hetero_feature_binning_1": {
            "module": "HeteroFeatureBinning",
            "input": {
                "data": {
                    "data": [
                        "data_transform_1.validate_data"
                    ]
                },
                "model": [
                    "hetero_feature_binning_0.fit_model"
                ]
            },
            "output": {
                "data": ["validate_data" ],
              "model": ["eval_model"]
            }
        }
2. isometric_model: used to specify the model input inherited from an upstream component. For example, the upstream component of feature selection is
    feature binning, and it will use the information from feature binning as the basis for feature selection.
    Code example:
        "hetero_feature_selection_0": {
            "module": "HeteroFeatureSelection",
            "input": {
                "data": {
                    "data": [
                        "hetero_feature_binning_0.train"
                    ]
                },
                "isometric_model": [
                    "hetero_feature_binning_0.output_model"
                ]
            },
            "output": {
                "data": [ "train" ],
                "model": ["output_model"]
            }
        }

3.4 Output

Description: output, like input, is divided into data output and model output.

data output

Description: data output, divided into four output types:

  1. data: general module data output
  2. train_data: only for the Data Split module
  3. validate_data: only for the Data Split module
  4. test_data: only for the Data Split module
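
For illustration, a sketch of a Data Split component that declares all three split outputs (the component names data_split_0 and intersection_0 are hypothetical):

"data_split_0": {
    "module": "DataSplit",
    "input": {
        "data": {
            "data": ["intersection_0.data"]
        }
    },
    "output": {
        "data": ["train_data", "validate_data", "test_data"]
    }
}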

model output

Description: model output; only the model type is used.

3.5 Component Providers

Since FATE-Flow version 1.7.0, the same FATE-Flow system supports loading multiple component providers; each provider supplies a number of components, and the source provider of a component can be configured when submitting a job. Since FATE-Flow version 1.9.0, the provider parameters need to be configured in the job conf, as follows.

Description: specifies the provider. Both global specification and per-component specification are supported; if not specified, the default provider is fate@$FATE_VERSION.

Format: provider_name@$provider_version

Advanced: you can register a new provider through the component registration CLI; currently supported providers are fate and fate_sql. Please refer to the FATE Flow Component Center.

Example

{
  "dsl_version": "2",
  "initiator": {},
  "role": {},
  "job_parameters": {},
  "component_parameters": {},
  "provider": {
    "common": {
      "hetero_feature_binning_0": "fate@1.8.0"
    },
    "role": {
      "guest": {
        "0": {
          "data_transform_0": "fate@1.9.0"
        }
      },
      "host": {
        "0": {
          "data_transform_0": "fate@1.9.0"
        }
      }
    }
  }
}

4. Job Conf Configuration Description

Job Conf is used to set the information of each participant, the parameters of the job and the parameters of each component. The contents include the following.

4.1 DSL Version

Description: configures the DSL version. If not configured, the default is 1; configuring 2 is recommended.

Example:

"dsl_version": "2"

4.2 Job participants

initiating party

Description: the role and party_id of the job initiator.

Example:

"initiator": {
    "role": "guest",
    "party_id": 9999
}

All participants

Description: information about each participant. In the role field, each element represents a role and the party_ids that assume it. The party_ids of each role are given as a list, since a task may involve multiple parties in the same role.

Example

"role": {
    "guest": [9999],
    "host": [10000],
    "arbiter": [10000]
}

4.3 System operation parameters

Description: configures the main system parameters for the job runtime.

Parameter application scope policy setting

  • Apply to all participants, use the common scope identifier
  • Apply to only one participant, use the role scope identifier, with (role:)party_index to locate the specified participant; directly specified parameters take precedence over common parameters

"common": {
}

"role": {
  "guest": {
    "0": {
    }
  }
}

The parameters under common are applied to all participants, while the parameters under role-guest-0 are applied to the participant at subscript 0 of the guest role. Note that applying system runtime parameters to only one participant has not been strictly tested in the current version, so it is recommended to prefer the common scope.

Supported system parameters

| Configuration | Default | Supported values | Description |
| --- | --- | --- | --- |
| job_type | train | train, predict | job type |
| task_cores | 4 | positive integer | total CPU cores applied to the job |
| task_parallelism | 1 | positive integer | task parallelism |
| computing_partitions | number of CPU cores allocated to the task | positive integer | number of partitions in the data table at computation time |
| eggroll_run | none | processors_per_node, etc. | configuration parameters related to the eggroll computing engine; generally need not be configured and are calculated automatically from task_cores; if configured, the task_cores parameter does not take effect |
| spark_run | none | num-executors, executor-cores, etc. | configuration parameters related to the spark computing engine; generally need not be configured and are calculated automatically from task_cores; if configured, the task_cores parameter does not take effect |
| rabbitmq_run | none | queue, exchange, etc. | configuration parameters for rabbitmq to create the queue, exchange, etc.; generally not required, the system defaults are used |
| pulsar_run | none | producer, consumer, etc. | configuration parameters for pulsar to create the producer and consumer; generally not required |
| federated_status_collect_type | PUSH | PUSH, PULL | multi-party run status collection mode; PUSH means each participant actively reports to the initiator, PULL means the initiator periodically pulls from each participant |
| timeout | 259200 (3 days) | positive integer | job timeout, in seconds |
| auto_retries | 3 | positive integer | maximum number of retries per task failure |
| model_id | - | - | model id, required for prediction tasks |
| model_version | - | - | model version, required for prediction tasks |
  1. There is a certain support dependency between the computing engine and the storage engine
  2. Developers can implement their own adapted engines and configure them in the runtime config

Reference configuration

  1. Configuration when there is no need to pay attention to the computing engine and the system default CPU allocation policy is used:
"job_parameters": {
  "common": {
    "job_type": "train",
    "task_cores": 6,
    "task_parallelism": 2,
    "computing_partitions": 8,
    "timeout": 36000
  }
}
  2. Configuration when using eggroll as the computing engine, specifying the CPU and other parameters directly:
"job_parameters": {
  "common": {
    "job_type": "train",
    "eggroll_run": {
      "eggroll.session.processors.per.node": 2
    },
    "task_parallelism": 2,
    "computing_partitions": 8,
    "timeout": 36000,
  }
}
  3. Configuration when using spark as the computing engine and rabbitmq as the federation engine, specifying the CPU and other parameters directly:
"job_parameters": {
  "common": {
    "job_type": "train",
    "spark_run": {
      "num-executors": 1,
      "executor-cores": 2
    },
    "task_parallelism": 2,
    "computing_partitions": 8,
    "timeout": 36000,
    "rabbitmq_run": {
      "queue": {
        "durable": true
      },
      "connection": {
        "heartbeat": 10000
      }
    }
  }
}
  4. Configuration when using spark as the computing engine and pulsar as the federation engine:
"job_parameters": {
  "common": {
    "spark_run": {
      "num-executors": 1,
      "executor-cores": 2
    }
  }
}
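
If the pulsar defaults need to be overridden, pulsar_run accepts producer and consumer configuration in the same way. A sketch only: send_timeout_millis is a standard pulsar producer option used here purely as an illustrative value, and which options apply depends on the pulsar client in use:

"job_parameters": {
  "common": {
    "pulsar_run": {
      "producer": {
        "send_timeout_millis": 30000
      }
    }
  }
}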

For more advanced resource-related configuration, please refer to Resource Management

4.4 Component operation parameters

Parameter application scope policy setting

  • Apply to all participants, use common scope identifier
  • Apply to only one participant, use the role scope identifier, use (role:)party_index to locate the specified participant, directly specified parameters have higher priority than common parameters
"commom": {
}

"role": {
  "guest": {
    "0": {
    }
  }
}

The parameters under common are applied to all participants, while the parameters under role-guest-0 are applied to the participant at subscript 0 of the guest role. Note that the current version of component runtime parameters fully supports both application scope policies.

Reference Configuration

  • For the intersection_0 and hetero_lr_0 components, the runtime parameters are placed under the common scope and applied to all participants
  • The runtime parameters of the reader_0 and data_transform_0 components are configured per participant, because the input parameters are usually not consistent across participants; therefore these two components are usually set participant by participant
  • The above component names are defined in the DSL configuration file
"component_parameters": {
  "common": {
    "intersection_0": {
      "intersect_method": "raw",
      "sync_intersect_ids": true,
      "only_output_key": false
    },
    "hetero_lr_0": {
      "penalty": "L2",
      "optimizer": "rmsprop",
      "alpha": 0.01,
      "max_iter": 3,
      "batch_size": 320,
      "learning_rate": 0.15,
      "init_param": {
        "init_method": "random_uniform"
      }
    }
  },
  "role": {
    "guest": {
      "0": {
        "reader_0": {
          "table": {"name": "breast_hetero_guest", "namespace": "experiment"}
        },
        "data_transform_0":{
          "with_label": true,
          "label_name": "y",
          "label_type": "int",
          "output_format": "dense"
        }
      }
    },
    "host": {
      "0": {
        "reader_0": {
          "table": {"name": "breast_hetero_host", "namespace": "experiment"}
        },
        "data_transform_0":{
          "with_label": false,
          "output_format": "dense"
        }
      }
    }
  }
}

5. Multi-Host Configuration

A multi-host task should list the information of all hosts under the role field.

Example:

"role": {
   "guest": [
     10000
   ],
   "host": [
     10000, 10001, 10002
   ],
   "arbiter": [
     10000
   ]
}

The different configurations for each host should be listed separately under their respective host indices.

Example:

"component_parameters": {
   "role": {
      "host": {
         "0": {
            "reader_0": {
               "table":
                {
                  "name": "hetero_breast_host_0",
                  "namespace": "hetero_breast_host"
                }
            }
         },
         "1": {
            "reader_0": {
               "table":
               {
                  "name": "hetero_breast_host_1",
                  "namespace": "hetero_breast_host"
               }
            }
         },
         "2": {
            "reader_0": {
               "table":
               {
                  "name": "hetero_breast_host_2",
                  "namespace": "hetero_breast_host"
               }
            }
         }
      }
   }
}

6. Predictive Task Configuration

6.1 Description

DSL V2 does not automatically generate the prediction dsl for a training task. Users need to first deploy the required modules of the model using the Flow Client; for a detailed command description, please refer to fate_flow_client:

flow model deploy --model-id $model_id --model-version $model_version --cpn-list ...

Optionally, the user can add new modules to the prediction dsl, such as Evaluation

6.2 Sample

Training dsl:

"components": {
    "reader_0": {
        "module": "Reader",
        "output": {
            "data": [
                "data"
            ]
        }
    },
    "data_transform_0": {
        "module": "DataTransform",
        "input": {
            "data": {
                "data": [
                    "reader_0.data"
                ]
            }
        },
        "output": {
            "data": [
                "data"
            ],
            "model": [
                "model"
            ]
        }
    },
    "intersection_0": {
        "module": "Intersection",
        "input": {
            "data": {
                "data": [
                    "data_transform_0.data"
                ]
            }
        },
        "output": {
            "data":[
                "data"
            ]
        }
    },
    "hetero_nn_0": {
        "module": "HeteroNN",
        "input": {
            "data": {
                "train_data": [
                    "intersection_0.data"
                ]
            }
        },
        "output": {
            "data": [
                "data"
            ],
            "model": [
                "model"
            ]
        }
    }
}

Prediction dsl:

"components": {
    "reader_0": {
        "module": "Reader",
        "output": {
            "data": [
                "data"
            ]
        }
    },
    "data_transform_0": {
        "module": "DataTransform",
        "input": {
            "data": {
                "data": [
                    "reader_0.data"
                ]
            }
        },
        "output": {
            "data": [
                "data"
            ],
            "model": [
                "model"
            ]
        }
    },
    "intersection_0": {
        "module": "Intersection",
        "input": {
            "data": {
                "data": [
                    "data_transform_0.data"
                ]
            }
        },
        "output": {
            "data":[
                "data"
            ]
        }
    },
    "hetero_nn_0": {
        "module": "HeteroNN",
        "input": {
            "data": {
                "train_data": [
                    "intersection_0.data"
                ]
            }
        },
        "output": {
            "data": [
                "data"
            ],
            "model": [
                "model"
            ]
        }
    },
    "evaluation_0": {
        "module": "Evaluation",
        "input": {
            "data": {
                "data": [
                    "hetero_nn_0.data"
                ]
            }
         },
         "output": {
             "data": [
                 "data"
             ]
          }
    }
}

7. Job reruns

Support for rerunning a job was introduced in version 1.5.0, but only failed jobs could be rerun. Version 1.7.0 adds support for rerunning successful jobs; you can also specify which component to rerun from, in which case the specified component and its downstream components are rerun while all other components are not.

{{snippet('cli/job.md', '### rerun')}}

8. Job parameter update

In the actual production modeling process, it is necessary to repeatedly tune component parameters and rerun, but not all components need to be adjusted and rerun each time. Therefore, since version 1.7.0 it is supported to update the parameters of individual components and, combined with the rerun command, rerun on demand.

{{snippet('cli/job.md', '### parameter-update')}}

9. Job scheduling policy

  • Queuing by commit time
  • Currently only a FIFO policy is supported: in each round the scheduler scans only the first job in the queue; if that job's resource request succeeds, it starts and leaves the queue; if the request fails, it waits for the next scheduling round.

10. Dependency distribution

Brief description:

  • Supports distributing fate and python dependencies from client nodes;
  • Worker nodes do not need to deploy fate;
  • Only FATE on Spark supports distribution mode in the current version;

Related parameters configuration:

conf/service_conf.yaml:

dependent_distribution: true

fate_flow/settings.py

FATE_FLOW_UPDATE_CHECK = False

Description:

  • dependent_distribution: dependency distribution switch, off by default. When off, you need to deploy fate on each worker node, and also configure PYSPARK_DRIVER_PYTHON and PYSPARK_PYTHON in spark's spark-env.sh (see the sketch after this list).

  • FATE_FLOW_UPDATE_CHECK: dependency check switch, off by default. When on, every time a task is submitted the fate code is automatically checked for changes; if it has changed, the fate code dependency is re-uploaded.
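
For reference, a minimal sketch of the spark-env.sh settings mentioned above; /usr/bin/python3 is an illustrative path and must match the python environment of the actual deployment:

# spark-env.sh (sketch): point the Spark driver and executors at the python used by FATE
# the /usr/bin/python3 path below is illustrative, not a FATE default
export PYSPARK_PYTHON=/usr/bin/python3
export PYSPARK_DRIVER_PYTHON=/usr/bin/python3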

11. More commands

Please refer to Job CLI and Task CLI