Daily Post Image from Upsplash

April: 10


Double check the shell execution.



Today is going to be a heavy day of sneeks. The big phat python that hopefully will not wrap around my server or neck.

Earlier, I went ahead and updated a decent amount of the atlas code base, preparing it for the future production environment, including test casing it on docker.

Yet, sadly, even the weird edge cases become a huge pain! But its been long over due, at least since last year, so it makes sense that I invest some heavy time into it.


To get started, we will add both the fastapi and the uvicorn[standard] to our standard poetry book, as they will provide the main entrance to our software stack. Majority of the codebase will be within the kbve_atlas, with a main.py file located outside of it. This main.py file will be the router that we use to handle all the python routes, including any micro-service that we might need down the line. Under the current build, I am thinking of keeping a bunch of the services within an isolated python instance and then using our Rust API to handle the communication between them, ideally using a queue style system.


Now the adventure of integrating the main.py with the FastAPI.

Here are the core functions we are going to migrate!

The first is the google/rss news feed:

async def google_main():
    #rss_url = "https://news.google.com/rss?hl=en-US&gl=US&ceid=US:en"
    rss_url = "https://feeds.bbci.co.uk/news/world/rss.xml"
    rss_utility = RSSUtility(base_url=rss_url)


        soup = await rss_utility.fetch_and_parse_rss()
        rss_feed_model = await rss_utility.convert_to_model(soup)
        formatted_feed = RSSUtility.format_rss_feed(rss_feed_model)
        print("Done RSS")


While doing that change, I realized we could decor the poetry_main and coindesk_main.

async def poetry_main():
    poetry_client = PoetryDBClient()
        random_poem = await poetry_client.get_random_poem()
        print(f"Title: {random_poem.title}\nAuthor: {random_poem.author}\n")
        await poetry_client.close()
        print("\n Closed Session")
    except Exception as e:
        print(f"An error occurred: {e}")


async def coindesk_main():
    coindesk_client = CoinDeskClient()
        bitcoin_price = await coindesk_client.get_current_bitcoin_price()
        await coindesk_client.close()


Added a decorant / base wrapper around the get command to make the movement of services faster and easier.

The decorants can be extended out, which will be within the util folder of atlas, but for now we should have more than enough to execute what we would need.

Next, we can add the broadcasting for ATLAS, holding our chat and the a couple of the basic or general machine learning commands.

To add the broadcasting, we will first add the library that was recommended by the FastAPI Team. Shell Command: ./kbve.sh -nx atlas:add --name=broadcaster[redis]

One of the things that I just realized is that we will need an internal Redis server, ideally one that can be accessed by Rust, Python, n8n, ect…

Hmm, creating a redis cluster within the swarm does make the most sense, with a redis entity on each node within the whole swarm?

We could also try to setup a redis bridge that would let some of the microservices sub/pub data from? This might just be all food for thought, I still need to do some more test casing.

Going to take a bit of a break from FastAPI and prepare to setup the broadcaster, we could also setup the relay through it as well! Hmmm…


The Nx flag on the ./kbve.sh aka -nx seems to be only taking one arguement and none of the additional flags.

This is going to be a quick fix around it, just quickly update the shell file so that it can take multiple additional flags and keeping the older part of the shell code within the file.

After a week or so, I will go back and update the code.


The next goal for the day will be to setup a Pub/Sub for our C A R P stack! Oh the joys!

To start the broadcaster, we can use this code block:

async def startup_event():
    await broadcast.connect()

async def shutdown_event():
    await broadcast.disconnect()

Yet the issue is that on_event of startup/shutdown has become deprecated and instead we would have to use @asynccontextmanager and the lifespan decorated.

So lets update the code to look something like this:

async def lifespan(app: FastAPI):
    await broadcast.connect()
    await broadcast.disconnect()

To launch the current ATLAS , we will be using uvicorn main:app --port 8086 however we might not have access to the redis right now on the mac, so lets swap that out to memory.

Here is the quick swap to memory:

broadcast = Broadcast("memory://")