diff --git a/spam.csv b/spam.csv
index 02d850f..890229c 100644
--- a/spam.csv
+++ b/spam.csv
@@ -5573,3 +5573,8 @@ ham,Will Ì_ b going to esplanade fr home?,,,
 ham,"Pity, * was in mood for that. So...any other suggestions?",,,
 ham,The guy did some bitching but I acted like i'd be interested in buying something else next week and he gave it to us for free,,,
 ham,Rofl. Its true to its name,,,
+ham,how the weather is today,,,
+ham,how the weather is today,,,
+ham,how the weather is today,,,
+ham,how the weather is today,,,
+ham,it's me,,,
diff --git a/src/model/updater.py b/src/model/updater.py
index cb76d51..7e165a3 100644
--- a/src/model/updater.py
+++ b/src/model/updater.py
@@ -1,5 +1,7 @@
 import csv
 import datetime as dt
+from threading import Thread
+
 from src.l.logger import logger
 import src.model.trainer as trainer
 from scheduler import Scheduler
@@ -8,6 +10,7 @@ import time
 import requests
 from src.transport.rabbitmq import RabbitMQ
 from src.transport.train_dto import TrainDto
+import threading
 
 
 def _does_file_exist_in_dir(path):
@@ -15,16 +18,23 @@ def _does_file_exist_in_dir(path):
 
 def _listen_to_trainings(csv_file: str, rabbitmq: RabbitMQ, queue: str) -> None:
     def _callback(ch, method, properties, body):
+        logger.info(f"Message consumed: {body}")
         dto = TrainDto.from_json(body)
+        logger.info(f"Message read as DTO: {body}")
         with open(csv_file, "a") as f:
+            logger.info(f"Writing to dataset: {body}")
             writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
-            writer.writerow([dto.is_spam, dto.text])
+            writer.writerow([dto.spam_interpretation(), dto.text])
+        logger.info(f"Message is done: {body}")
 
     rabbitmq.consume(queue_name=queue, callback=_callback, auto_ack=True)
 
-def start(fucking_path: str, models_dir: str, dataset_path: str, web_api_url: str, token: str, rabbitmq: RabbitMQ, queue: str) -> None:
-    logger.info("Starting...")
-    _listen_to_trainings(csv_file=dataset_path, rabbitmq=rabbitmq, queue=queue)
+def _start_listening_to_trainings(csv_file: str, rabbitmq: RabbitMQ, queue: str) -> threading.Thread:
+    logger.info("Starting listening to trainings")
+    t = threading.Thread(target=_listen_to_trainings, args=(csv_file, rabbitmq, queue), daemon=True)
+    t.start()
+    return t
+def _start_scheduling(fucking_path: str, models_dir: str, dataset_path: str, web_api_url: str, token: str):
     def _restart_web_api() -> None:
         headers = { 'Authorization': f"Bearer {token}" }
         response = requests.post(
@@ -50,3 +60,16 @@ def start(fucking_path: str, models_dir: str, dataset_path: str, web_api_url: st
     while True:
         scheduler.exec_jobs()
         time.sleep(1)
+
+def _start_scheduling_in_thread(fucking_path: str, models_dir: str, dataset_path: str, web_api_url: str, token: str) -> threading.Thread:
+    logger.info("Starting scheduling in thread")
+    t = threading.Thread(target=_start_scheduling, args=(fucking_path, models_dir, dataset_path, web_api_url, token), daemon=True)
+    t.start()
+    return t
+
+def start(fucking_path: str, models_dir: str, dataset_path: str, web_api_url: str, token: str, rabbitmq: RabbitMQ, queue: str) -> None:
+    logger.info("Starting...")
+    t1 = _start_listening_to_trainings(csv_file=dataset_path, rabbitmq=rabbitmq, queue=queue)
+    t2 = _start_scheduling_in_thread(fucking_path=fucking_path, models_dir=models_dir, dataset_path=dataset_path, web_api_url=web_api_url, token=token)
+    t1.join()
+    t2.join()
diff --git a/src/transport/rabbitmq.py b/src/transport/rabbitmq.py
index 16857a7..b65c42a 100644
--- a/src/transport/rabbitmq.py
+++ b/src/transport/rabbitmq.py
@@ -1,9 +1,7 @@
 from typing import Optional
-
 import pika
-import os
-
 from pika.adapters.blocking_connection import BlockingChannel
+from src.l.logger import logger
 
 
 class RabbitMQ:
@@ -14,7 +12,7 @@ class RabbitMQ:
         self.password = passwd
         self.connection = None
         self.channel: Optional[BlockingChannel] = None
-        self.connect()
+        # self.connect()
 
     def connect(self):
         credentials = pika.PlainCredentials(self.user, self.password)
@@ -30,19 +28,23 @@
         self.connection.close()
 
     def consume(self, queue_name, callback, auto_ack: bool = True):
+        self.connect()
         if not self.channel:
             raise Exception("Connection is not established.")
         self.channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=auto_ack)
         self.channel.start_consuming()
 
     def publish(self, queue_name, message):
+        self.connect()
         if not self.channel:
             raise Exception("Connection is not established.")
-        self.channel.queue_declare(queue=queue_name, durable=True)
-        self.channel.basic_publish(exchange='',
-                                   routing_key=queue_name,
+        # self.channel.queue_declare(queue=queue_name, durable=True)
+        self.channel.basic_publish(exchange=queue_name,
+                                   routing_key='',
                                    body=message,
                                    properties=pika.BasicProperties(
                                        delivery_mode=2, # make message persistent
-                                   ))
-        print(f"Sent message to queue {queue_name}: {message}")
\ No newline at end of file
+                                   ),
+                                   mandatory=True
+                                   )
+        logger.info(f"Sent message to queue {queue_name}: {message}")
diff --git a/src/transport/train_dto.py b/src/transport/train_dto.py
index eaf608c..6ffc2a6 100644
--- a/src/transport/train_dto.py
+++ b/src/transport/train_dto.py
@@ -15,8 +15,11 @@ class TrainDto:
             'text': self.text
         })
 
+    def spam_interpretation(self):
+        return 'spam' if self.is_spam else 'ham'
+
     @classmethod
-    def from_json(cls: Type[T], s: str) -> T:
+    def from_json(cls: Type[T], s: bytes) -> T:
         j = json.loads(s)
         if not 'is_spam' in j or not 'text' in j:
             raise Exception("Wrong format")