CREATE OR REPLACE PROCEDURE public.incremental_sync_billing_document_header() LANGUAGE plpgsql AS $$ DECLARE sql VARCHAR(MAX) := ''; staged_record_count BIGINT :=0; BEGIN -- Truncate staging table EXECUTE 'TRUNCATE TABLE stg.lis_13_vdhdr;'; EXECUTE 'COPY JOB RUN auto_copy_lis_13_vdhdr'; sql := 'SELECT COUNT(*) FROM stg.lis_13_vdhdr;'; EXECUTE sql INTO staged_record_count; IF staged_record_count > 0 THEN -- Create staging table to store records from DM tables that match the current batch in the staging table EXECUTE 'drop table if exists temp_billing_document_header_match;'; EXECUTE 'create temp table temp_billing_document_header_match(like dm.billing_document_header); '; --Identify sls_dstrbn_document_nbr records from DM table that exist in the latest extract -- We will expire these records in a later step EXECUTE 'insert into temp_billing_document_header_match ( cncl_data_recd_ind ,sls_dstrbn_document_nbr ,company_code ,sales_district ,record_create_date ,Billing_Type ,billing_date ,Billing_Category ,local_currency ,customer_group ,Sold_to_Party ,Payer ,Financial_Accounting_ExchangeRate ,exchange_rate_type ,sales_employee ,statistics_currency ,sd_document_category ,sales_organization ,distribution_channel ,sd_document_currency ,Num_of_billing_docs ,fiscal_year_variant ,odq_change_mode ,odq_counter ,dm_record_start_date ,dm_record_end_date ,dm_is_current ) select distinct cncl_data_recd_ind ,sls_dstrbn_document_nbr ,company_code ,sales_district ,record_create_date ,Billing_Type ,billing_date ,Billing_Category ,local_currency ,customer_group ,Sold_to_Party ,Payer ,Financial_Accounting_ExchangeRate ,exchange_rate_type ,sales_employee ,statistics_currency ,sd_document_category ,sales_organization ,distribution_channel ,sd_document_currency ,Num_of_billing_docs ,fiscal_year_variant ,odq_change_mode ,odq_counter ,dm_record_start_date ,dm_record_end_date ,dm_is_current from dm.billing_document_header bdh join stg.lis_13_vdhdr vdhdr on bdh.sls_dstrbn_document_nbr = vdhdr.vbeln where dm_is_current = ''1'' ;'; sql := 'SELECT COUNT(*) FROM temp_billing_document_header_match;'; EXECUTE sql INTO staged_record_count; RAISE INFO 'Matched records into temp_billing_document_header_match: %', staged_record_count; -- Delete records from target table that also exist in staging table (updated records) EXECUTE 'DELETE FROM dm.billing_document_header using temp_billing_document_header_match tbdh WHERE dm.billing_document_header.sls_dstrbn_document_nbr = tbdh.sls_dstrbn_document_nbr and dm.billing_document_header.dm_is_current =''1'';'; -- Insert all records from staging table into target table EXECUTE 'insert into dm.billing_document_header ( cncl_data_recd_ind ,sls_dstrbn_document_nbr ,company_code ,sales_district ,record_create_date ,Billing_Type ,billing_date ,Billing_Category ,local_currency ,customer_group ,Sold_to_Party ,Payer ,Financial_Accounting_ExchangeRate ,exchange_rate_type ,sales_employee ,statistics_currency ,sd_document_category ,sales_organization ,distribution_channel ,sd_document_currency ,Num_of_billing_docs ,fiscal_year_variant ,odq_change_mode ,odq_counter ,dm_record_start_date ,dm_record_end_date ,dm_is_current ) with lis_13_vdhdr_latest as (select v.*,row_number() over (partition by vbeln order by ERDAT DESC,rocancel ) as doc_item_rank from stg.lis_13_vdhdr v) SELECT distinct rocancel, vbeln, bukrs, bzirk, To_date(erdat,''YYYYMMDD'',true), fkart, To_date(fkdat,''YYYYMMDD'',true), fktyp, hwaer, kdgrp, kunag, kunrg, kurrf, kurst, pvrtnr, stwae, vbtyp, vkorg, vtweg, waerk, anzfk, periv, odq_changemode, odq_entitycntr, CURRENT_TIMESTAMP, CASE WHEN rocancel =''X'' OR doc_item_rank > 1 THEN current_timestamp else ''9999-12-31''::DATE END, CASE WHEN rocancel =''X'' OR doc_item_rank > 1 THEN ''0'' ELSE ''1'' END FROM lis_13_vdhdr_latest vdhdr ;'; -- Insert Old records with expiry dates EXECUTE 'insert into dm.billing_document_header ( cncl_data_recd_ind ,sls_dstrbn_document_nbr ,company_code ,sales_district ,record_create_date ,Billing_Type ,billing_date ,Billing_Category ,local_currency ,customer_group ,Sold_to_Party ,Payer ,Financial_Accounting_ExchangeRate ,exchange_rate_type ,sales_employee ,statistics_currency ,sd_document_category ,sales_organization ,distribution_channel ,sd_document_currency ,Num_of_billing_docs ,fiscal_year_variant ,odq_change_mode ,odq_counter ,dm_record_start_date ,dm_record_end_date ,dm_is_current ) SELECT distinct cncl_data_recd_ind ,sls_dstrbn_document_nbr ,company_code ,sales_district ,record_create_date ,Billing_Type ,billing_date ,Billing_Category ,local_currency ,customer_group ,Sold_to_Party ,Payer ,Financial_Accounting_ExchangeRate ,exchange_rate_type ,sales_employee ,statistics_currency ,sd_document_category ,sales_organization ,distribution_channel ,sd_document_currency ,Num_of_billing_docs ,fiscal_year_variant ,odq_change_mode ,odq_counter ,dm_record_start_date ,current_timestamp ,0 FROM temp_billing_document_header_match tsdh ;'; -- Refresh dm.billing_document_header_latest MV. This MV will be used to list the latest version of the billing_document_headers EXECUTE 'refresh materialized view archdm.billing_document_header_latest;'; ELSE RAISE INFO 'No data found in staging table.'; END IF; END $$