频道栏目
首页 > 资讯 > Python > 正文

Python操作Kafka爬坑

17-08-29        来源:[db:作者]  
收藏   我要投稿

组内做大数据,需要向 Kafka 写入数据。最近正好在看 Python,就拿来练练手。网上找了一圈,用的都是 pykafka,折腾了一整圈安装之后最终搞定,代码如下:

#coding:u8

import sys

import time

import random

import datetime

import MySQLdb

import codecs

from pykafka import KafkaClient

import logging

import json

import threading

'''

******************

'''

# Load the Kafka broker address from the first line of set.txt.
# `ad` ends up as a one-element list holding the "host:port" string.
ad = []

try:
    # `with` guarantees the handle is closed; the previous `ini.close`
    # lacked call parentheses, so the file was never actually closed.
    with open("set.txt") as ini:
        ad = ini.readline().splitlines()
except (IOError, OSError) as e:
    print("open settings file Error: %s" % (type(e),))

# Fall back to the default broker when the file is missing OR its first
# line is empty; otherwise the later `ad[0]` lookup raises IndexError.
if not ad:
    ad = ["192.168.1.121:9092"]

print("open ini file")

# Connect to the first configured broker, list the cluster's topics and
# bind the "mytopic" topic. Any failure here is fatal: report the
# exception type and exit with status 1.
try:
    client = KafkaClient(hosts=ad[0])
    print("%s %s" % ("Topics:", client.topics))
    topic = client.topics["mytopic"]
except Exception as err:
    print("Opening kafka Error:%s" % (type(err),))
    sys.exit(1)

print("before threading")

# Send one stringified record through a synchronous producer.
# NOTE(review): `tp` and `dct2` are not defined in the visible excerpt --
# presumably they come from the part of the script the article omitted
# (the topic was bound to `topic` earlier); confirm against the full code.
try:
    with tp.get_sync_producer() as producer:
        payload = str(dct2)
        producer.produce(payload)
except Exception as err:
    # Only the exception type is reported, as in the original.
    print("%s %s" % ("Error:", type(err)))

# Consumer-side excerpt (indentation was lost in the article).
# NOTE(review): `consumer` is never created in the visible code, and the
# trailing `except` has no matching `try` here -- presumably both were cut
# when the code was abridged; confirm against the full script.
print "ini consumer"

# `1==1` is always true, so this loop has no exit condition of its own.
# Presumably the statements below form its body -- TODO confirm nesting.
while 1==1:

print "nn",type(consumer)

# Iterate whatever the consumer yields, printing a marker per message.
for message in consumer:

print "mm"

# Print offset and payload for every message that is not None.
if message is not None:

print message.offset, message.value

# Reports both the exception value and its type.
except Exception as e:

print e,type(e)

运行结果:可以列出 topic,写入数据时也没有报错信息。但是消费者取不到数据,无论是直接用 Kafka 取,还是用 Python 写消费者代码去取,都取不到。

后来改用 kafka-python 就正常了,代码如下:

#coding:utf-8

import sys

import time

import random

import datetime

import codecs

# kafka-python exposes KafkaProducer from the top-level `kafka` package;
# there is no `kafka.kafkaProducer` module, and the script below uses the
# bare name `KafkaProducer`, so it must be imported by name.
from kafka import KafkaProducer

import logging

import json

import threading

# Load the comma-separated Kafka bootstrap server list from the first line
# of set.txt. `ad` ends up as a one-element list holding that string.
ad = []

try:
    # `with` guarantees the handle is closed; the previous `ini.close`
    # lacked call parentheses, so the file was never actually closed.
    with open("set.txt") as ini:
        ad = ini.readline().splitlines()
except (IOError, OSError) as e:
    print("Opening settings file Error: %s %s" % (e, type(e)))

# Fall back to the default brokers when the file is missing OR its first
# line is empty; otherwise the later `ad[0]` lookup raises IndexError.
# The second entry previously read "192.168.1.122.9092" (a dot instead of
# a colon before the port), which is not a valid host:port pair.
if not ad:
    ad = ["192.168.1.121:9092,192.168.1.122:9092"]

print("Opened ini file")

# The triple-quoted block below is a bare string literal, not executed
# code: it holds the pykafka connection logic (KafkaClient / client.topics)
# that this kafka-python version of the script does not run.
'''

try:

client = KafkaClient(hosts = ad[0])

print "Topics:",client.topics

topic = client.topics["mytopic"]

except Exception as e:

print "Opening kafka Error:%s" %(e.args[0])

sys.exit(1)

print "before threading"

'''

def _encode_json(m):
    """Serialize *m* to JSON and return it as UTF-8 encoded bytes."""
    return json.dumps(m).encode('utf-8')


# Create the shared producer against the configured bootstrap servers.
# Failure to connect is fatal: report the error and exit with status 1.
try:
    producer = KafkaProducer(
        bootstrap_servers=ad[0],
        value_serializer=_encode_json,
    )
except Exception as err:
    print("%s %s %s" % ("Opening kafka Error:", err, type(err)))
    sys.exit(1)

print("before threading")

# Fan the shared producer out to 12 worker threads, each running `tf`
# with the producer and its own index.
# NOTE(review): `tf` is not defined in the visible excerpt -- presumably it
# lives in the part of the script the article omitted.
threads = []

for i in range(0, 12):
    try:
        worker = threading.Thread(target=tf, args=(producer, i))
        threads.append(worker)
        # Start through the local reference instead of threads[i]: if an
        # earlier iteration failed before appending, list positions no
        # longer match `i` and threads[i] would raise IndexError, leaving
        # the thread that was just created unstarted.
        worker.start()
    except Exception as e:
        # Message typo fixed ("Treand" -> "Thread").
        print("Thread error at Thread:%d:%s,%s" % (i, e, type(e)))

# The main thread does not join the workers; it only reports that its own
# top-level code has finished.
print("main thread is ended")

代码均有所节略。

相关TAG标签
上一篇:用shell脚本实现监控程序自动重启
下一篇:LintCode 子树
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站