communication is ok
This commit is contained in:
parent
ea9037e9e7
commit
d4314f45b4
5
spam.csv
5
spam.csv
@ -5573,3 +5573,8 @@ ham,Will Ì_ b going to esplanade fr home?,,,
|
||||
ham,"Pity, * was in mood for that. So...any other suggestions?",,,
|
||||
ham,The guy did some bitching but I acted like i'd be interested in buying something else next week and he gave it to us for free,,,
|
||||
ham,Rofl. Its true to its name,,,
|
||||
False,how the weather is today
|
||||
False,how the weather is today
|
||||
False,how the weather is today
|
||||
ham,how the weather is today
|
||||
ham,it's me
|
||||
|
Can't render this file because it has a wrong number of fields in line 5576.
|
@ -1,5 +1,7 @@
|
||||
import csv
|
||||
import datetime as dt
|
||||
from threading import Thread
|
||||
|
||||
from src.l.logger import logger
|
||||
import src.model.trainer as trainer
|
||||
from scheduler import Scheduler
|
||||
@ -8,6 +10,7 @@ import time
|
||||
import requests
|
||||
from src.transport.rabbitmq import RabbitMQ
|
||||
from src.transport.train_dto import TrainDto
|
||||
import threading
|
||||
|
||||
|
||||
def _does_file_exist_in_dir(path):
|
||||
@ -15,16 +18,23 @@ def _does_file_exist_in_dir(path):
|
||||
|
||||
def _listen_to_trainings(csv_file: str, rabbitmq: RabbitMQ, queue: str) -> None:
|
||||
def _callback(ch, method, properties, body):
|
||||
logger.info(f"Message consumed: {body}")
|
||||
dto = TrainDto.from_json(body)
|
||||
logger.info(f"Message read as DTO: {body}")
|
||||
with open(csv_file, "a") as f:
|
||||
logger.info(f"Writing to dataset: {body}")
|
||||
writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
|
||||
writer.writerow([dto.is_spam, dto.text])
|
||||
writer.writerow([dto.spam_interpretation(), dto.text])
|
||||
logger.info(f"Message is done: {body}")
|
||||
rabbitmq.consume(queue_name=queue, callback=_callback, auto_ack=True)
|
||||
|
||||
def start(fucking_path: str, models_dir: str, dataset_path: str, web_api_url: str, token: str, rabbitmq: RabbitMQ, queue: str) -> None:
|
||||
logger.info("Starting...")
|
||||
_listen_to_trainings(csv_file=dataset_path, rabbitmq=rabbitmq, queue=queue)
|
||||
def _start_listening_to_trainings(csv_file: str, rabbitmq: RabbitMQ, queue: str) -> threading.Thread:
|
||||
logger.info("Starting listening to trainings")
|
||||
t = threading.Thread(target=_listen_to_trainings, args=(csv_file, rabbitmq, queue), daemon=True)
|
||||
t.start()
|
||||
return t
|
||||
|
||||
def _start_scheduling(fucking_path: str, models_dir: str, dataset_path: str, web_api_url: str, token: str):
|
||||
def _restart_web_api() -> None:
|
||||
headers = { 'Authorization': f"Bearer {token}" }
|
||||
response = requests.post(
|
||||
@ -50,3 +60,16 @@ def start(fucking_path: str, models_dir: str, dataset_path: str, web_api_url: st
|
||||
while True:
|
||||
scheduler.exec_jobs()
|
||||
time.sleep(1)
|
||||
|
||||
def _start_scheduling_in_thread(fucking_path: str, models_dir: str, dataset_path: str, web_api_url: str, token: str) -> threading.Thread:
|
||||
logger.info("Starting scheduling in thread")
|
||||
t = threading.Thread(target=_start_scheduling, args=(fucking_path, models_dir, dataset_path, web_api_url, token), daemon=True)
|
||||
t.start()
|
||||
return t
|
||||
|
||||
def start(fucking_path: str, models_dir: str, dataset_path: str, web_api_url: str, token: str, rabbitmq: RabbitMQ, queue: str) -> None:
|
||||
logger.info("Starting...")
|
||||
t1 = _start_listening_to_trainings(csv_file=dataset_path, rabbitmq=rabbitmq, queue=queue)
|
||||
t2 = _start_scheduling_in_thread(fucking_path=fucking_path, models_dir=models_dir, dataset_path=dataset_path, web_api_url=web_api_url, token=token)
|
||||
t1.join()
|
||||
t2.join()
|
||||
|
@ -1,9 +1,7 @@
|
||||
from typing import Optional
|
||||
|
||||
import pika
|
||||
import os
|
||||
|
||||
from pika.adapters.blocking_connection import BlockingChannel
|
||||
from src.l.logger import logger
|
||||
|
||||
|
||||
class RabbitMQ:
|
||||
@ -14,7 +12,7 @@ class RabbitMQ:
|
||||
self.password = passwd
|
||||
self.connection = None
|
||||
self.channel: Optional[BlockingChannel] = None
|
||||
self.connect()
|
||||
# self.connect()
|
||||
|
||||
def connect(self):
|
||||
credentials = pika.PlainCredentials(self.user, self.password)
|
||||
@ -30,19 +28,23 @@ class RabbitMQ:
|
||||
self.connection.close()
|
||||
|
||||
def consume(self, queue_name, callback, auto_ack: bool = True):
|
||||
self.connect()
|
||||
if not self.channel:
|
||||
raise Exception("Connection is not established.")
|
||||
self.channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=auto_ack)
|
||||
self.channel.start_consuming()
|
||||
|
||||
def publish(self, queue_name, message):
|
||||
self.connect()
|
||||
if not self.channel:
|
||||
raise Exception("Connection is not established.")
|
||||
self.channel.queue_declare(queue=queue_name, durable=True)
|
||||
self.channel.basic_publish(exchange='',
|
||||
routing_key=queue_name,
|
||||
# self.channel.queue_declare(queue=queue_name, durable=True)
|
||||
self.channel.basic_publish(exchange=queue_name,
|
||||
routing_key='',
|
||||
body=message,
|
||||
properties=pika.BasicProperties(
|
||||
delivery_mode=2, # make message persistent
|
||||
))
|
||||
print(f"Sent message to queue {queue_name}: {message}")
|
||||
),
|
||||
mandatory=True
|
||||
)
|
||||
logger.info(f"Sent message to queue {queue_name}: {message}")
|
||||
|
@ -15,8 +15,11 @@ class TrainDto:
|
||||
'text': self.text
|
||||
})
|
||||
|
||||
def spam_interpretation(self):
|
||||
return 'spam' if self.is_spam else 'ham'
|
||||
|
||||
@classmethod
|
||||
def from_json(cls: Type[T], s: str) -> T:
|
||||
def from_json(cls: Type[T], s: bytes) -> T:
|
||||
j = json.loads(s)
|
||||
if not 'is_spam' in j or not 'text' in j:
|
||||
raise Exception("Wrong format")
|
||||
|
Loading…
Reference in New Issue
Block a user