Thursday 9 June 2011

Capture Changed Data (in Teradata)

Preserving history in a fact table is a common task in ETL.
If one or more columns have changed, you can add the changed row as a new version.

What if you need to know which field/column has changed?

You can write a construct that compares every field.
However, executing such a query is a performance killer.
Besides that, if the table has, say, 50 columns, it's a hell
of a job to write.
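
To make that concrete, here is a minimal sketch of such a hand-written comparison for only three columns. The table name invoice_fact is hypothetical; the columns invoicenumber, amount, status, country and UPDATE_DT are assumed from the invoice example used in this post. Every extra column needs another CASE block like these, which is why this approach does not scale to 50 columns.

```sql
-- Sketch only: invoice_fact is a hypothetical table name.
-- Each version of an invoice is compared with the version just before it.
SELECT cur.invoicenumber
     , cur.UPDATE_DT AS date_change
     , CASE WHEN cur.amount <> prev.amount
              OR (cur.amount IS NULL AND prev.amount IS NOT NULL)
              OR (cur.amount IS NOT NULL AND prev.amount IS NULL)
            THEN 'Y' ELSE 'N' END AS amount_changed
     , CASE WHEN cur.status <> prev.status
              OR (cur.status IS NULL AND prev.status IS NOT NULL)
              OR (cur.status IS NOT NULL AND prev.status IS NULL)
            THEN 'Y' ELSE 'N' END AS status_changed
     , CASE WHEN cur.country <> prev.country
              OR (cur.country IS NULL AND prev.country IS NOT NULL)
              OR (cur.country IS NOT NULL AND prev.country IS NULL)
            THEN 'Y' ELSE 'N' END AS country_changed
FROM (
      SELECT invoicenumber, amount, status, country, UPDATE_DT
           -- timestamp of the previous version of the same invoice
           , MAX(UPDATE_DT) OVER (PARTITION BY invoicenumber
                                  ORDER BY UPDATE_DT
                                  ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) AS preceding_date
      FROM invoice_fact
     ) cur
JOIN invoice_fact prev
  ON prev.invoicenumber = cur.invoicenumber
 AND prev.UPDATE_DT     = cur.preceding_date;
```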

Here you can find a small subset of an invoice fact table
(in reality there are a lot more fields). Amount, status and country
have changed.



This could be the result of the analysis script:



Here is the script that compares every column
per non-unique index value (e.g. invoicenumber).
The assumption is that the primary index is a NUPI (non-unique primary index).
If you designed it otherwise, you have to change the script (this is hard-coded).
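
Before running the script, it may help to check that the table really meets this assumption. The query below is a small sketch against the dbc.indices dictionary view; 'MyDatabase' and 'invoice_fact' are placeholder names. The script is written for the case where this returns exactly one row with uniqueflag = 'N', i.e. a single-column non-unique primary index.

```sql
-- Placeholder names: replace MyDatabase and invoice_fact with your own.
SELECT columnname
     , columnposition
     , uniqueflag          -- 'N' = non-unique, 'Y' = unique
FROM dbc.indices
WHERE databasename = 'MyDatabase'
  AND tablename    = 'invoice_fact'
  AND indextype    = 'P'   -- primary index
ORDER BY columnposition;
```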

CREATE MULTISET TABLE DEVK425WGE.DataChangeCapture ,NO FALLBACK ,
NO BEFORE JOURNAL,
NO AFTER JOURNAL,
CHECKSUM = DEFAULT
(
databasename VARCHAR(50),
tablename VARCHAR(100),
obj_id DECIMAL(15,0),
field VARCHAR(100) CHARACTER SET LATIN NOT CASESPECIFIC,
item_old VARCHAR(100) CHARACTER SET LATIN NOT CASESPECIFIC,
item_new VARCHAR(100) CHARACTER SET LATIN NOT CASESPECIFIC,
date_original TIMESTAMP(6),
date_change TIMESTAMP(6))
PRIMARY INDEX ( databasename, tablename,field,obj_id );

--


create procedure pDataChangeCapture
(
IN OBJECTID VARCHAR(50)
, IN DATABASENAME VARCHAR(50)
, IN TABLENAME VARCHAR(100)
, IN FIELD VARCHAR(100)
)

BEGIN



CALL DBC.SysExecSQL('

insert into DataChangeCapture
(
databasename
, tablename
, obj_id
, field
, item_old
, item_new
, date_original
, date_change
)

sel
'||'27'xc||:DATABASENAME||'27'xc||'
, '||'27'xc||:TABLENAME||'27'xc||'
, a.'||:OBJECTID||' as obj_id
, '||'27'xc||:FIELD||'27'xc||' as field
, a.'||:FIELD||' as oldvalue
, b.'||:FIELD||' as newvalue
, b.preceding_date as date_original
, b.UPDATE_DT as date_change
from '||:DATABASENAME||'.'||:TABLENAME||' a,
(sel
'||:OBJECTID||' as objectid
, '||:FIELD||'
, UPDATE_DT as UPDATE_DT
, max(UPDATE_DT) over (partition by '||:OBJECTID||' order by '||:OBJECTID||' , UPDATE_DT
rows between 1 preceding and 1 preceding) as preceding_date
from '||:DATABASENAME||'.'||:TABLENAME||'
) b
where
a.'||:OBJECTID||' = b.objectid
and a.UPDATE_DT = b.preceding_date
and (a.'||:FIELD||' <> b.'||:FIELD||'
or (a.'||:FIELD||' is NULL
and b.'||:FIELD||' is not null)
or (b.'||:FIELD||' is NULL
and a.'||:FIELD||' is not null) )'
)
;



END;
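
To see what the dynamic SQL is meant to produce, here is a sketch of the statement as it would expand for one column. The values are assumptions for illustration only: DATABASENAME = 'MyDatabase', TABLENAME = 'invoice_fact', OBJECTID = 'invoicenumber' and FIELD = 'status'. Alias a is the older version of a row and alias b is the version that directly follows it.

```sql
-- Illustrative expansion with assumed parameter values (not real object names).
insert into DataChangeCapture
( databasename, tablename, obj_id, field
, item_old, item_new, date_original, date_change )
sel
  'MyDatabase'
, 'invoice_fact'
, a.invoicenumber as obj_id
, 'status' as field
, a.status as oldvalue
, b.status as newvalue
, b.preceding_date as date_original
, b.UPDATE_DT as date_change
from MyDatabase.invoice_fact a,
( sel
    invoicenumber as objectid
  , status
  , UPDATE_DT as UPDATE_DT
    -- timestamp of the previous version of the same invoice
  , max(UPDATE_DT) over (partition by invoicenumber
                         order by invoicenumber, UPDATE_DT
                         rows between 1 preceding and 1 preceding) as preceding_date
  from MyDatabase.invoice_fact
) b
where a.invoicenumber = b.objectid
and a.UPDATE_DT = b.preceding_date
and ( a.status <> b.status
   or (a.status is null and b.status is not null)
   or (b.status is null and a.status is not null) );
```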

--

create procedure pDataChangeCaptureStart (databasename varchar(50), tablename varchar(100))

begin

declare objectid varchar(50);

sel columnname into :objectid from dbc.indices
where databasename = :databasename
and tablename = :tablename
and indextype = 'P';

create volatile table tDataChangeCapture (rownum int, FIELDNAME varchar(100)) ON COMMIT PRESERVE ROWS;

ins into tDataChangeCapture
sel csum(1,1), columnname from dbc.columns
where databasename = :databasename
and tablename = :tablename
and columnname <> :objectid
and columnname <> 'UPDATE_DT';

delete DataChangeCapture;

looper:begin

declare maxfields int;
declare x int default 0;
declare field varchar(100);

sel count(*) into :maxfields from tDataChangeCapture;

loopit: while x < maxfields

do

set x = x+1;

sel fieldname into :field from tDataChangeCapture where rownum = :x;

call pDataChangeCapture(objectid, databasename, tablename, field);

end while loopit;

end looper;

drop table tDataChangeCapture;

end;

Kick off the procedure with the command:

call pDataChangeCaptureStart ('Databasename','Tablename')
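
Once the procedure has finished, the captured changes can be read back from DataChangeCapture. The query below is a simple sketch that uses the same placeholder names as the call above; it lists one row per changed field, with the old and new value.

```sql
-- Placeholder names: use the same database and table names as in the call.
SELECT obj_id
     , field
     , item_old
     , item_new
     , date_original
     , date_change
FROM DataChangeCapture
WHERE databasename = 'Databasename'
  AND tablename    = 'Tablename'
ORDER BY obj_id, date_change, field;
```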