es.davy.ai

Preguntas y respuestas de programación confiables

¿Tienes una pregunta?

Si tienes alguna pregunta, puedes hacerla a continuación o ingresar lo que estás buscando.

Detener/Iniciar el flujo de tweets programáticamente utilizando Tweepy

Estoy intentando construir una aplicación web utilizando Django para rastrear tweets por hashtags. Estoy utilizando Tweepy AsyncStreaming y está funcionando bien. Soy nuevo en la programación asíncrona. Necesito ayuda sobre cómo detener el stream en ejecución y comenzar un nuevo stream cuando se agrega un nuevo hashtag.

Aquí está mi código:

import os

from tweepy.asynchronous import AsyncStream 
import asyncio


consumer_key = os.getenv('XXXXXXXXXXX')
consumer_secret = os.getenv('XXXXXXXXXXX')
access_token = os.getenv('XXXXXXXXXXX')
access_token_secret = os.getenv('XXXXXXXXXXX')


async def main():
    stream = TweetStream(consumer_key, consumer_secret,
                        access_token, access_token_secret)
    await stream.filter(follow=['web3', 'crypto'])


class TweetStream(AsyncStream):
    async def on_connect(self):
        print('Stream connected')

    async def on_status(self, status):
        if status.in_reply_to_user_id is None and not(status.entities['user_mentions']):
            print(status.text)
            print(status.user.screen_name)

    async def on_error(self, status):
        print(status)


if __name__ == '__main__':
    asyncio.run(main())
Tags:  , , ,

Answer

  1. Avatar for davy.ai

    Para detener el flujo actual y comenzar un nuevo flujo cuando se agregue un nuevo hashtag, puedes modificar el código de la siguiente manera:

    1. Agrega una variable global para almacenar el hashtag actual que se está rastreando.
    2. Modifica el método main() para que tome un parámetro de hashtag y lo almacene en la variable global.

    3. Modifica la clase TweetStream para que utilice la variable global para filtrar los tweets.

    4. Crea un nuevo bucle para ejecutar el flujo y utiliza asyncio.gather para ejecutar el bucle y el método principal de forma concurrente.

    Aquí está el código modificado:

    import os
    from tweepy.asynchronous import AsyncStream
    import asyncio
    
    consumer_key = os.getenv('XXXXXXXXXXX')
    consumer_secret = os.getenv('XXXXXXXXXXX')
    access_token = os.getenv('XXXXXXXXXXX')
    access_token_secret = os.getenv('XXXXXXXXXXX')
    current_hashtag = None  # variable global para almacenar el hashtag actual que se está rastreando
    
    async def main(hashtag):
        global current_hashtag
        current_hashtag = hashtag  # actualizar el hashtag actual
        stream = TweetStream(consumer_key, consumer_secret,
                             access_token, access_token_secret)
        await stream.filter(track=[hashtag])
    
    class TweetStream(AsyncStream):
        async def on_connect(self):
            print('Stream conectado')
    
        async def on_status(self, status):
            if current_hashtag and current_hashtag in status.text.lower():
                if status.in_reply_to_user_id == None and not(status.entities['user_mentions']):
                    print(status.text)
                    print(status.user.screen_name)
    
        async def on_error(self, status):
            print(status)
    
    if __name__ == '__main__':
        hashtag = "web3"
        asyncio.run(asyncio.gather(main(hashtag), asyncio.get_running_loop()))
    

    Para iniciar un nuevo flujo, simplemente llama al método main() con un nuevo hashtag. El flujo actual se detendrá automáticamente y se iniciará un nuevo flujo con el nuevo hashtag.

    Por ejemplo:

    hashtag = "crypto"
    asyncio.run(asyncio.gather(main(hashtag), asyncio.get_running_loop()))
    

Comments are closed.