频道栏目
首页 > 资讯 > 其他综合 > 正文

oracle到kafka的同步

16-07-27        来源:[db:作者]  
收藏   我要投稿

GoldenGate 12.2 新版支持同步如下图




ogg--------------hdfs ogg--------------kafka
ogg--------------flumn
安装kafka
安装zookeeper 配置环境变量,/etc/profile添加以下内容: [root@T2 kafkaogg]# export ZOOKEEPER_HOME=/opt/app/kafka/zookeeper-3.4.6 [root@T2 kafkaogg]# export PATH=$PATH:$ZOOKEEPER_HOME/bin 修改配置文件 [root@T2 kafkaogg]# cd zookeeper-3.4.6/conf/ [root@T2 conf]# ls configuration.xsl log4j.properties zoo_sample.cfg [root@T2 conf]# cat zoo_sample.cfg | grep -v '#' > zoo.cfg [root@T2 conf]# vi zoo.cfg
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/tmp/zookeeper clientPort=2181 接着启动zookeeper服务 [root@T2 conf]# ../bin/zkServer.sh start JMX enabled by default Using config: /opt/app/kafkaogg/zookeeper-3.4.6/bin/../conf/zoo.cfg Starting zookeeper ... STARTED 查看是否启动 [root@T2 conf]# ../bin/zkServer.sh status JMX enabled by default Using config: /opt/app/kafkaogg/zookeeper-3.4.6/bin/../conf/zoo.cfg Mode: standalone Server启动之后, 就可以启动client连接server了, 执行脚本: [root@T2 bin]#./zkCli.sh -server localhost:2181
安装kafka 安装kafka server之前需要单独安装zookeeper server,而且需要修改config/server.properties里面的IP信息 [root@T2 config]# pwd /opt/app/kafkaogg/kafka_2.11-0.9.0.0/config [root@T2 config]# vi server.properties zookeeper.connect=localhost:2181 这里需要修改默认zookeeper.properties配置 [root@T2 config]# vi zookeeper.properties dataDir=/tmp/zkdata 先启动zookeeper,启动前先kill掉之前的zkServer.sh启动的zookeeper服务 [root@T2 bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties 启动kafka服务 [root@T2 bin]# ./kafka-server-start.sh ../config/server.properties 查看kafka进程是否启动: [root@T2 bin]# jps 16882 QuorumPeerMain 17094 Kafka 17287 Jps 创建topic [root@T2 bin]#./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test Created topic "test". 查看topic列表 [root@T2 bin]#./kafka-topics.sh --list --zookeeper localhost:2181 test 创建broker集群
[root@T2 config]#cp server.properties ./server-1.properties [root@T2 config]#cp server.properties ./server-2.properties
更改以下内容 [root@T2 config]#vi server-1.properties broker.id=1 listeners=PLAINTEXT://:9093 log.dir=/tmp/kafka-logs-1 [root@T2 config]#vi server-2.properties broker.id=2 listeners=PLAINTEXT://:9094 log.dir=/tmp/kafka-logs-2 其中broker.id是每一个broker的唯一标识号启动另外两个Broker进程: [root@T2 config]#../bin/kafka-server-start.sh ./server-1.properties & [root@T2 config]#../bin/kafka-server-start.sh ./server-2.properties &
在同一台机器上创建三个broker的topic: [root@T2 bin]#./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic fafacluster Created topic "fafacluster". 查看三个broker中,leader和replica角色: [root@T2 bin]#./kafka-topics.sh --describe --zookeeper localhost:2181 --topic fafacluster Topic:fafacluster PartitionCount:1 ReplicationFactor:3 Configs: Topic: fafacluster Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 对于三个broker的伪cluster,可以尝试杀掉其中一个或两个broker进程,然后看看leader,replica,isr会不会发生变化,消费者还能不能正常消费消息。生产者推送消息: [root@T2 bin]#./kafka-console-producer.sh --broker-list localhost:9092 --topic fafacluster fafa fafa01 杀掉一个broker进程 [root@T2 bin]#kill -9 22497 [root@T2 bin]#./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic fafacluster .................................. .................................. java.nio.channels.ClosedChannelException at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:98) at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:188) at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:84) at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:187) at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:182) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778) at scala.collection.immutable.Map$Map1.foreach(Map.scala:116) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777) at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:182) at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:88) at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:78) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778) at scala.collection.immutable.Map$Map1.foreach(Map.scala:116) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777) at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:78) at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) fafa fafa01 可以看到一大堆报错之后,还是完整的消费到生产的消息使用kafka连接工具导入导出信息 [root@T2 kafka_2.11-0.9.0.0]# cat test.txt foo bar dadadada fgdgeg sfewfwef sfd [root@T2 kafka_2.11-0.9.0.0]# cat config/connect-standalone.properties bootstrap.servers=localhost:9092 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true internal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false offset.storage.file.filename=/tmp/connect.offsets offset.flush.interval.ms=10000 [root@T2 kafka_2.11-0.9.0.0]# cat config/connect-file-source.properties name=local-file-source connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector tasks.max=1 file=test.txt topic=fafacluster [root@T2 kafka_2.11-0.9.0.0]# cat config/connect-file-sink.properties name=local-file-sink connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector tasks.max=1 file=test.sink.txt topics=fafacluster [root@T2 kafka_2.11-0.9.0.0]# bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties 此时在接受端收到消息 [root@T2 bin]# ./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic fafacluster {"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"} {"schema":{"type":"string","optional":false},"payload":"dadadada"} {"schema":{"type":"string","optional":false},"payload":"fgdgeg"} {"schema":{"type":"string","optional":false},"payload":"sfewfwef"} {"schema":{"type":"string","optional":false},"payload":"sfd"} 原理解析: 1,kafka连接进程启动以后,源端连接开始读test.txt数据 2,把消息推送给topic fafacluster 3,sink connector连接开始读topic为fafacluster的消息,并把它写入文件名为test.sink.txt的文件继续插入文件 [root@T2 kafka_2.11-0.9.0.0]# echo "Another line" >> test.txt 接收端查看发现,新增加的信息到达接收端 {"schema":{"type":"string","optional":false},"payload":"Another line"}
测试结束,现在我们创建一个同步数据topic,这里先确保zookeeper和kafka进程在运行 [root@T2 bin]#./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic fafatable
[root@T2 bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic fafatable SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/opt/app/Kafka/kafka_2.11-0.9.0.0/libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/app/kafkaogg/kafka_2.11-0.9.0.0/libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] Created topic "fafatable". 注意这里是kafka扫描到其他路径有SLF4J而报错 [root@T2 app]# mv Kafka Kafka.bak 这样既可解决现在正式开始ogg oracle到kafka的同步一.源端Oracle数据库安装ogg client (过程略)不过需要在源端生产DEF文件二.目标端Kafka 安装ogg for bigdata client 只有ogg for bigdata 12.2版本才开始支持kafka。 ogg for bigdata client所需的jar包在libs文件夹下都有的
[root@T2 kafka_2.11-0.9.0.0]# cd libs/ [root@T2 libs]# ls aopalliance-repackaged-2.4.0-b31.jar javax.annotation-api-1.2.jar jetty-servlet-9.2.12.v20150709.jar log4j-1.2.17.jar argparse4j-0.5.0.jar javax.inject-1.jar jetty-util-9.2.12.v20150709.jar lz4-1.2.0.jar connect-api-0.9.0.0.jar javax.inject-2.4.0-b31.jar jopt-simple-3.2.jar metrics-core-2.2.0.jar connect-file-0.9.0.0.jar javax.servlet-api-3.1.0.jar kafka_2.11-0.9.0.0.jar osgi-resource-locator-1.0.1.jar connect-json-0.9.0.0.jar javax.ws.rs-api-2.0.1.jar kafka_2.11-0.9.0.0.jar.asc scala-library-2.11.7.jar connect-runtime-0.9.0.0.jar jersey-client-2.22.1.jar kafka_2.11-0.9.0.0-javadoc.jar scala-parser-combinators_2.11-1.0.4.jar hk2-api-2.4.0-b31.jar jersey-common-2.22.1.jar kafka_2.11-0.9.0.0-javadoc.jar.asc scala-xml_2.11-1.0.4.jar hk2-locator-2.4.0-b31.jar jersey-container-servlet-2.22.1.jar kafka_2.11-0.9.0.0-scaladoc.jar slf4j-api-1.7.6.jar hk2-utils-2.4.0-b31.jar jersey-container-servlet-core-2.22.1.jar kafka_2.11-0.9.0.0-scaladoc.jar.asc slf4j-log4j12-1.7.6.jar jackson-annotations-2.5.0.jar jersey-guava-2.22.1.jar kafka_2.11-0.9.0.0-sources.jar snappy-java-1.1.1.7.jar jackson-core-2.5.4.jar jersey-media-jaxb-2.22.1.jar kafka_2.11-0.9.0.0-sources.jar.asc validation-api-1.1.0.Final.jar jackson-databind-2.5.4.jar jersey-server-2.22.1.jar kafka_2.11-0.9.0.0-test.jar zkclient-0.7.jar jackson-jaxrs-base-2.5.4.jar jetty-http-9.2.12.v20150709.jar kafka_2.11-0.9.0.0-test.jar.asc zookeeper-3.4.6.jar jackson-jaxrs-json-provider-2.5.4.jar jetty-io-9.2.12.v20150709.jar kafka-clients-0.9.0.0.jar jackson-module-jaxb-annotations-2.5.4.jar jetty-security-9.2.12.v20150709.jar kafka-log4j-appender-0.9.0.0.jar javassist-3.18.1-GA.jar jetty-server-9.2.12.v20150709.jar kafka-tools-0.9.0.0.jar 前期配置工作在目标端kafka的ogg配置,这里安装OGG_v12.2.0.1_bigdata_Linux_x64.zip [root@T2 kafkaogg]# ls AdapterExamples dircrd dirtmp ggMessage.dat lib libggperf.so libxml2.txt prvtclkm.plb usrdecs.h bcpfmt.tpl dirdat dirwlt ggparam.dat libantlr3c.so libggrepo.so licenses replicat zlib.txt bcrypt.txt dirdef dirwww ggs_Adapters_Linux_x64.tar libdb-6.1.so libicudata.so.48 logdump retrace cachefiledump dirdmp emsclnt ggsci libggjava.so libicudata.so.48.1 mgr reverse checkprm dirout extract ggserr.log libggjava_ue.so libicui18n.so.48 notices.txt server convchk dirpcs freeBSD.txt help.txt libggjava_vam.so libicui18n.so.48.1 OGG_BigData_12.2.0.1.0_Release_Notes.pdf sqlldr.tpl convprm dirprm gendef kafka_2.11-0.9.0.0 libgglog.so libicuuc.so.48 OGG_BigData_12.2.0.1_README.txt tcperrs db2cntl.tpl dirrpt ggcmd kafka_2.11-0.9.0.0.tgz libggnnzitp.so libicuuc.so.48.1 oggerr ucharset.h dirchk dirsql ggjava keygen libggparam.so libxerces-c.so.28 OGG_v12.2.0.1_bigdata_Linux_x64.zip UserExitExamples 这里使用的是oracle用户 cd $OGG_HOME(在.bash_profile中添加$OGG_HOME环境变量)即/opt/app/kafkaogg [root@T2 kafkaogg]# ./ggsci
Oracle GoldenGate Command Interpreter Version 12.2.0.1.0 OGGCORE_12.2.0.1.0_PLATFORMS_151101.1925.2 Linux, x64, 64bit (optimized), Generic on Nov 10 2015 16:18:12 Operating system character set identified as UTF-8.
Copyright (C) 1995, 2015, Oracle and/or its affiliates. All rights reserved.


GGSCI (T2) 1> CREATE SUBDIRS
Creating subdirectories under current directory /opt/app/kafkaogg
Parameter files /opt/app/kafkaogg/dirprm: created Report files /opt/app/kafkaogg/dirrpt: created Checkpoint files /opt/app/kafkaogg/dirchk: created Process status files /opt/app/kafkaogg/dirpcs: created SQL script files /opt/app/kafkaogg/dirsql: created Database definitions files /opt/app/kafkaogg/dirdef: created Extract data files /opt/app/kafkaogg/dirdat: created Temporary files /opt/app/kafkaogg/dirtmp: created Credential store files /opt/app/kafkaogg/dircrd: created Masterkey wallet files /opt/app/kafkaogg/dirwlt: created Dump files /opt/app/kafkaogg/dirdmp: created GGSCI (T2) 7> edit params mgr port 1357 GGSCI (T2) 9> start mgr Manager started. 拷贝kafka adapter 配置文件: [root@T2 kafka]# pwd /opt/app/kafkaogg/AdapterExamples/big-data/kafka [root@T2 kafka]# cp * /opt/app/kafkaogg/dirprm/ [root@T2 kafka]# vi /opt/app/kafkaogg/dirprm/custom_kafka_producer.properties
#bootstrap.servers=host:port bootstrap.servers=172.16.57.55:9092 acks=1 #compression.type=gzip reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer # 100KB per partition batch.size=102400 linger.ms=10000
解释:
这里测试发现compression.type=gzip,replicate识别不了报错。尚不清楚原因 bootstrap是HOSTNAME [root@T2 kafka]# vi /opt/app/kafkaogg/dirprm/kafka.props

gg.handlerlist = kafkahandler gg.handler.kafkahandler.type = kafka gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties gg.handler.kafkahandler.TopicName =fafatable gg.handler.kafkahandler.format =avro_op gg.handler.kafkahandler.SchemaTopicName=fafaschema gg.handler.kafkahandler.BlockingSend =true gg.handler.kafkahandler.includeTokens=false #gg.handler.kafkahandler.topicPartitioning=table gg.handler.kafkahandler.mode =op #gg.handler.kafkahandler.maxGroupSize =100, 1Mb #gg.handler.kafkahandler.minGroupSize =50, 500Kb

goldengate.userexit.timestamp=utc goldengate.userexit.writers=javawriter javawriter.stats.display=TRUE javawriter.stats.full=TRUE
gg.log=log4j gg.log.level=INFO
gg.report.time=30sec
gg.classpath=dirprm/:/opt/app/kafkaogg/kafka_2.11-0.9.0.0/libs/*:
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
解释: gg.handler.kafkahandler.TopicName 对应kafka的topic,跟源端数据库的table相对应,但可以不同名,用map可以匹配。 gg.handler.kafkahandler.SchemaTopicName 当format为avro时,需要配置此参数,否则可以不配。 gg.handler.kafkahandler.BlockingSend 当值为true时,表示同步更新,下一个消息发送需要等到写入到目标topic中,且确认已经收到才发下一条消息。为false为异步更新,将一次性发给目标topic。 gg.handler.kafkahandler.topicPartitioning有两种参数值none | table,控制是否已发布到kafka的数据应按表分区。设置为表,不同表的数据被写入到不同的kafka主题。设置为None,来自不同表的数据交织在同一话题。 gg.handler.kafkahandler.mode 当值为tx时,表示源端一次事务内的操作在kafka上作为一个record。查看当前Java版本 [oracle@n3 kafka]$ java -version java version "1.8.0_77" Java(TM) SE Runtime Environment (build 1.8.0_77-b03) Java HotSpot(TM) 64-Bit Server VM (build 25.77-b03, mixed mode) 注意:需要是jdk-1.7版本,不然启动时会报如下超时错误,ogg不支持jdk-1.8版本删除1.8版本 yum remove java-1.8.0* root@n3:/opt/app/kafkaogg# yum install java-1.7.0* root@bd-qa-oracle-86:~#java -version java version "1.7.0_79" OpenJDK Runtime Environment (rhel-2.5.5.4.el6-x86_64 u79-b14) OpenJDK 64-Bit Server VM (build 24.79-b02, mixed mode) root@n3:/opt/app/kafkaogg# ls /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.79.x86_64/jre/ bin lib [root@T2 dirprm]# vi ~/.bash_profile export JAVA_HOME=/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.79.x86_64/jre/ export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.101.x86_64/jre/lib/amd64/libjsig.so:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.101.x86_64/jre/lib/amd64/server/libjvm.so:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.101.x86_64/jre/lib/amd64/server:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.101.x86_64/jre/lib/amd64 生效 [root@T2 dirprm]# source ~/.bash_profile root@n3:/opt/app/kafkaogg# export LD_LIBRARY_PATH=/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.79.x86_64/jre/lib/amd64/libjsig.so:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.79.x86_64/jre/lib/amd64/server/libjvm.so:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.79.x86_64/jre/lib/amd64/server:/usr/lib/jvm/java-1.7.0-openjdk-1.7.0.79.x86_64/jre/lib/amd64/


源端oracle上搭建ogg,这里安装fbo_ggs_Linux_x64_shiphome.zip 1、开启主库归档日志、补充日志及force logging SQL> alter database archivelog; alter database archivelog * ERROR at line 1: ORA-01126: database must be mounted in this instance and not open in any instance SQL> archive log list Database log mode Archive Mode Automatic archival Enabled Archive destination /opt/app/oracle/oraarch Oldest online log sequence 91 Next log sequence to archive 93 Current log sequence 93 SQL> alter database add supplemental log data;
Database altered.
SQL> alter database force logging;
Database altered.
SQL> alter system set enable_goldengate_replication=true scope=both;
System altered.
2、关闭回收站
SQL> alter system set recyclebin=off scope=spfile;
System altered. 3、创建OGG管理用户(主备库都要设置) create user ogg identified by ogg account unlock; grant connect,resource to ogg; grant select any dictionary to ogg; grant select any table to ogg; grant execute on utl_file to ogg; grant restricted session to ogg; GRANT CREATE TABLE,CREATE SEQUENCE TO OGG; (必须有的操作,后续会介绍) grant dba to ogg;(可选)
数据初始化(Oracle initial load)此时我们需要把dataprod用户下所有表导入到kafka中查看 SQL> SELECT table_name FROM all_tables WHERE owner = upper('dataprod');
TABLE_NAME ------------------------------------------------------------ CRM_AGG_USERBEHAVIOR_CALC1 DP_TRADE_PVCOUNT CRM_AGG_USERBEHAVIOR_CALC1_BK DP_AGG_USERBEHAVIORSCORE CRM_AGG_USERPUSH_CALC1 CRM_AGG
6 rows selected. SQL> select owner,trigger_name from all_triggers where owner in ('DATAPROD');
OWNER ------------------------------------------------------------ TRIGGER_NAME ------------------------------------------------------------ DATAPROD EID_ID 了解完dataprod用户表以后,我们开始初始化安装源端ogg软件 oracle@bd-qa-oracle-86:/opt/app/kafkaogg$ll total 464752 drwxr-xr-x 3 oracle oinstall 4096 Dec 12 2015 fbo_ggs_Linux_x64_shiphome -rw-r--r-- 1 oracle oinstall 475611228 Jul 13 17:54 fbo_ggs_Linux_x64_shiphome.zip -rw-r--r-- 1 oracle oinstall 282294 Jan 19 07:13 OGG-12.2.0.1.1-ReleaseNotes.pdf -rw-r--r-- 1 oracle oinstall 1559 Jan 19 07:12 OGG-12.2.0.1-README.txt oracle@bd-qa-oracle-86:/opt/app/kafkaogg/fbo_ggs_Linux_x64_shiphome/Disk1$ls install response runInstaller stage 这里采用静默安装 oracle@bd-qa-oracle-86:/opt/app/kafkaogg/fbo_ggs_Linux_x64_shiphome/Disk1/response$ls oggcore.rsp oracle@bd-qa-oracle-86:/opt/app/kafkaogg/fbo_ggs_Linux_x64_shiphome/Disk1/response$vi oggcore.rsp oracle.install.responseFileVersion=/oracle/install/rspfmt_ogginstall_response_schema_v12_1_2 INSTALL_OPTION=ORA11g START_MANAGER=true MANAGER_PORT=1357 DATABASE_LOCATION=/opt/app/oracle/product/11g INVENTORY_LOCATION=/opt/app/oraInventory UNIX_GROUP_NAME=oinstall
解释: DATABASE_LOCATION是指ORACLE_HOME路径 oracle@bd-qa-oracle-86:/opt/app/kafkaogg/fbo_ggs_Linux_x64_shiphome/Disk1$./runInstaller -silent -responseFile /opt/app/kafkaogg/fbo_ggs_Linux_x64_shiphome/Disk1/response/oggcore.rsp Starting Oracle Universal Installer...
Checking Temp space: must be greater than 120 MB. Actual 6427 MB Passed Checking swap space: must be greater than 150 MB. Actual 7793 MB Passed Preparing to launch Oracle Universal Installer from /tmp/OraInstall2016-07-14_09-54-11AM. Please wait ...oracle@bd-qa-oracle-86:/opt/app/kafkaogg/fbo_ggs_Linux_x64_shiphome/Disk1$[WARNING] [INS-75003] The specified directory /opt/app/kafkaogg is not empty. CAUSE: The directory specified /opt/app/kafkaogg contains files. ACTION: Clean up the specified directory or enter a new directory location. You can find the log of this install session at: /opt/app/oraInventory/logs/installActions2016-07-14_09-54-11AM.log WARNING:OUI-10030:You have specified a non-empty directory to install this product. It is recommended to specify either an empty or a non-existent directory. You may, however, choose to ignore this message if the directory contains Operating System generated files or subdirectories like lost+found. Do you want to proceed with installation in this Oracle Home? The installation of Oracle GoldenGate Core was successful. Please check '/opt/app/oraInventory/logs/silentInstall2016-07-14_09-54-11AM.log' for more details. Successfully Setup Software.
ogg软件安装成功 oracle@bd-qa-oracle-86:/opt/app/kafkaogg$./ggsci
Oracle GoldenGate Command Interpreter for Oracle Version 12.2.0.1.1 OGGCORE_12.2.0.1.0_PLATFORMS_151211.1401_FBO Linux, x64, 64bit (optimized), Oracle 11g on Dec 12 2015 00:54:38 Operating system character set identified as UTF-8.
Copyright (C) 1995, 2015, Oracle and/or its affiliates. All rights reserved.


GGSCI (bd-qa-oracle-86) 1> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING 源端配置mgr进程 GGSCI (bd-qa-oracle-86) 10> edit params mgr PORT 1357 dynamicportlist 9901-9920,9930 autostart er * autorestart er *,retries 4,waitminutes 4 startupvalidationdelay 5 purgeoldextracts /opt/app/kafkaogg/dirdat/ff,usecheckpoints,minkeephours 2
配置数据同步用户
GGSCI (bd-qa-oracle-86) 7> dblogin userid ogg,password ogg Successfully logged into database.
GGSCI (bd-qa-oracle-86 as ogg@BDDEV) 8> add trandata dataprod.*
2016-07-14 10:43:59 WARNING OGG-06439 No unique key is defined for table CRM_AGG. All viable columns will be used to represent the key, but may not guarantee uniqueness. KEYCOLS may be used to define the key.
Logging of supplemental redo data enabled for table DATAPROD.CRM_AGG. TRANDATA for scheduling columns has been added on table 'DATAPROD.CRM_AGG'. TRANDATA for instantiation CSN has been added on table 'DATAPROD.CRM_AGG'. Logging of supplemental redo data enabled for table DATAPROD.CRM_AGG_USERBEHAVIOR_CALC1. TRANDATA for scheduling columns has been added on table 'DATAPROD.CRM_AGG_USERBEHAVIOR_CALC1'. TRANDATA for instantiation CSN has been added on table 'DATAPROD.CRM_AGG_USERBEHAVIOR_CALC1'. Logging of supplemental redo data enabled for table DATAPROD.CRM_AGG_USERBEHAVIOR_CALC1_BK. TRANDATA for scheduling columns has been added on table 'DATAPROD.CRM_AGG_USERBEHAVIOR_CALC1_BK'. TRANDATA for instantiation CSN has been added on table 'DATAPROD.CRM_AGG_USERBEHAVIOR_CALC1_BK'. Logging of supplemental redo data enabled for table DATAPROD.CRM_AGG_USERPUSH_CALC1. TRANDATA for scheduling columns has been added on table 'DATAPROD.CRM_AGG_USERPUSH_CALC1'. TRANDATA for instantiation CSN has been added on table 'DATAPROD.CRM_AGG_USERPUSH_CALC1'. Logging of supplemental redo data enabled for table DATAPROD.DP_AGG_USERBEHAVIORSCORE. TRANDATA for scheduling columns has been added on table 'DATAPROD.DP_AGG_USERBEHAVIORSCORE'. TRANDATA for instantiation CSN has been added on table 'DATAPROD.DP_AGG_USERBEHAVIORSCORE'. 2016-07-14 10:44:00 WARNING OGG-06439 No unique key is defined for table DP_TRADE_PVCOUNT. All viable columns will be used to represent the key, but may not guarantee uniqueness. KEYCOLS may be used to define the key.
Logging of supplemental redo data enabled for table DATAPROD.DP_TRADE_PVCOUNT. TRANDATA for scheduling columns has been added on table 'DATAPROD.DP_TRADE_PVCOUNT'. TRANDATA for instantiation CSN has been added on table 'DATAPROD.DP_TRADE_PVCOUNT'. 源端初始化配置 A:配置extract进程,注意这里使用direct file模式,而不是load模式 GGSCI (bd-qa-oracle-86) 2> add extract fafainie, sourceistable EXTRACT added.

GGSCI (bd-qa-oracle-86) 3> info extract *,tasks
EXTRACT FAFAINIE Initialized 2016-07-14 11:03 Status STOPPED Checkpoint Lag Not Available Log Read Checkpoint Not Available First Record Record 0 Task SOURCEISTABLE

GGSCI (bd-qa-oracle-86) 8> edit params fafainie extract fafainie userid ogg,password ogg rmthost 172.16.57.55,mgrport 1357 --rmttask replicat,group fafainir RMTFILE ./dirdat/ff table dataprod.*;
目标端初始化配置配置mgr GGSCI (T2) 2> edit params mgr

port 1357 ACCESSRULE, PROG REPLICAT, IPADDR 172.16.57.*, ALLOW dynamicportlist 9901-9920,9930 autostart er * autorestart er *,retries 4,waitminutes 4 startupvalidationdelay 5 purgeoldextracts /opt/app/kafkaogg/dirdat/*,usecheckpoints,minkeephours 2 LAGREPORTHOURS 1 LAGINFOMINUTES 30 LAGCRITICALMINUTES 45 A:配置replicat进程 GGSCI (T2) 5> add replicat fafainir, specialrun REPLICAT added.
GGSCI (T2) 11> info replicat *, TASKS
REPLICAT FAFAINIR Initialized 2016-07-14 11:05 Status STOPPED Checkpoint Lag 00:00:00 (updated 00:00:07 ago) Log Read Checkpoint Not Available Task SPECIALRUN B:编辑replicat进程参数
注意这里修改这个文件 [root@T2 dirprm]# vi rkafka.prm
SPECIALRUN end runtime -- REPLICAT fafainir -- Trail file for this example is located in "AdapterExamples/trail" directory -- Command to add REPLICAT -- add replicat rkafka, exttrail AdapterExamples/trail/ff setenv (NLS_LANG=AMERICAN_AMERICA.ZHS16GBK) TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props EXTFILE ./dirdat/ff DDL INCLUDE ALL REPORTCOUNT EVERY 1 MINUTES, RATE GROUPTRANSOPS 10000 MAP dataprod.*, TARGET fafaschema.*;
[root@T2 dirprm]# mv rkafka.prm fafainir.prm 这里target 可以随便取名,但是这里可源库的gg.handler.kafkahandler.SchemaTopicName一致,只是做一个标识。kafka发出的消息所有消费者都可以消费了。 启动初始化进程 源端:
GGSCI (bd-qa-oracle-86) 5> start fafainie
Sending START request to MANAGER ... EXTRACT FAFAINI starting

GGSCI (bd-qa-oracle-86) 6> view report fafainie 目标端:
[root@T2 kafkaogg]# replicat paramfile ./dirprm/fafainir.prm reportfile ./dirrpt/fafainir.rpt -p INITIALDATALOAD

[root@T2 kafkaogg]# tail -100f ggserr.log
验证传输情况,查看kafka消费情况 JFAFASCHEMA.CRM_AGG_USERBEHAVIOR_CALC1I42016-07-14 07:24:09.02265842016-07-15T18:03:05.741000(00000000000000003583 USERID:2016-05-03:00:00:00.00000000020250006userA001?????SHSHSHSH:2016-05-03:00:00:00.000000000:2016-05-03:00:00:00.000000000:2016-05-03:00:00:00.000000000:2016-05-03:00:00:00.0000000000000:2016-05-03:00:00:00.000000000000 JFAFASCHEMA.CRM_AGG_USERBEHAVIOR_CALC1I42016-07-14 07:24:09.02265842016-07-15T18:03:25.776000(00000000000000004064 USERID:2016-05-03:00:00:00.00000000020250006userA002?????SHSHSHSH:2016-05-03:00:00:00.000000000:2016-05-03:00:00:00.000000000:2016-05-03:00:00:00.000000000:2016-05-03:00:00:00.0000000000000:2016-05-03:00:00:00.000000000000 JFAFASCHEMA.CRM_AGG_USERBEHAVIOR_CALC1I42016-07-14 07:24:09.02265842016-07-15T18:03:35.783000(00000000000000004545 数据成功传输 数据同步 1、配置DDL同步 A:目标库 配置globals参数 GGSCI (T2) 3> view param ./globals
ggschema ogg
B:源库 执行DDL配置脚本 sqlplus / as sysdba SQL> @/opt/app/kafkaogg/marker_setup.sql
Marker setup script
You will be prompted for the name of a schema for the Oracle GoldenGate database objects. NOTE: The schema must be created prior to running this script. NOTE: Stop all DDL replication before starting this installation.
Enter Oracle GoldenGate schema name:ogg

Marker setup table script complete, running verification script... Please enter the name of a schema for the GoldenGate database objects: Setting schema name to OGG
MARKER TABLE -------------------------------------------------------------- OK
MARKER SEQUENCE -------------------------------------------------------------- OK
Script complete. 输入OGG管理用户名:ogg 首先建立ogg独有的表空间 SQL> CREATE TABLESPACE TBS_OGG DATAFILE '/opt/app/oracle/oradata/orcl11g/tbs_ogg_01.dbf' SIZE 2G AUTOEXTEND ON NEXT 50M MAXSIZE UNLIMITED; Tablespace created. SQL> alter user ogg DEFAULT TABLESPACE TBS_OGG; User altered. SQL> grant connect,resource,unlimited tablespace to ogg; Grant succeeded.
SQL> @/opt/app/kafkaogg/ddl_setup.sql
Oracle GoldenGate DDL Replication setup script
Verifying that current user has privileges to install DDL Replication...
You will be prompted for the name of a schema for the Oracle GoldenGate database objects. NOTE: For an Oracle 10g source, the system recycle bin must be disabled. For Oracle 11g and later, it can be enabled. NOTE: The schema must be created prior to running this script. NOTE: Stop all DDL replication before starting this installation.
Enter Oracle GoldenGate schema name:ogg
Working, please wait ... Spooling to file ddl_setup_spool.txt
Checking for sessions that are holding locks on Oracle Golden Gate metadata tables ...
Check complete.
Using OGG as a Oracle GoldenGate schema name.
Working, please wait ...
DDL replication setup script complete, running verification script... Please enter the name of a schema for the GoldenGate database objects: Setting schema name to OGG
CLEAR_TRACE STATUS:
Line/pos -------------------------------------------------------------------------------- Error ----------------------------------------------------------------- No errors No errors .................... ....................
输入OGG管理用户名:ogg 注意1:此处可能会报错:ORA-04098: trigger 'SYS.GGS_DDL_TRIGGER_BEFORE' is invalid and failed,同时OGG中的很多表和视图无法创建,原因主要由于OGG缺少权限引起,即便有 DBA权限也是不足的(OGG BUG),可以通过如下方法修复: 1)先将触发器关闭,否则执行任何sql都会包ORA-04098的错误 @/opt/app/OGG/ddl_disable.sql 2)赋予ogg对应权限 grant execute on utl_file to ogg; grant restricted session to ogg; GRANT CREATE TABLE,CREATE SEQUENCE TO OGG; 3)重新执行ddl_setup.sql 注意2:当主库上有很多应用连接时,执行该sql会出现如下报警: IMPORTANT: Oracle sessions that used or may use DDL must be disconnected. If you continue, some of these sessions may cause DDL to fail with ORA-6508. To proceed, enter yes. To stop installation, enter no. Enter yes or no: 为了不影响主库,选no,选择一个时间点,停止应用再创建ddl。 如果不创建ddl,需要在主备库的ogg进程参数中添加truncate选项: gettruncates,参考后面同步进程配置。
SQL> @/opt/app/kafkaogg/role_setup.sql
GGS Role setup script
This script will drop and recreate the role GGS_GGSUSER_ROLE To use a different role name, quit this script and then edit the params.sql script to change the gg_role parameter to the preferred name. (Do not run the script.)
You will be prompted for the name of a schema for the GoldenGate database objects. NOTE: The schema must be created prior to running this script. NOTE: Stop all DDL replication before starting this installation.
Enter GoldenGate schema name:ogg Wrote file role_setup_set.txt
PL/SQL procedure successfully completed.

Role setup script complete
Grant this role to each user assigned to the Extract, GGSCI, and Manager processes, by using the following SQL command:
GRANT GGS_GGSUSER_ROLE TO
where is the user assigned to the GoldenGate processes. 输入OGG管理用户名:ogg

SQL> GRANT GGS_GGSUSER_ROLE TO OGG;
Grant succeeded.
SQL> @/opt/app/kafkaogg/ddl_enable.sql
Trigger altered. 2、配置数据同步 A:源库 配置日志抓取进程 GGSCI (bd-qa-oracle-86) 1> add extract fafae, tranlog, begin now EXTRACT added.

GGSCI (bd-qa-oracle-86) 2> add rmttrail /opt/app/kafkaogg/dirdat/fa,extract fafae RMTTRAIL added.
注意这里不能和初始化trail文件名称一致,初始化时ff,这里使用fa
GGSCI (bd-qa-oracle-86) 3> edit params fafae extract fafae userid ogg,password ogg rmthost 172.16.57.55, mgrport 1357 rmttrail /opt/app/kafkaogg/dirdat/fa discardfile /opt/app/kafkaogg/dirrpt/trail.dsc,append,megabytes 100 ddl include mapped table dataprod.*; B:目标库 配置日志解析进程 1)编辑globals参数 GGSCI (T2) 4> edit params ./GLOBALS

ggschema ogg checkpointtable ogg.chkpnt_fafa 2)创建checkpoint表
这里在目标端由于kafka自己可以记录所以无需创建,其实也无法创建
3)配置解析进程
GGSCI (T2) 6> add replicat fafar,exttrail /opt/app/kafkaogg/dirdat/fa REPLICAT added.

GGSCI (T2) 7> info replicat *, TASKS
REPLICAT FAFAINIR Initialized 2016-07-14 16:18 Status STOPPED Checkpoint Lag 00:00:00 (updated 22:31:52 ago) Log Read Checkpoint Not Available Task SPECIALRUN
REPLICAT REP_INIT Initialized 2016-07-15 09:02 Status STOPPED Checkpoint Lag 00:00:00 (updated 05:48:32 ago) Log Read Checkpoint Not Available Task SPECIALRUN [root@T2 dirprm]# vi fafar.prm
REPLICAT fafar -- Trail file for this example is located in "AdapterExamples/trail" directory -- Command to add REPLICAT -- add replicat rkafka, exttrail AdapterExamples/trail/ff setenv (NLS_LANG=AMERICAN_AMERICA.ZHS16GBK) TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props DDL INCLUDE ALL REPORTCOUNT EVERY 1 MINUTES, RATE GROUPTRANSOPS 10000 MAP dataprod.*, TARGET fafaschema.*; C:启动同步进程这里总结一下: 1,初始化的时候用的是rmtfile,然后他的replicat进程也必须是rmtfile 2,然后同步进程源端是rmttrail,他的目标端必须是rmttrail,他们必须都保持一致
3,这里强调,初始化进程和同步进程的trail文件名称不能一致,不然会因为初始化trail文件和同步trail文件格式不同而报with format RELEASE 12.2, does not match current format specification of RELEASE 9.0/9.5错误。 源库: GGSCI (bd-qa-oracle-86) 5> start fafae
Sending START request to MANAGER ... EXTRACT FAFAE starting
目标库: GGSCI (T2) 2> start fafar
Sending START request to MANAGER ... REPLICAT FAFAR starting
GGSCI (T2) 3> view report fafar 测试源端: SQL> conn dataprod/dataprod Connected. SQL> create table fafa(a int);
Table created.
SQL> insert into fafa values(1);
1 row created.
SQL> commit;
Commit complete. 目标端: [root@T2 bin]# ./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic fafatable FAFASCHEMA.FAFAI42016-07-15 09:37:38.20047342016-07-15T17:37:43.662000(00000000000000003128A1 FAFASCHEMA.FAFAI42016-07-15 09:38:51.19952442016-07-15T17:38:55.152000(00000000000000003242A2 同步进程成功




ogg--------------
相关TAG标签
上一篇:紫金保险某系统GetShell影响数百万团体/个人保单信息
下一篇:新浪存在远程命令执行漏洞(ImageMagick补丁绕过)
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站