This branch with celery async workers has no code errors, but running a GPT thread on the celery worker causes a SIGFAULT failure. Need to try this on a server instead of my local machine to see if it will run without failing. If it still fails, then I need to separate my async tasks and the flask app with the tasks dumping into the db and flask just pulling out of the DB.
This commit is contained in:
20
project/__init__.py
Normal file
20
project/__init__.py
Normal file
@ -0,0 +1,20 @@
|
||||
from flask import Flask
|
||||
|
||||
from .extensions import db
|
||||
from .views import main
|
||||
from .utils import make_celery
|
||||
|
||||
def create_app():
|
||||
app = Flask(__name__)
|
||||
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///db.sqlite3"
|
||||
app.config["SECRET_KEY"] = "super-secret-key"
|
||||
app.config["CELERY_CONFIG"] = {"broker_url": "redis://localhost"}
|
||||
|
||||
db.init_app(app)
|
||||
|
||||
app.register_blueprint(main)
|
||||
|
||||
celery = make_celery(app)
|
||||
celery.set_default()
|
||||
|
||||
return app, celery
|
||||
BIN
project/__pycache__/__init__.cpython-310.pyc
Normal file
BIN
project/__pycache__/__init__.cpython-310.pyc
Normal file
Binary file not shown.
BIN
project/__pycache__/extensions.cpython-310.pyc
Normal file
BIN
project/__pycache__/extensions.cpython-310.pyc
Normal file
Binary file not shown.
BIN
project/__pycache__/models.cpython-310.pyc
Normal file
BIN
project/__pycache__/models.cpython-310.pyc
Normal file
Binary file not shown.
BIN
project/__pycache__/tasks.cpython-310.pyc
Normal file
BIN
project/__pycache__/tasks.cpython-310.pyc
Normal file
Binary file not shown.
BIN
project/__pycache__/utils.cpython-310.pyc
Normal file
BIN
project/__pycache__/utils.cpython-310.pyc
Normal file
Binary file not shown.
BIN
project/__pycache__/views.cpython-310.pyc
Normal file
BIN
project/__pycache__/views.cpython-310.pyc
Normal file
Binary file not shown.
3
project/extensions.py
Normal file
3
project/extensions.py
Normal file
@ -0,0 +1,3 @@
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
|
||||
db = SQLAlchemy()
|
||||
8
project/models.py
Normal file
8
project/models.py
Normal file
@ -0,0 +1,8 @@
|
||||
from datetime import datetime
|
||||
|
||||
from .extensions import db
|
||||
|
||||
class Result(db.Model):
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
text = db.Column(db.String(100), nullable=False)
|
||||
date_created = db.Column(db.DateTime, default=datetime.utcnow)
|
||||
64
project/tasks.py
Normal file
64
project/tasks.py
Normal file
@ -0,0 +1,64 @@
|
||||
from celery import shared_task
|
||||
from time import sleep
|
||||
from redbeat import RedBeatSchedulerEntry
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.sql.expression import func, select, insert
|
||||
from sqlalchemy.ext.automap import automap_base
|
||||
from gpt4all import GPT4All
|
||||
from sqlalchemy.orm import Session
|
||||
from celery import current_app as celery_app
|
||||
|
||||
from .extensions import db
|
||||
from .models import Result
|
||||
|
||||
engine = create_engine("sqlite:///words_prompts.db", pool_pre_ping=True)
|
||||
|
||||
Base = automap_base()
|
||||
Base.prepare(engine, reflect=True)
|
||||
|
||||
Words = Base.classes.words
|
||||
Themes = Base.classes.themes
|
||||
|
||||
MODEL = GPT4All(
|
||||
model_name="gpt4all-falcon-q4_0.gguf",
|
||||
# model_path=(Path.home() / ".cache" / "gpt4all"),
|
||||
allow_download=False,
|
||||
)
|
||||
|
||||
|
||||
@shared_task
|
||||
def my_task(text, schedule_name):
|
||||
while True:
|
||||
with Session(engine) as word_session:
|
||||
random_word = word_session.query(Words.words)
|
||||
random_word = random_word.order_by(func.random()).first()
|
||||
random_word = str(random_word)[4:-4]
|
||||
# SYSTEM_TEMPLATE = "A single sentence based on a word."
|
||||
# PROMPT_TEMPLATE = "### Instruction: {0} \n### Response: "
|
||||
response = MODEL.generate(
|
||||
f"Give me a writing prompt about {random_word}.",
|
||||
temp=0.7,
|
||||
callback=stop_on_token_callback,
|
||||
)
|
||||
word_session.execute(insert(Themes).values(themes=response))
|
||||
word_session.commit()
|
||||
|
||||
try:
|
||||
entry = RedBeatSchedulerEntry.from_key(
|
||||
"redbeat:" + schedule_name, app=celery_app
|
||||
)
|
||||
except KeyError:
|
||||
entry = None
|
||||
|
||||
if entry:
|
||||
entry.delete()
|
||||
|
||||
|
||||
def stop_on_token_callback(token_id, token_string):
|
||||
"""
|
||||
Function to limit return length of the
|
||||
gpt4all response. Period indicates a sentence.
|
||||
"""
|
||||
if "." in token_string:
|
||||
return False
|
||||
return True
|
||||
13
project/utils.py
Normal file
13
project/utils.py
Normal file
@ -0,0 +1,13 @@
|
||||
from celery import Celery
|
||||
|
||||
def make_celery(app):
|
||||
celery = Celery(app.import_name)
|
||||
celery.conf.update(app.config["CELERY_CONFIG"])
|
||||
|
||||
class ContextTask(celery.Task):
|
||||
def __call__(self, *args, **kwargs):
|
||||
with app.app_context():
|
||||
return self.run(*args, **kwargs)
|
||||
|
||||
celery.Task = ContextTask
|
||||
return celery
|
||||
29
project/views.py
Normal file
29
project/views.py
Normal file
@ -0,0 +1,29 @@
|
||||
from flask import Blueprint
|
||||
from redbeat import RedBeatSchedulerEntry
|
||||
from redbeat.schedules import rrule
|
||||
from datetime import datetime
|
||||
from celery import current_app as celery_app
|
||||
|
||||
from uuid import uuid4
|
||||
|
||||
from .tasks import my_task
|
||||
|
||||
main = Blueprint("main", __name__)
|
||||
|
||||
|
||||
@main.route("/", methods=["GET"])
|
||||
def index():
|
||||
# my_task.delay("Hello World!")
|
||||
schedule_name = str(uuid4())
|
||||
dt = datetime.utcnow()
|
||||
interval = rrule(freq="MINUTELY", dtstart=dt)
|
||||
entry = RedBeatSchedulerEntry(
|
||||
schedule_name,
|
||||
"project.tasks.my_task",
|
||||
interval,
|
||||
args=["From the scheduler"],
|
||||
kwargs={"schedule_name": schedule_name},
|
||||
app=celery_app,
|
||||
)
|
||||
entry.save()
|
||||
return "Created the schedule!"
|
||||
Reference in New Issue
Block a user