Формирование потоков
Создадим очередь для передачи событий в БД-источнике и очередь для применения событий в БД-получателе, например:
EXECUTE DBMS_STREAMS_ADM.SET_UP_QUEUE ( )
CONNECT streamadmin/streamadmin@destination
EXECUTE DBMS_STREAMS_ADM.SET_UP_QUEUE ( )
Коли указано специально, очереди в обеих БД (и таблицы для данных этих очередей) получили умолчательные названия. Их можно наблюдать так:
SQL> CONNECT streamadmin/streamadmin@source
Connected.
SQL> SELECT name, queue_table FROM user_queues;
NAME QUEUE_TABLE ------------------------------ ------------------------------ STREAMS_QUEUE STREAMS_QUEUE_TABLE
AQ$_STREAMS_QUEUE_TABLE_E STREAMS_QUEUE_TABLE
Очередь AQ$_*_E создается автоматически для сообщений об ошибках обработки событий.
Для возможности передавать потоком изменения в исходной таблице SCOTT.EMP требуется заявить расширенную журнализацию хотя бы для этой таблицы:
CONNECT scott/tiger@source
ALTER TABLE emp ADD SUPPLEMENTAL LOG DATA ( PRIMARY KEY ) COLUMNS;
Проверка:
SQL> SELECT always, table_name, log_group_type FROM user_log_groups;
ALWAYS TABLE_NAME LOG_GROUP_TYPE ----------- ------------------------------ ------------------- ALWAYS EMP PRIMARY KEY LOGGING
Теперь правка любого поля в таблице EMP будет сопровождаться (безусловно) занесением в журнал не только старого и нового значений этого поля, но также и значения ключевого поля (то есть EMPNO).
В БД-источнике создадим процесс захвата изменений, одновременно указав правила отбора изменений в очередь:
CONNECT streamadmin/streamadmin@source
BEGIN DBMS_STREAMS_ADM.ADD_TABLE_RULES ( table_name => 'scott.emp' , streams_type => 'capture' , streams_name => 'capture_stream' , include_ddl => TRUE ); END; /
Проверка:
SQL> SELECT capture_name, queue_name, queue_owner, status 2 FROM all_capture;
CAPTURE_NAME QUEUE_NAME QUEUE_OWNER STATUS ------------------ ------------------ ------------------ -------- CAPTURE_STREAM STREAMS_QUEUE STREAMADMIN DISABLED
Среди прочих умолчаний при создании процесса захвата изменений выше использовано подразумеваемое молчаливо имя очереди STREAMS_QUEUE. В нашем случае это можно было бы обозначить явно, указав параметр QUEUE_NAME => 'streamadmin.streams_queue'. Этим же параметром можно воспользоваться, когда процесс захвата потребуется связать с очередью под иным именем.
Правила отбора изменений в очередь STREAMS_QUEUE также были построены автоматически, но могли бы быть дополнены, или даже выписаны явно с помощью других параметров процедуры ADD_TABLE_RULES.
Создадим процесс переноса изменений:
BEGIN DBMS_STREAMS_ADM.
ADD_TABLE_PROPAGATION_RULES ( table_name => 'scott.emp' , streams_name => '
maindb_to_subdb1' , source_queue_name => 'streamadmin.
streams_queue' , destination_queue_name => 'streamadmin.
streams_queue@
subdb1.class' , source_database => '
maindb.class' , include_ddl => TRUE ); END; /
Проверка:
SQL> SELECT propagation_name, source_queue_name, 2 destination_queue_name, status 3 FROM
dba_propagation;
PROPAGATION_NAME SOURCE_QUEUE_NAME DESTINATION_QUEUE_NAM STATUS ---------------- ----------------- --------------------- -------
MAINDB_TO_SUBDB1 STREAMS_QUEUE STREAMS_QUEUE
ENABLED
Теперь для правильного воспроизведения изменений в принимающей БД требуется передать ей в качестве "точки отсчета" номер изменений в БД-источнике. Передаваться получателям будут только изменения в EMP с номерами более поздними:
BEGIN DBMS_APPLY_ADM.
SET_TABLE_INSTANTIATION_SCN@
subdb1.class ( source_object_name => 'scott.emp' , source_database_name => 'maindb.class' , instantiation_scn => DBMS_FLASHBACK.
GET_SYSTEM_CHANGE_NUMBER
); END; /
Убедиться в учете процессом применения для таблиц точки отсчета можно запросом:
SQL> COLUMN source_database FORMAT A20 SQL> SELECT 2 source_object_name, source_object_type, instantiation_scn 3 FROM
dba_apply_instantiated_objects@subdb1.class;
SOURCE_OBJECT_NAME SOURCE_OBJE INSTANTIATION_SCN ------------------------------ ----------- -----------------
EMP TABLE
1200698
Принимающая БД готова к активации процесса применения изменений:
CONNECT streamadmin/streamadmin@
destination
BEGIN DBMS_STREAMS_ADM.
ADD_TABLE_RULES ( table_name => 'scott.emp' , streams_type => '
apply' , streams_name => '
apply_stream' , source_database => 'maindb.class' , include_ddl => TRUE ); END; /
Проверка:
SQL> SELECT apply_name, queue_name, status FROM all_apply;
APPLY_NAME QUEUE_NAME STATUS ------------------------ ------------------------ --------
APPLY_STREAM STREAMS_QUEUE
DISABLED
Для удобства отключим реакцию на ошибки, иначе процесс применения изменений может самопроизвольно прекращаться:
BEGIN DBMS_APPLY_ADM.SET_PARAMETER ( apply_name => '
apply_stream' , parameter => '
disable_on_error' , value => '
N' ); END; /
Осталось запустить процессы захвата и примения изменений:
CONNECT streamadmin/streamadmin@
source
EXECUTE DBMS_CAPTURE_ADM.
START_CAPTURE ( 'capture_stream' )
EXECUTE - DBMS_APPLY_ADM.
START_APPLY@
subdb1.class ( 'apply_stream' )
Проверка:
SQL>
CONNECT streamadmin/streamadmin@
source
Connected. SQL> SELECT empno FROM scott.emp MINUS 2 SELECT empno FROM scott.emp@
subdb1.class
3 . SQL> SAVE delta REPLACE Wrote file delta.sql SQL> @delta
no rows selected
SQL> INSERT INTO scott.emp ( empno ) VALUES ( 3333 );
1 row created.
SQL> @delta
EMPNO ----------
3333
SQL> COMMIT;
Commit complete.
SQL> @delta
no rows selected
Заметьте, что поток переносит изменения только в одну сторону. Таблица-приемник при этом не закрыта от обычной правки. Однако же такую правку следует выполнять осмотрительно, поскольку она может привести к ошибкам при автоматическом изменении данных потоком (эта проблема решается специально седствами разрешении конфликтов). Вдобавок учтите, что множественные операции INSERT, UPDATE, DELETE применяются в принимающей БД в рамках одной (автономной) транзакции (невзирая на то, что в журнале БД множественные изменения фиксируются набором однострочных изменений). Следовательно ошибка хотя бы в изменении одной-единственной строки приведет к отказу изменений всей множественной операции.
Упражнение. Внести изменения в таблицу SCOTT.EMP на принимающей БД. Убедиться в сохраняющихся расхождениях в таблицах БД-источника и БД-получателя.
Упражнение. Проверить передачу изменений DDL. Добавить столбец в таблицу SCOTT.EMP@MAINDB.CLASS. Наблюдать результат в SCOTT.EMP@SUBDB1.CLASS. Изменить тип столбца, наблюдать результат в базе-получателе.
1 До версии 10 использовалось название Advanced Queuing (AQ).
1 Начиная с версии 10.2.
Содержание раздела