Python抓取数据并发送给Kafka

    xiaoxiao2022-07-04  219

     

     

    #coding=UTF-8 import requests import  json from lxml import  etree from pykafka import KafkaClient import sys import schedule import time

    class Aiqiyi():
        """Publish JSON-encoded records to the ``test`` topic of a local Kafka broker."""

        def __init__(self):
            """Connect to the broker at 127.0.0.1:9092 and keep a handle to the topic.

            Prints the client's topic mapping as a simple connectivity check.
            """
            myhosts = "127.0.0.1:9092"
            client = KafkaClient(hosts=myhosts)
            self.topic = client.topics['test'.encode()]
            print(client.topics)

        def sendMessage(self, mydict):
            """Serialize *mydict* to JSON and produce it as one UTF-8 message.

            Args:
                mydict: Any JSON-serializable object (``json.dumps`` is applied).
            """
            # A producer is opened per call and closed by the context manager.
            with self.topic.get_sync_producer(delivery_reports=True) as producer:
                data = json.dumps(mydict)
                producer.produce(bytes(data, encoding='utf-8'))


    def getProperti(sender=None):
        """Scrape the iQiyi listing page and send one record per programme to Kafka.

        Each record is a dict with the keys ``name``, ``href``, ``img`` and
        ``time`` (all stripped strings).

        Args:
            sender: Object exposing ``sendMessage(record)``. When omitted, a new
                ``Aiqiyi`` instance is created, so the function can still be
                registered with the scheduler without arguments.
        """
        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36'}
        url = 'http://list.iqiyi.com/www/25/-------------4-1-2-iqiyi-1-.html'
        sourceHtml = requests.get(url, headers=headers).content.decode('utf-8')
        selector = etree.HTML(sourceHtml)
        links = selector.xpath('/html/body/div[3]/div/div/div[3]/div/ul[@class="site-piclist site-piclist-180101 site-piclist-auto"]/li[1]')

        # The original called a bare ``sendMessage``, which only exists as a
        # method of Aiqiyi; route every record through an instance instead.
        if sender is None:
            sender = Aiqiyi()

        for link in links:
            # NOTE(review): the predicate ['site-piclist_info'] is a non-empty
            # string literal (always true), and a leading '//' searches the whole
            # document rather than this <li>. Probably meant
            # ".//div[@class='site-piclist_info']" -- confirm against the page
            # markup before changing; the expressions are kept as published.
            # Programme link
            hrefs = link.xpath("//div['site-piclist_info']/div[1]/p/a/@href")
            # Programme name
            names = link.xpath("//div['site-piclist_info']/div[1]/p/a/text()")
            # Programme image
            imgs = link.xpath("//div[@class='site-piclist_pic']/a/img/@src")
            # Release time (named so it does not shadow the ``time`` module)
            release_times = link.xpath("//div['site-piclist_info']/div[@class='role_info']/text()")

            # zip() stops at the shortest list, so incomplete entries are dropped.
            for name, href, img, release_time in zip(names, hrefs, imgs, release_times):
                jso = {}
                jso["name"] = str(name.strip())
                jso["href"] = str(href.strip())
                jso["img"] = str(img.strip())
                jso["time"] = str(release_time.replace("\r\n", "").strip())
                # Send the scraped record; the original sent a hard-coded
                # placeholder list and discarded ``jso``.
                sender.sendMessage(jso)


    def job():
        """Print a heartbeat message (sample scheduler job)."""
        print("I'm working...")

    if __name__ == "__main__":
        # Register the scraper to run once every minute.
        schedule.every(1).minutes.do(getProperti)
        # Poll the scheduler once per second, forever; due jobs run inside
        # run_pending().
        while True:
            schedule.run_pending()
            time.sleep(1)

     

     

     

    #coding=UTF-8

    from pykafka import KafkaClient import json import sys

    class pythonSendkafka:
        """Small helper that publishes JSON messages to the ``test`` Kafka topic."""

        def clien(self):
            """Connect to the broker at 127.0.0.1:9092 and store the topic handle.

            Must be called before ``sendMessage``, which reads ``self.topic``.
            Prints the client's topic mapping.
            """
            kafka = KafkaClient(hosts="127.0.0.1:9092")
            self.topic = kafka.topics['test'.encode()]
            print(kafka.topics)

        def sendMessage(self, mydict):
            """Serialize *mydict* to JSON and produce it as one UTF-8 message."""
            with self.topic.get_sync_producer(delivery_reports=True) as producer:
                payload = json.dumps(mydict).encode('utf-8')
                producer.produce(payload)


    # Demo run: publish a small sample payload (json.dumps emits the tuples as
    # JSON arrays).
    df = [('a111', 'a2', 'a3'), ('b111', 'b2', 'b3'), ('c111', 'c2', 'c3')]
    py = pythonSendkafka()
    py.clien()
    py.sendMessage(df)

    最新回复(0)