
pgmq-python: adding support for Transaction #268

Open · wants to merge 13 commits into base: main
Conversation

tavallaie (Contributor)

Following #257, I'm trying to add transaction support to the Python library.

@tavallaie (Contributor Author) commented Jun 15, 2024

I don't know why my tests are failing; the logger doesn't show anything in particular. @v0idpwn @ChuckHend can you help me solve it?

@v0idpwn (Collaborator) commented Jun 15, 2024

I can only take a look tomorrow.

@v0idpwn (Collaborator) left a comment

Thanks for putting in the work on this. I have a few questions/concerns; some of them may be wrong since I don't know much about Python and its ecosystem, so please take them with a grain of salt :)

(1) Could you please add a test/example where the transaction is used by queries outside of pgmq? I don't exactly follow how this would be done. Assume that, for example, I wanted to: open a transaction, read 10 messages, insert 10 entries in a misc table, delete the 10 messages I've read, commit the transaction.

(2) Is this completely thread-safe? Can I use it with async Python and have 2 transactions open at the same time? For example: start T1, read 10 messages, start some async work, start T2, read 10 messages, start some async work, finish T2, finish T1?

Comment on lines 47 to 53
```python
with self.pool.connection() as conn:
    try:
        logger.debug(f"Transaction started with conn: {conn}")
        with conn.transaction():
            result = func(self, *args, conn=conn, **kwargs)
            logger.debug(f"Transaction completed with conn: {conn}")
            return result
```
@v0idpwn commented Jun 16, 2024
I don't fully grasp this code either. What keeps it from starting (and finishing) a transaction on every single call? Is this "transaction completed" log wrong? The transaction shouldn't finish as soon as the statement runs, no?

Comment on lines 195 to 204
```python
def test_transaction_create_queue(self):
    """Test creating a queue within a transaction."""
    try:
        self.queue.create_queue("test_queue_txn", perform_transaction=True)
        raise Exception("Intentional failure")
    except Exception:
        pass
    finally:
        queues = self.queue.list_queues(perform_transaction=False)
        self.assertNotIn("test_queue_txn", queues)
```
A Contributor left a comment

@tavallaie, I think during the transactional flow we basically want users to control when transactions are opened and closed. In these test cases, then, I would expect a `.begin()` and a `.commit()` to be called before/after (outside of pgmq) the pgmq calls.

I'll need to look up the exact syntax in psycopg3, but in pseudocode, something like this:

```python
tx = connection.begin()
queue.create('somequeue', tx)
tx.execute("insert into some other table")
queue.send('somequeue', 'message', tx)
tx.commit()
```

I don't see a commit anywhere, so that might be why these tests are failing?
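A minimal illustration of that point, with stdlib sqlite3 standing in for Postgres/psycopg and a made-up table: work done before an intentional failure disappears on rollback unless something committed it, which is exactly what the test above asserts.

```python
import sqlite3

# sqlite3 stands in for Postgres here; the "queues" table is illustrative
conn = sqlite3.connect(":memory:")
conn.execute("create table queues (name text)")
conn.commit()

try:
    # do transactional work, then fail before anything commits
    conn.execute("insert into queues values ('test_queue_txn')")
    raise Exception("Intentional failure")
except Exception:
    conn.rollback()  # with no commit, the insert is discarded

queues = [row[0] for row in conn.execute("select name from queues")]
assert "test_queue_txn" not in queues
```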

@tavallaie (Contributor Author)

Thanks, I found my bug: I didn't have any `.commit()` in my code.

@tavallaie (Contributor Author) commented Jun 16, 2024

> (2) Is this completely thread-safe? Can I use it with async Python and have 2 transactions open at the same time? For example: start T1, read 10 messages, start some async work, start T2, read 10 messages, start some async work, finish T2, finish T1?

The psycopg connection pool is thread-safe, but if you want to support async operations, we'd need to use asyncpg instead of psycopg_pool (via pip), plus some modifications to the code.

@tavallaie (Contributor Author)

> (1) Could you please add a test/example where the transaction is used by queries outside of pgmq? I don't exactly follow how this would be done. Assume that, for example, I wanted to: open a transaction, read 10 messages, insert 10 entries in a misc table, delete the 10 messages I've read, commit the transaction.

I think I fixed the bug; if it's still necessary, I'll add it.

@tavallaie marked this pull request as ready for review June 16, 2024 16:06
@ChuckHend (Contributor)

@tavallaie , can you put forth an example (or better yet, a test) using this implementation with an existing transaction? This is roughly how I would imagine someone might want to use it:

```python
with psycopg.connect() as conn:
    query_results = conn.execute("select users from notifications")
    queue.tx = conn  # or something like this
    for user in query_results:
        queue.send('myq', user)
    conn.execute("delete from notifications where ....")

# a select, multiple pgmq.send(), and a delete all executed in a single transaction
```

Maybe this is possible with what is currently in this PR? I am not sure though, so I think a test like this would be great. I can help with that if needed.
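For reference, the transaction shape being asked for can be made concrete as below; a hedged sketch where stdlib sqlite3 stands in for psycopg and a plain insert stands in for `queue.send`, so only the pattern (select, multiple sends, delete, one commit) carries over:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table notifications (user text)")
conn.execute("create table myq (msg text)")
conn.executemany("insert into notifications values (?)", [("alice",), ("bob",)])
conn.commit()

# a select, multiple "sends", and a delete, all in one transaction
users = [row[0] for row in conn.execute("select user from notifications")]
for user in users:
    conn.execute("insert into myq values (?)", (user,))  # stand-in for queue.send('myq', user)
conn.execute("delete from notifications")
conn.commit()  # everything above commits (or rolls back) together

sent = [row[0] for row in conn.execute("select msg from myq")]
remaining = conn.execute("select count(*) from notifications").fetchone()[0]
```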

@tavallaie tavallaie marked this pull request as draft June 16, 2024 22:08
@tavallaie (Contributor Author)

> Maybe this is possible with what is currently in this PR? I am not sure though, so I think a test like this would be great. I can help with that if needed.

I defined a decorator to do everything within a single transaction, but I have to write tests for that decorator; I'm working on it.
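A "single transaction" decorator along those lines can be sketched like this; a minimal illustration of the pattern, not the PR's actual code, with stdlib sqlite3 standing in for the psycopg pool and all names being placeholders:

```python
import functools
import sqlite3

def transactional(func):
    """Run the wrapped function in one transaction: commit on success, roll back on error."""
    @functools.wraps(func)
    def wrapper(conn, *args, **kwargs):
        try:
            result = func(conn, *args, **kwargs)
            conn.commit()  # the explicit commit that was missing earlier
            return result
        except Exception:
            conn.rollback()
            raise
    return wrapper

@transactional
def create_and_send(conn, queue_name, message):
    # placeholder bodies; real code would call pgmq functions
    conn.execute("insert into queues values (?)", (queue_name,))
    conn.execute("insert into messages values (?)", (message,))

conn = sqlite3.connect(":memory:")
conn.execute("create table queues (name text)")
conn.execute("create table messages (body text)")
conn.commit()
create_and_send(conn, "somequeue", "hello")
```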

@ChuckHend changed the title "adding support for Transaction" → "pgmq-python: adding support for Transaction" Aug 13, 2024
@ChuckHend (Contributor)

@tavallaie, are you still working on this? If not, maybe we can find someone to finish it, or add transaction support in with #290.

@tavallaie (Contributor Author)

Let me give it another shot, but I'd appreciate any help I can get.


@tavallaie (Contributor Author) commented Sep 15, 2024

I also disagree with #290; that way we'd have problems using it with Django, Celery, or other ORMs like peewee, etc. It is better to have a separate library for that.

- adding support for logging in sync and async functions
- adding support for transactions in sync and async operations
- adding unit tests
- separate module for transaction decorators to avoid conflicts in unit tests
- updating readme
@tavallaie marked this pull request as ready for review September 15, 2024 18:45
@tavallaie (Contributor Author)

@ChuckHend, I think it is ready for review.

@tavallaie (Contributor Author)

> I also disagree with #290; that way we'd have problems using it with Django, Celery, or other ORMs like peewee, etc. It is better to have a separate library for that.

Adding some ORM natively means adding an unnecessary dependency for projects that don't use it at all. For example, if I want to build a worker that doesn't need an ORM, or I'm using Django, which already has its own database backend, we'd be carrying an extra dependency. So I suggest making a separate library, or an extras package for that, like async.

@ChuckHend (Contributor)

@tavallaie can you add an example that demonstrates something like the SQL below, but using the python implementation in this PR?

```sql
begin;
select pgmq.create('somequeue');
insert into some_table (name) values ('brian');
select pgmq.send('somequeue', '{"hello": "world"}');
commit;
```

@tavallaie (Contributor Author)

I'll add it now
