Broker Load - E-MapReduce - Alibaba Cloud Documentation Center (2024)

Broker Load is an asynchronous import method, and the supported data sources depend on the data sources supported by brokers. This topic describes the basic principles, basic operations, system configurations, and best practices of Broker Load.

Background information

Data in a Doris table is ordered. If you import data in Broker Load mode to a Doris cluster, the resources of the cluster are used to sort the data. Compared with Spark Load, Broker Load uses more Doris cluster resources to migrate a large volume of historical data. You can import data in Broker Load mode if Spark computing resources are unavailable. If Spark computing resources are available, we recommend that you import data in Spark Load mode.

Scenarios

  • The source data is in a storage system that is accessible to brokers, such as Hadoop Distributed File System (HDFS).

  • The amount of data to be imported reaches tens or hundreds of GB.

Basic principles

After you submit an import job, the frontend (FE) generates a plan and distributes the plan to multiple backends (BEs) based on the number of BEs and the size of the source file. Each BE imports a part of the data to be imported. Each BE pulls data from a broker, transforms the data, and then imports the data into the system. After all BEs import data, the FE decides whether the import is successful.

                 +
                 | 1. A user creates an import job in Broker Load mode.
                 v
            +----+----+
            |         |
            |   FE    |
            |         |
            +----+----+
                 |
                 | 2. Each BE extracts, transforms, and loads data.
    +--------------------------+
    |            |             |
+---v---+     +--v----+    +---v---+
|       |     |       |    |       |
|  BE   |     |  BE   |    |  BE   |
|       |     |       |    |       |
+---+-^-+     +---+-^-+    +--+-^--+
    | |           | |         | |
    | |           | |         | | 3. Each BE pulls data from a broker.
+---v-+-+     +---v-+-+    +--v-+--+
|       |     |       |    |       |
|Broker |     |Broker |    |Broker |
|       |     |       |    |       |
+---+-^-+     +---+-^-+    +---+-^-+
    | |           | |          | |
+---v-+-----------v-+----------v-+-+
|       HDFS/BOS/AFS Cluster       |
|                                  |
+----------------------------------+

Submit an import job

Import data from a partitioned Hive table

  1. Create a Hive table.

    -- The default data format is used, and the partition field is day.
    CREATE TABLE `ods_demo_detail`(
      `id` string,
      `store_id` string,
      `company_id` string,
      `tower_id` string,
      `commodity_id` string,
      `commodity_name` string,
      `commodity_price` double,
      `member_price` double,
      `cost_price` double,
      `unit` string,
      `quantity` double,
      `actual_price` double)
    PARTITIONED BY (day string)
    row format delimited fields terminated by ','
    lines terminated by '\n';
  2. Run the LOAD command of Hive to import data into the created Hive table.

    load data local inpath '/opt/custorm' into table ods_demo_detail;
  3. Create a Doris table.

    CREATE TABLE `doris_ods_test_detail` (
      `rq` date NULL,
      `id` varchar(32) NOT NULL,
      `store_id` varchar(32) NULL,
      `company_id` varchar(32) NULL,
      `tower_id` varchar(32) NULL,
      `commodity_id` varchar(32) NULL,
      `commodity_name` varchar(500) NULL,
      `commodity_price` decimal(10, 2) NULL,
      `member_price` decimal(10, 2) NULL,
      `cost_price` decimal(10, 2) NULL,
      `unit` varchar(50) NULL,
      `quantity` int(11) NULL,
      `actual_price` decimal(10, 2) NULL
    ) ENGINE=OLAP
    UNIQUE KEY(`rq`, `id`, `store_id`)
    PARTITION BY RANGE(`rq`)
    (PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01')))
    DISTRIBUTED BY HASH(`store_id`) BUCKETS 1
    PROPERTIES (
      "replication_allocation" = "tag.location.default: 3",
      "dynamic_partition.enable" = "true",
      "dynamic_partition.time_unit" = "MONTH",
      "dynamic_partition.start" = "-2147483648",
      "dynamic_partition.end" = "2",
      "dynamic_partition.prefix" = "P_",
      "dynamic_partition.buckets" = "1",
      "in_memory" = "false",
      "storage_format" = "V2"
    );
  4. Import data.

    LOAD LABEL broker_load_2022_03_23
    (
        DATA INFILE("hdfs://192.168.**.**:8020/user/hive/warehouse/ods.db/ods_demo_detail/*/*")
        INTO TABLE doris_ods_test_detail
        COLUMNS TERMINATED BY ","
        (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
        COLUMNS FROM PATH AS (`day`)
        SET
        (rq = str_to_date(`day`,'%Y-%m-%d'),id=id,store_id=store_id,company_id=company_id,tower_id=tower_id,commodity_id=commodity_id,commodity_name=commodity_name,commodity_price=commodity_price,member_price=member_price,cost_price=cost_price,unit=unit,quantity=quantity,actual_price=actual_price)
    )
    WITH BROKER "broker_name_1"
    (
        "username" = "hdfs",
        "password" = ""
    )
    PROPERTIES
    (
        "timeout"="1200",
        "max_filter_ratio"="0.1"
    );
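
The COLUMNS FROM PATH AS (`day`) clause in the preceding statement reads the value of day from the partition directory in each file path instead of from the file content, and the SET clause converts that value to the rq date column. The following Python sketch mimics both steps outside Doris to show what is being extracted. It assumes the Hive `key=value` partition directory layout. The helper names and the sample file name are hypothetical and are not part of Doris or Hive.

```python
from datetime import date, datetime


def partition_values_from_path(path: str) -> dict[str, str]:
    """Collect key=value directory components from a Hive-style file path."""
    values = {}
    # The last component is the file name, so only the directories are checked.
    for component in path.split("/")[:-1]:
        key, sep, value = component.partition("=")
        if sep and key:
            values[key] = value
    return values


def to_rq(day: str) -> date:
    """Python counterpart of str_to_date(`day`, '%Y-%m-%d') in the SET clause."""
    return datetime.strptime(day, "%Y-%m-%d").date()


# Hypothetical data file under the table directory matched by the /*/* wildcard.
sample = "/user/hive/warehouse/ods.db/ods_demo_detail/day=2022-04-01/000000_0"
parts = partition_values_from_path(sample)
rq = to_rq(parts["day"])
print(parts, rq)
```

If the directories under the table path do not follow the `day=<value>` pattern, there is nothing for the clause to extract, so it is worth checking the HDFS layout before you submit the job.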

Import data from a partitioned Hive table in the ORC format

  1. Create a partitioned Hive table in the Optimized Row Columnar (ORC) format.

    -- The data format is ORC, and the partition field is day.
    CREATE TABLE `ods_demo_orc_detail`(
      `id` string,
      `store_id` string,
      `company_id` string,
      `tower_id` string,
      `commodity_id` string,
      `commodity_name` string,
      `commodity_price` double,
      `member_price` double,
      `cost_price` double,
      `unit` string,
      `quantity` double,
      `actual_price` double)
    PARTITIONED BY (day string)
    row format delimited fields terminated by ','
    lines terminated by '\n'
    STORED AS ORC;
  2. Create a Doris table.

    CREATE TABLE `doris_ods_test_detail` (
      `rq` date NULL,
      `id` varchar(32) NOT NULL,
      `store_id` varchar(32) NULL,
      `company_id` varchar(32) NULL,
      `tower_id` varchar(32) NULL,
      `commodity_id` varchar(32) NULL,
      `commodity_name` varchar(500) NULL,
      `commodity_price` decimal(10, 2) NULL,
      `member_price` decimal(10, 2) NULL,
      `cost_price` decimal(10, 2) NULL,
      `unit` varchar(50) NULL,
      `quantity` int(11) NULL,
      `actual_price` decimal(10, 2) NULL
    ) ENGINE=OLAP
    UNIQUE KEY(`rq`, `id`, `store_id`)
    PARTITION BY RANGE(`rq`)
    (PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01')))
    DISTRIBUTED BY HASH(`store_id`) BUCKETS 1
    PROPERTIES (
      "replication_allocation" = "tag.location.default: 3",
      "dynamic_partition.enable" = "true",
      "dynamic_partition.time_unit" = "MONTH",
      "dynamic_partition.start" = "-2147483648",
      "dynamic_partition.end" = "2",
      "dynamic_partition.prefix" = "P_",
      "dynamic_partition.buckets" = "1",
      "in_memory" = "false",
      "storage_format" = "V2"
    );
  3. Import data in Broker Load mode.

    LOAD LABEL dish_2022_03_23
    (
        DATA INFILE("hdfs://10.220.**.**:8020/user/hive/warehouse/ods.db/ods_demo_orc_detail/*/*")
        INTO TABLE doris_ods_test_detail
        COLUMNS TERMINATED BY ","
        FORMAT AS "orc"
        (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
        COLUMNS FROM PATH AS (`day`)
        SET
        (rq = str_to_date(`day`,'%Y-%m-%d'),id=id,store_id=store_id,company_id=company_id,tower_id=tower_id,commodity_id=commodity_id,commodity_name=commodity_name,commodity_price=commodity_price,member_price=member_price,cost_price=cost_price,unit=unit,quantity=quantity,actual_price=actual_price)
    )
    WITH BROKER "broker_name_1"
    (
        "username" = "hdfs",
        "password" = ""
    )
    PROPERTIES
    (
        "timeout"="1200",
        "max_filter_ratio"="0.1"
    );

    The following parameters are involved in the preceding statement:

    • FORMAT AS "orc": the data format to be imported. In this example, ORC is used.

    • SET: the field mappings between the Hive table and the Doris table, and the operations to be performed to convert fields.

Import data from HDFS

For more information about how to create a Doris table, see the preceding sections. To import data in Broker Load mode from HDFS, execute the following statement:

LOAD LABEL demo.label_20220402
(
    DATA INFILE("hdfs://10.220.**.**:8020/tmp/test_hdfs.txt")
    INTO TABLE `ods_dish_detail_test`
    COLUMNS TERMINATED BY "\t"
    (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
)
with HDFS
(
    "fs.defaultFS"="hdfs://10.220.**.**:8020",
    "hadoop.username"="root"
)
PROPERTIES
(
    "timeout"="1200",
    "max_filter_ratio"="0.1"
);

View the status of an import job

You can execute the following statement to view the status of an import job that is submitted.

show load order by createtime desc limit 1\G

The following output is returned:

*************************** 1. row ***************************
         JobId: 4132****
         Label: broker_load_2022_03_23
         State: FINISHED
      Progress: ETL:100%; LOAD:100%
          Type: BROKER
       EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27
      TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
      ErrorMsg: NULL
    CreateTime: 2022-04-01 18:59:06
  EtlStartTime: 2022-04-01 18:59:11
 EtlFinishTime: 2022-04-01 18:59:11
 LoadStartTime: 2022-04-01 18:59:11
LoadFinishTime: 2022-04-01 18:59:11
           URL: NULL
    JobDetails: {"Unfinished backends":{"5072bde59b74b65-8d2c0ee5b029****":[]},"ScannedRows":27,"TaskNumber":1,"All backends":{"5072bde59b74b65-8d2c0ee5b029****":[36728051]},"FileNumber":1,"FileSize":5540}
1 row in set (0.01 sec)
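
The EtlInfo field in the preceding output carries the row counters of the job. The following Python sketch parses that field and compares the share of abnormal rows with the max_filter_ratio value of 0.1 that was set when the job was submitted. The formula dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL) is an assumption about how the ratio is evaluated and is not stated in this topic. The function names are hypothetical.

```python
def parse_etl_info(etl_info: str) -> dict[str, int]:
    """Turn 'k1=v1; k2=v2' from the EtlInfo column into a dict of integers."""
    counters = {}
    for item in etl_info.split(";"):
        key, sep, value = item.strip().partition("=")
        if sep:
            counters[key] = int(value)
    return counters


def filter_ratio(counters: dict[str, int]) -> float:
    """Share of abnormal rows (assumed formula); 0.0 if no rows were read."""
    abnormal = counters.get("dpp.abnorm.ALL", 0)
    normal = counters.get("dpp.norm.ALL", 0)
    total = abnormal + normal
    return abnormal / total if total else 0.0


# EtlInfo value copied from the SHOW LOAD output above.
etl_info = "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27"
counters = parse_etl_info(etl_info)
ratio = filter_ratio(counters)
within_limit = ratio <= 0.1  # max_filter_ratio from the LOAD statement
print(counters, ratio, within_limit)
```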

Cancel an import job

If an import job in Broker Load mode is not in the CANCELLED or FINISHED state, you can manually cancel the import job. To cancel an import job, specify its label in the CANCEL LOAD statement and then execute the statement.

For example, you can execute the following statement to cancel the import job whose label is broker_load_2022_03_23 on the demo database:

CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";
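
The rule above can be sketched as a small Python helper that returns a CANCEL LOAD statement only for a job that is not yet in the CANCELLED or FINISHED state. The helper name is hypothetical, and the sketch assumes that the database name and label come from a trusted source, because it does not escape them.

```python
from typing import Optional

# States in which a job can no longer be canceled, as described above.
FINAL_STATES = {"CANCELLED", "FINISHED"}


def cancel_statement(database: str, label: str, state: str) -> Optional[str]:
    """Build a CANCEL LOAD statement, or return None if the job already ended."""
    if state.upper() in FINAL_STATES:
        return None
    return f'CANCEL LOAD FROM {database} WHERE LABEL = "{label}";'


running = cancel_statement("demo", "broker_load_2022_03_23", "LOADING")
finished = cancel_statement("demo", "broker_load_2022_03_23", "FINISHED")
print(running, finished)
```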

Relevant system configurations

Broker parameters

If you import data in Broker Load mode, brokers are used to access remote storage. Different parameters are required for different types of brokers.

FE configurations

The following FE configurations apply to all import jobs in Broker Load mode in a cluster. You can adjust the values of relevant parameters in the fe.conf file.

  • min_bytes_per_broker_scanner: the minimum amount of data processed by a single BE.

  • max_bytes_per_broker_scanner: the maximum amount of data processed by a single BE.

  • max_broker_concurrency: the maximum concurrency of an import job.

The minimum amount of data processed by a single BE, the maximum concurrency of an import job, the size of the source file, and the number of BEs in the current cluster together determine the concurrency of an import job. You can use the following formulas:

Concurrency of an import job = Math.min(Size of the source file/Minimum amount of data processed by a single BE, Maximum concurrency of an import job, Number of BEs in the current cluster)

Amount of data processed by a single BE for an import job = Size of the source file/Concurrency of the import job

In most cases, the maximum amount of data processed by a single BE multiplied by the number of BEs equals the maximum amount of data that can be imported by an import job. If you want to import more data, you must increase the value of the max_bytes_per_broker_scanner parameter.

Default values of the preceding parameters:

  • min_bytes_per_broker_scanner: 64 MB. The parameter value is in units of bytes.

  • max_bytes_per_broker_scanner: 3 GB. The parameter value is in units of bytes.

  • max_broker_concurrency: 10.
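
The two formulas and the default values above can be combined into a quick sizing check. The following Python sketch is an illustration only: the function names are hypothetical, integer division is an assumption about rounding, and the lower bound of one scanner for very small files is an assumption that the formulas do not state.

```python
MB = 1024 ** 2
GB = 1024 ** 3

# Default values listed above; the fe.conf values are expressed in bytes.
MIN_BYTES_PER_BROKER_SCANNER = 64 * MB
MAX_BYTES_PER_BROKER_SCANNER = 3 * GB
MAX_BROKER_CONCURRENCY = 10


def import_concurrency(file_size: int, num_bes: int) -> int:
    """min(file size / min bytes per BE, max concurrency, number of BEs)."""
    by_size = file_size // MIN_BYTES_PER_BROKER_SCANNER
    # Assumption: a file smaller than the minimum still gets one scanner.
    return max(1, min(by_size, MAX_BROKER_CONCURRENCY, num_bes))


def bytes_per_be(file_size: int, num_bes: int) -> float:
    """Amount of data processed by a single BE for an import job."""
    return file_size / import_concurrency(file_size, num_bes)


def fits_defaults(file_size: int, num_bes: int) -> bool:
    """True if no BE has to process more than max_bytes_per_broker_scanner."""
    return bytes_per_be(file_size, num_bes) <= MAX_BYTES_PER_BROKER_SCANNER


small = import_concurrency(1 * GB, num_bes=3)
large_ok = fits_defaults(100 * GB, num_bes=10)
print(small, large_ok)
```

With the default values, a 1 GB file on a cluster that has three BEs is split three ways. A 100 GB file on a cluster that has ten BEs would require each BE to process 10 GB, which exceeds the default 3 GB limit.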

Best practices

Scenarios

Broker Load is most suitable for scenarios in which the source data is stored in a file system, such as HDFS, Baidu Object Storage (BOS), or Andrew File System (AFS). Broker Load is the only asynchronous import method for a single import job. If you want to use an asynchronous method to import large files, you can use Broker Load.

Data amount

The following thresholds assume that your cluster has a single BE. If your cluster has multiple BEs, multiply each threshold by the number of BEs. For example, if your cluster has three BEs, the 3 GB threshold becomes 9 GB (3 GB × 3).

  • Less than or equal to 3 GB: You can directly submit an import job in Broker Load mode.

  • Larger than 3 GB: The maximum amount of data that can be processed by a single BE is 3 GB. If you want to import a file that exceeds 3 GB, you must adjust the FE configurations for import jobs in Broker Load mode.

    1. Modify the maximum amount of data processed by a single BE and the maximum concurrency of an import job based on the number of BEs and the size of the source file.

      Modify the following configurations in the fe.conf file:

      max_broker_concurrency = Number of BEs
      Amount of data processed by a single BE for an import job = Size of the source file/max_broker_concurrency
      max_bytes_per_broker_scanner >= Amount of data processed by a single BE for an import job

      For example, you want to import a file of 100 GB and the number of BEs in your cluster is 10. Apply the following configurations:

      max_broker_concurrency = 10
      max_bytes_per_broker_scanner >= 10 GB = 100 GB/10

      After you modify the configurations, all BEs of the cluster concurrently process the import job. Each BE processes a part of the source file.

      Note

      The preceding FE configurations apply to all import jobs in Broker Load mode in a cluster.

    2. Customize the timeout period of an import job when you create the import job.

      Amount of data processed by a single BE for an import job/Slowest import speed of your Doris cluster (MB/s) >= Timeout period of the current import job >= Amount of data processed by a single BE for the current import job/10 MB/s

      For example, you want to import a file of 100 GB and the number of BEs in your cluster is 10. The timeout period is calculated by using the following formula:

      Timeout period >= 1,000s = 10 GB/10 MB/s

    3. If the timeout period calculated in Step 2 exceeds the default maximum timeout period of an import job, which is 4 hours, we recommend that you do not increase the default maximum timeout period. Instead, split the file to be imported and import it in multiple batches, because a job that runs for more than 4 hours takes an extended period of time to retry after it fails. You can use the following formula to calculate the expected maximum amount of data to be imported at a time to a Doris cluster:

      Expected maximum amount of data to be imported at a time = 14,400s × 10 MB/s × Number of BEs

      For example, the number of BEs in your cluster is 10.

      Expected maximum amount of data to be imported at a time = 14,400s × 10 MB/s × 10 = 1,440,000 MB ≈ 1,440 GB

      Important

      In most cases, the import speed in your environment cannot reach 10 MB/s. Therefore, we recommend that you split a file that is larger than 500 GB before you import this file.
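
The sizing steps above can be sketched in Python as follows. This is an illustration under stated assumptions: 1 GB is treated as 1,000 MB to match the rounded figures in the examples, the 10 MB/s speed and the 4-hour limit are the values quoted above, and the function names are hypothetical. The per-BE amount is reported in MB, whereas the max_bytes_per_broker_scanner value in the fe.conf file must be set in bytes.

```python
import math

ASSUMED_SPEED_MB_S = 10        # import speed used in the formulas above
MAX_TIMEOUT_S = 4 * 60 * 60    # default maximum timeout period: 14,400 s


def plan_import(file_size_mb: float, num_bes: int) -> dict:
    """Derive the Step 1 and Step 2 values for a file and a cluster size."""
    per_be_mb = file_size_mb / num_bes
    min_timeout_s = math.ceil(per_be_mb / ASSUMED_SPEED_MB_S)
    return {
        "max_broker_concurrency": num_bes,
        # max_bytes_per_broker_scanner must be at least this amount.
        "per_be_mb": math.ceil(per_be_mb),
        "min_timeout_s": min_timeout_s,
        # Step 3: split the file instead of raising the 4-hour limit.
        "split_recommended": min_timeout_s > MAX_TIMEOUT_S,
    }


def max_batch_mb(num_bes: int) -> int:
    """Expected maximum amount of data to import at a time, in MB."""
    return MAX_TIMEOUT_S * ASSUMED_SPEED_MB_S * num_bes


plan = plan_import(file_size_mb=100_000, num_bes=10)  # 100 GB, 10 BEs
batch = max_batch_mb(10)
print(plan, batch)
```

For the 100 GB example, the sketch yields 10,000 MB per BE and a minimum timeout period of 1,000 seconds, and the batch limit for ten BEs is 1,440,000 MB, which matches the figures above.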

Job scheduling

The system limits the number of ongoing import jobs in Broker Load mode in a cluster to prevent excessive import jobs that are running at the same time.

The desired_max_waiting_jobs parameter of FE configurations is used to limit the number of import jobs in Broker Load mode that are in the PENDING or LOADING state in a cluster. The PENDING state indicates that an import job is not started, and the LOADING state indicates that an import job is running. The default value is 100. If the number of import jobs in Broker Load mode that are in the PENDING or LOADING state in a cluster exceeds this threshold, newly submitted import jobs are rejected.

An import job in Broker Load mode is divided into a pending task and one or more loading tasks. A pending task is responsible for obtaining the information about the file to be imported, and a loading task is sent to a BE to import data.

  • The async_pending_load_task_pool_size parameter of FE configurations is used to limit the number of pending tasks that are running at the same time. This way, the number of import jobs that are actually running is limited. The default value of this parameter is 10. For example, you submit 100 import jobs in Broker Load mode. Only 10 jobs enter the LOADING state, whereas the other jobs enter the PENDING state.

  • The async_loading_load_task_pool_size parameter of FE configurations is used to limit the number of loading tasks that are running at the same time. An import job in Broker Load mode contains a pending task and one or more loading tasks. The number of loading tasks equals the number of DATA INFILE clauses in the LOAD LABEL statement. Therefore, the value of the async_loading_load_task_pool_size parameter must be greater than or equal to the value of the async_pending_load_task_pool_size parameter.
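
The interaction between these limits can be approximated with a small Python model. It is a simplification under stated assumptions: all jobs are submitted at once to an idle cluster, no job finishes in the meantime, and the desired_max_waiting_jobs threshold is interpreted as "at most that many unfinished jobs are accepted". The function name is hypothetical.

```python
def admit_jobs(
    submitted: int,
    desired_max_waiting_jobs: int = 100,
    async_pending_load_task_pool_size: int = 10,
) -> dict:
    """Estimate how a burst of Broker Load jobs is distributed across states."""
    # Jobs beyond the waiting-job threshold are rejected at submission.
    accepted = min(submitted, desired_max_waiting_jobs)
    rejected = submitted - accepted
    # Only as many jobs as the pending-task pool allows move on to LOADING.
    loading = min(accepted, async_pending_load_task_pool_size)
    pending = accepted - loading
    return {"loading": loading, "pending": pending, "rejected": rejected}


burst = admit_jobs(100)
overflow = admit_jobs(130)
print(burst, overflow)
```

With the default values, a burst of 100 jobs results in 10 jobs in the LOADING state and 90 jobs in the PENDING state, which matches the example above.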

