Error Tolerance
By default, Spark does not tolerate any errors for distributed Spark executors (tasks). Any error encountered by an executor therefore causes the entire Spark job to fail, rolling back the data exported up to the point of failure.
You can use the --task-failures option to modify this behavior; however, failed and restarted tasks are likely to cause data duplication during the load process. You can manage data duplication for failed tasks in the following ways.
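For orientation, the sketch below shows where this option might sit in a spark-submit invocation. This is a hypothetical example: the class name, jar name, and the value 4 are placeholders, and only the --task-failures option itself comes from this page; check the option reference for the exact argument it expects.
# Hypothetical invocation; the class, jar, and failure count are placeholders.
spark-submit \
  --class com.example.LoadJob \
  load-job.jar \
  --task-failures 4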
If an application key is present in each row of data (such as a primary_key_id), you can deduplicate the data by running a simple DELETE command:
delete
from @{config.ybTableName} t
where t.load_transaction_id = '@{transactionId}'
  and rowid not in (
    select max(rowid)
    from @{config.ybTableName}
    where load_transaction_id = '@{transactionId}'
    group by primary_key_id
  );
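Before running the delete, you may want to confirm that duplicates actually exist for the load transaction. The following check is a sketch that uses the same placeholders as above; substitute the actual table name and transaction ID if you run it outside the loader:
-- Sketch: lists application keys that appear more than once for this load.
select primary_key_id, count(*) as copies
from @{config.ybTableName}
where load_transaction_id = '@{transactionId}'
group by primary_key_id
having count(*) > 1;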
If a primary key is not available, you can use another technique, which requires three steps:
- Add three new columns to the target table: partition_id, attempt_number, and load_transaction_id. Define these columns as int, int, and varchar(64), respectively:
ALTER TABLE ${table} ADD COLUMN partition_id INT;
ALTER TABLE ${table} ADD COLUMN attempt_number INT;
ALTER TABLE ${table} ADD COLUMN load_transaction_id VARCHAR(64);
- Provide a computed-columns.properties file and reference it using the --computed-columns argument. This file should contain entries like these:
partition_id=taskContext.partitionId()
attempt_number=taskContext.attemptNumber()
load_transaction_id=transactionId
Each row of data will be populated with the load transaction ID, the partition ID, and the attempt number. If an executor task fails and is restarted by Spark, the task runs again with a higher attempt number.
- Run a DELETE statement to remove duplicates. For example:
delete
from @{config.ybTableName} t
using (
  select partition_id, max(attempt_number) as final_attempt
  from @{config.ybTableName}
  where load_transaction_id = '@{transactionId}'
  group by 1
) final_attempts
where t.load_transaction_id = '@{transactionId}'
  and t.partition_id = final_attempts.partition_id
  and t.attempt_number != final_attempts.final_attempt;
Note: You can specify this exact SQL statement as input to the --post-sql option.
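As an optional follow-up check (a sketch, not part of the documented procedure), you can verify that each partition retains rows from only one attempt after deduplication; substitute the actual table name and transaction ID if you run it manually:
-- Sketch: any rows returned indicate partitions that still hold multiple attempts.
select partition_id, count(distinct attempt_number) as attempts
from @{config.ybTableName}
where load_transaction_id = '@{transactionId}'
group by partition_id
having count(distinct attempt_number) > 1;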