Broker Load - Apache Doris (2024)

Broker Load is an asynchronous import method; which data sources are supported depends on what the Broker process can access.

Because data in a Doris table is stored in sorted order, Broker Load uses Doris cluster resources to sort the data during import. Compared with Spark Load, completing a massive historical data migration with Broker Load consumes significantly more Doris cluster resources, so this method is intended for users who do not have Spark computing resources. If Spark computing resources are available, Spark Load is recommended.

Users create a Broker Load job through the MySQL protocol and check the import result with the SHOW LOAD command.

Applicable scenarios

  • The source data is in a storage system that the broker can access, such as HDFS.
  • The amount of data is at the level of tens to hundreds of GB.

Fundamentals

After the user submits an import task, the FE generates the corresponding plan and, based on the current number of BEs and the file size, distributes the plan to multiple BEs for execution; each BE imports a portion of the data.

During execution, each BE pulls data from the Broker, transforms it, and imports it into the system. After all BEs finish, the FE decides whether the import ultimately succeeded.

         +
         | 1. user create broker load
         v
    +----+----+
    |         |
    |   FE    |
    |         |
    +----+----+
         |
         | 2. BE etl and load the data
    +----+------+-----------+
    |           |           |
+---v---+   +---v---+   +---v---+
|       |   |       |   |       |
|  BE   |   |  BE   |   |  BE   |
|       |   |       |   |       |
+---+-^-+   +---+-^-+   +---+-^-+
    | |         | |         | |
    | |         | |         | |   3. pull data from broker
+---v-+-+   +---v-+-+   +---v-+-+
|       |   |       |   |       |
|Broker |   |Broker |   |Broker |
|       |   |       |   |       |
+---+-^-+   +---+-^-+   +---+-^-+
    | |         | |         | |
+---v-+---------v-+---------v-+---+
|      HDFS/BOS/AFS cluster       |
|                                 |
+---------------------------------+

Start importing

Let's look at how to use Broker Load through several real-world scenario examples.

Importing data from a Hive partitioned table

  1. Create a Hive table
-- Data format: default; partition field: day
CREATE TABLE `ods_demo_detail`(
`id` string,
`store_id` string,
`company_id` string,
`tower_id` string,
`commodity_id` string,
`commodity_name` string,
`commodity_price` double,
`member_price` double,
`cost_price` double,
`unit` string,
`quantity` double,
`actual_price` double
)
PARTITIONED BY (day string)
row format delimited fields terminated by ','
lines terminated by '\n'

Then use Hive's Load command to import your data into the Hive table

load data local inpath '/opt/custorm' into table ods_demo_detail;
  2. Create a Doris table. For the specific syntax, refer to CREATE TABLE.
CREATE TABLE `doris_ods_test_detail` (
`rq` date NULL,
`id` varchar(32) NOT NULL,
`store_id` varchar(32) NULL,
`company_id` varchar(32) NULL,
`tower_id` varchar(32) NULL,
`commodity_id` varchar(32) NULL,
`commodity_name` varchar(500) NULL,
`commodity_price` decimal(10, 2) NULL,
`member_price` decimal(10, 2) NULL,
`cost_price` decimal(10, 2) NULL,
`unit` varchar(50) NULL,
`quantity` int(11) NULL,
`actual_price` decimal(10, 2) NULL
) ENGINE=OLAP
UNIQUE KEY(`rq`, `id`, `store_id`)
PARTITION BY RANGE(`rq`)
(
PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01')))
DISTRIBUTED BY HASH(`store_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "MONTH",
"dynamic_partition.start" = "-2147483648",
"dynamic_partition.end" = "2",
"dynamic_partition.prefix" = "P_",
"dynamic_partition.buckets" = "1",
"in_memory" = "false",
"storage_format" = "V2"
);
  3. Start importing the data

    For the specific syntax, refer to Broker Load.

LOAD LABEL broker_load_2022_03_23
(
DATA INFILE("hdfs://192.168.20.123:8020/user/hive/warehouse/ods.db/ods_demo_detail/*/*")
INTO TABLE doris_ods_test_detail
COLUMNS TERMINATED BY ","
(id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
COLUMNS FROM PATH AS (`day`)
SET
(rq = str_to_date(`day`,'%Y-%m-%d'),id=id,store_id=store_id,company_id=company_id,tower_id=tower_id,commodity_id=commodity_id,commodity_name=commodity_name,commodity_price=commodity_price,member_price=member_price,cost_price=cost_price,unit=unit,quantity=quantity,actual_price=actual_price)
)
WITH BROKER "broker_name_1"
(
"username" = "hdfs",
"password" = ""
)
PROPERTIES
(
"timeout"="1200",
"max_filter_ratio"="0.1"
);

Hive partition table import (ORC format)

  1. Create a Hive partitioned table in ORC format
-- Data format: ORC; partition field: day
CREATE TABLE `ods_demo_orc_detail`(
`id` string,
`store_id` string,
`company_id` string,
`tower_id` string,
`commodity_id` string,
`commodity_name` string,
`commodity_price` double,
`member_price` double,
`cost_price` double,
`unit` string,
`quantity` double,
`actual_price` double
)
PARTITIONED BY (day string)
row format delimited fields terminated by ','
lines terminated by '\n'
STORED AS ORC
  2. Create a Doris table. The CREATE TABLE statement is the same as the one above; please refer to it.

  3. Import data using Broker Load

    LOAD LABEL dish_2022_03_23
    (
    DATA INFILE("hdfs://10.220.147.151:8020/user/hive/warehouse/ods.db/ods_demo_orc_detail/*/*")
    INTO TABLE doris_ods_test_detail
    COLUMNS TERMINATED BY ","
    FORMAT AS "orc"
    (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
    COLUMNS FROM PATH AS (`day`)
    SET
    (rq = str_to_date(`day`,'%Y-%m-%d'),id=id,store_id=store_id,company_id=company_id,tower_id=tower_id,commodity_id=commodity_id,commodity_name=commodity_name,commodity_price=commodity_price,member_price=member_price,cost_price=cost_price,unit=unit,quantity=quantity,actual_price=actual_price)
    )
    WITH BROKER "broker_name_1"
    (
    "username" = "hdfs",
    "password" = ""
    )
    PROPERTIES
    (
    "timeout"="1200",
    "max_filter_ratio"="0.1"
    );

    Note:

    • FORMAT AS "orc": specifies the format of the data to import
    • SET: defines the field mapping between the Hive table and the Doris table, along with some field conversion operations

HDFS file system data import

Let's continue to take the Doris table created above as an example to demonstrate importing data from HDFS through Broker Load.

The import job statement is as follows:

LOAD LABEL demo.label_20220402
(
DATA INFILE("hdfs://10.220.147.151:8020/tmp/test_hdfs.txt")
INTO TABLE `ods_dish_detail_test`
COLUMNS TERMINATED BY "\t" (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
)
WITH HDFS (
"fs.defaultFS"="hdfs://10.220.147.151:8020",
"hadoop.username"="root"
)
PROPERTIES
(
"timeout"="1200",
"max_filter_ratio"="0.1"
);

For the specific parameters, refer to the Broker and [Broker Load](../../../sql-manual/sql-reference-v2/Data-Manipulation-Statements/Load/BROKER-LOAD) documentation.

View import status

You can view the status of the above import task with the following command.

For the specific syntax, see SHOW LOAD.

mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
JobId: 41326624
Label: broker_load_2022_03_23
State: FINISHED
Progress: ETL: 100%; LOAD: 100%
Type: BROKER
EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27
TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
ErrorMsg: NULL
CreateTime: 2022-04-01 18:59:06
EtlStartTime: 2022-04-01 18:59:11
EtlFinishTime: 2022-04-01 18:59:11
LoadStartTime: 2022-04-01 18:59:11
LoadFinishTime: 2022-04-01 18:59:11
URL: NULL
JobDetails: {"Unfinished backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[]},"ScannedRows":27,"TaskNumber":1,"All backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[36728051]},"FileNumber":1,"FileSize":5540}
1 row in set (0.01 sec)

Cancel import

When a Broker Load job is not in the CANCELLED or FINISHED state, it can be canceled manually by the user. To cancel, specify the Label of the import task. For the cancel command syntax, see CANCEL LOAD.

For example, cancel the import job with the label broker_load_2022_03_23 in the database demo:

CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";

Relevant system configuration

Broker parameters

Broker Load uses the Broker process to access remote storage, and different brokers require different parameters. For details, refer to the Broker documentation.
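As an illustration, a Broker that reads from an HA-enabled HDFS cluster typically needs the standard Hadoop HA client settings passed as Broker properties. The fragment below is a hedged sketch of such a WITH BROKER clause, to be used inside a LOAD statement like the examples earlier in this document; the nameservice, NameNode names, and hosts are placeholders, and the exact key set supported by your Broker version should be checked against the Broker documentation:

WITH BROKER "broker_name_1"
(
    "username" = "hdfs",
    "password" = "",
    -- standard Hadoop HDFS HA client settings (placeholder values)
    "dfs.nameservices" = "my_ha",
    "dfs.ha.namenodes.my_ha" = "my_namenode1,my_namenode2",
    "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
    "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
    "dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
)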

FE configuration

The following configurations are system-level settings for Broker Load, i.e., they apply to all Broker Load import jobs. They are adjusted mainly by modifying fe.conf.

  • min_bytes_per_broker_scanner/max_bytes_per_broker_scanner/max_broker_concurrency

    The first two configurations limit the minimum and maximum amount of data processed by a single BE. The third limits the maximum concurrency of a single job. The minimum amount of data processed, the maximum concurrency, the source file size, and the number of BE nodes in the current cluster together determine the concurrency of the import.

    Import concurrency = min(source file size / min_bytes_per_broker_scanner, max_broker_concurrency, number of BE nodes)
    Data processed by a single BE in this import = source file size / import concurrency

    Usually the maximum amount of data supported by an import job is max_bytes_per_broker_scanner * number of BE nodes. To import a larger amount of data, increase max_bytes_per_broker_scanner appropriately.

    Default configuration:

    min_bytes_per_broker_scanner: default 64 MB, in bytes.
    max_broker_concurrency: default 10.
    max_bytes_per_broker_scanner: default 500 GB, in bytes.
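    To check the values that are actually in effect, the FE configuration can be inspected over the MySQL protocol. A minimal sketch, assuming the ADMIN SHOW FRONTEND CONFIG statement is available in your Doris version (persistent changes should still be made in fe.conf):

    -- Inspect the current Broker Load scan and concurrency limits on the FE
    ADMIN SHOW FRONTEND CONFIG LIKE '%min_bytes_per_broker_scanner%';
    ADMIN SHOW FRONTEND CONFIG LIKE '%max_bytes_per_broker_scanner%';
    ADMIN SHOW FRONTEND CONFIG LIKE '%max_broker_concurrency%';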

Best Practices

Application scenarios

The most suitable scenario for Broker Load is when the source data resides in a file system such as HDFS, BOS, or AFS. In addition, since Broker Load is the only asynchronous way to perform a single import, users who need to import large files asynchronously can also consider it.

Data volume

Only the single-BE case is discussed here. If the cluster has multiple BEs, the data volumes below should be multiplied by the number of BEs. For example, with 3 BEs, the threshold of 3 GB and below becomes 9 GB and below.

  • 3 GB and below

    Users can submit a Broker Load import request directly.

  • Above 3 GB

    Since the maximum amount of data a single BE can process in one import is 3 GB, importing a file larger than 3 GB requires adjusting the Broker Load import parameters.

    1. Adjust the maximum scan size of a single BE and the maximum concurrency according to the current number of BEs and the size of the source file.

      Modify the following configuration in fe.conf:
      max_broker_concurrency = number of BEs
      Data processed by a single BE for this job = source file size / max_broker_concurrency
      max_bytes_per_broker_scanner >= data processed by a single BE for this job

      For example, for a 100 GB file in a cluster with 10 BEs:
      max_broker_concurrency = 10
      # >= 10G = 100G / 10
      max_bytes_per_broker_scanner = 10737418240

      After this modification, all BEs process the import task concurrently, each handling part of the source file (see the sketch after this list).

      Note: The two FE configurations above are system-level settings; modifying them affects all Broker Load jobs.

    2. Customize the timeout of the current import task when creating the import

      Data processed by a single BE / slowest import speed of the Doris cluster (MB/s) >= job timeout >= data processed by a single BE / 10 MB/s

      For example, for a 100 GB file in a cluster with 10 BEs:
      # >= 1000s = 10G / 10M/s
      timeout = 1000
    3. When the timeout calculated in step 2 exceeds the default import timeout of 4 hours

      In this case it is not recommended to simply increase the maximum import timeout. If a single import would exceed the default maximum timeout of 4 hours, it is better to split the file and import it in several jobs. The main reason is that if a single import takes more than 4 hours, the time cost of retrying after a failure is very high.

      The expected maximum import file size for the Doris cluster can be calculated with the following formula:

      Expected maximum import file size = 14400s * 10MB/s * number of BEs
      For example, for a cluster with 10 BEs:
      Expected maximum import file size = 14400s * 10MB/s * 10 = 1440000MB ≈ 1440GB

      Note: A typical environment may not reach 10 MB/s, so it is recommended to split files larger than 500 GB before importing.
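As referenced in step 1 above, the following is a minimal sketch of applying the 100 GB / 10 BE example at runtime. It assumes the ADMIN SET FRONTEND CONFIG statement is available and that these two items are runtime-mutable in your version; otherwise set them in fe.conf and restart the FE. The per-job timeout from step 2 goes into the PROPERTIES clause of the LOAD statement itself, as in the earlier examples.

-- Apply the values from the 100 GB / 10 BE example (runtime change, not persisted across FE restarts)
ADMIN SET FRONTEND CONFIG ("max_broker_concurrency" = "10");
ADMIN SET FRONTEND CONFIG ("max_bytes_per_broker_scanner" = "10737418240");
-- The job-level timeout is then passed per LOAD statement, e.g. PROPERTIES ("timeout" = "1000")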

Job scheduling

The system limits the number of running Broker Load jobs in a cluster to prevent too many Load jobs from running at the same time.

First, the FE configuration parameter desired_max_waiting_jobs limits the number of Broker Load jobs in a cluster that have not yet started or are currently running (job status PENDING or LOADING). The default is 100. If this threshold is exceeded, newly submitted jobs are rejected outright.

A Broker Load job is divided into a pending task phase and a loading task phase. The pending task is responsible for obtaining information about the files to import, and the loading tasks are sent to the BEs to perform the actual import.

The FE configuration parameter async_pending_load_task_pool_size limits the number of pending tasks running at the same time, which effectively controls the number of import jobs actually running. It defaults to 10. In other words, if a user submits 100 Load jobs, only 10 will enter the LOADING state and start executing at any one time, while the others wait in the PENDING state.

The FE configuration parameter async_loading_load_task_pool_size limits the number of loading tasks running at the same time. A Broker Load job has one pending task and multiple loading tasks (equal to the number of DATA INFILE clauses in the LOAD statement), so async_loading_load_task_pool_size should be greater than or equal to async_pending_load_task_pool_size.
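Since jobs beyond these limits simply wait in the PENDING state, a quick way to see how many jobs are queued versus running is to filter SHOW LOAD by state. A small sketch, following the SHOW LOAD syntax referenced earlier (the state filter is assumed to be supported by your version):

-- Jobs still waiting to be scheduled
SHOW LOAD WHERE STATE = "PENDING";
-- Jobs currently executing
SHOW LOAD WHERE STATE = "LOADING";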

Performance Analysis

You can enable the session variable by executing set enable_profile=true before submitting the LOAD job. Then submit the import job. After the job completes, you can view its profile on the Queries tab of the FE web UI.

See the SHOW LOAD PROFILE help document for more usage information.

This Profile can help analyze the running status of import jobs.

Currently, the profile can only be viewed after the job has executed successfully.
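A minimal sketch of the profiling workflow described above; the exact arguments accepted by SHOW LOAD PROFILE may differ between Doris versions, so treat the paths below as placeholders:

-- 1. Enable profiling for the current session before submitting the job
SET enable_profile = true;

-- 2. Submit the Broker Load job as usual (LOAD LABEL ...) and wait for it to finish

-- 3. List the collected load profiles, then drill into one by its JobId
SHOW LOAD PROFILE "/";
-- SHOW LOAD PROFILE "/<JobId>";   -- replace <JobId> with a real job id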

Common problems

  • Import error: Scan bytes per broker scanner exceed limit:xxx

    Refer to the Best Practices section above and adjust the FE configuration items max_bytes_per_broker_scanner and max_broker_concurrency.

  • Import error: failed to send batch or TabletWriter add batch with unknown id

    Modify query_timeout and streaming_load_rpc_max_alive_time_sec appropriately.

    streaming_load_rpc_max_alive_time_sec:

    During the import process, Doris opens a Writer for each tablet to receive and write data. This parameter specifies the Writer's wait timeout: if a Writer does not receive any data within this time, it is automatically destroyed. When the system processes data slowly, a Writer may not receive the next batch for a long time, causing the import error TabletWriter add batch with unknown id. In that case, this configuration can be increased appropriately. The default is 600 seconds.

  • Import error: LOAD_RUN_FAIL; msg:Invalid Column Name:xxx

    For data in PARQUET or ORC format, the column names in the file header must be consistent with the column names in the Doris table, for example:

    (tmp_c1,tmp_c2)
    SET
    (
    id=tmp_c2,
    name=tmp_c1
    )

    This reads the columns named (tmp_c1, tmp_c2) from the Parquet or ORC file and maps them to the (id, name) columns in the Doris table. If SET is not specified, the columns listed in the column list are used for the mapping.

    Note: ORC files generated directly by some Hive versions do not carry the Hive column names in their header but use (_col0, _col1, _col2, ...) instead, which can cause the Invalid Column Name error. In that case you need to use SET to map the columns (see the sketch after this list).

  • Import error: Failed to get S3 FileSystem for bucket is null/empty

    1. The bucket is incorrect or does not exist.
    2. The bucket name format is not supported. If a GCS bucket name contains an underscore, such as s3://gs_bucket/load_tbl, the S3 client reports an error; avoid underscores in GCS bucket names.
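As noted in the Invalid Column Name item above, ORC files whose header only contains (_col0, _col1, _col2, ...) need an explicit mapping in the column list and SET clause. A hypothetical fragment following the same pattern as the (tmp_c1, tmp_c2) example above; the column order and target column names are assumptions for illustration only:

    (_col0, _col1, _col2)
    SET
    (
    id = _col0,
    store_id = _col1,
    company_id = _col2
    )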

More help

For more detailed syntax and best practices for Broker Load, see the Broker Load command manual. You can also run HELP BROKER LOAD on the MySQL client command line for more help.
