Double Spending Issues

Hi, all
Earlier this week, our group was testing concurrency, and we found that if we use 500 threads to add a certain amount to an account in the database, the total in that account afterwards is higher than the correct total. This issue does not appear if we use 100 threads. In case we did something wrong in our own program, we ran the test with the simple example provided in the developer guide. I understand that since CockroachDB uses atomic transactions and Raft, such issues should be unlikely, so is it possible that there are extra settings we need to configure to avoid this?
Any contribution would be highly appreciated!
Best Regards

@Lionxx The number of concurrent threads should absolutely not affect correctness. Can you provide reproduction instructions (i.e. link to test program you are using and your cluster configuration)? If this is easily reproducible it should be straightforward to track down what is going on. Also, be sure to include which version of CockroachDB you are using.

Hi, Peter
Thanks for following up. Our group mate just told me they have contacted you before, but there may have been some misunderstanding, since the problem is still not solved; we apologize for this. We thought the cause could be the Azure load balancer, so we fixed it and the load balancer is now deployed, but the problem still exists. I think you have copies of our code; let me know if you do not, and I will post it again.
Best Wishes

@Lionxx Apologies for asking a simple question: who is your group mate? The forum isn’t giving me any context here. Links to old posts or copies of code and reproduction instructions would be appreciated.

@peter
They reached you via gitter, the name was xiaohan.liang. Thank you so much for your effort!

The search in Gitter leaves many things to be desired. I’m able to find messages from XiaohanLiang, but can’t find any code. I think it would be a lot easier for both of us if we just started the conversation fresh here.

@peter Thanks, here is the code for establishing connection:

var pg = require('pg');
var fs = require('fs');
var config = {
  user: 'maxroach',
  host: 'localhost',
  database: 'bank',
  port: 26257,
  max: 10,
  ssl: {
    rejectUnauthorized: false,
    ca: fs.readFileSync("./certs/ca.crt").toString(),
    key: fs.readFileSync("./certs/client.root.key").toString(),
    cert: fs.readFileSync("./certs/client.root.crt").toString()
  }
};

const pool = new pg.Pool(config);

module.exports = {
  query: (text, params, callback) => {
    const start = Date.now()
    return pool.query(text, params, (err, res) => {
      const duration = Date.now() - start
      console.log('executed query', { text, duration, rows: res.rowCount })
      callback(err, res)
    })
  },
  getClient: (callback) => {
    pool.connect((err, client, done) => {
      var finish = function () {
        done();
        process.exit();
      };

      if (err) {
        console.error('could not connect to cockroachdb', err);
        finish();
      }
      callback(null, client, done)
    })
  }
}

Then, the code below represents a single transaction:

var async = require('async');
var express = require('express');
var router = express.Router();
var pool = require('../models/connectDB');
router.get('/', function(req, res) {

  var from = req.query.From;
  var to = req.query.To;
  var amount = req.query.Amount;
  
  // Wrapper for a transaction.
  // This automatically re-calls "op" with the client as an argument as
  // long as the database server asks for the transaction to be retried.

  function txnWrapper(client, op, next) {
    client.query('BEGIN; SAVEPOINT cockroach_restart', function (err) {
      if (err) {
        return next(err);
      }

      var released = false;
      async.doWhilst(function (done) {
        var handleError = function (err) {
          // If we got an error, see if it's a retryable one and, if so, restart.
          if (err.code === '40001') {
            // Signal the database that we'll retry.
            return client.query('ROLLBACK TO SAVEPOINT cockroach_restart', done);
          }
          // A non-retryable error; break out of the doWhilst with an error.
          return done(err);
        };

        // Attempt the work.

        op(client, function (err) {
          if (err) {
            return handleError(err);
          }
          var opResults = arguments;

          // If we reach this point, release and commit.
          client.query('RELEASE SAVEPOINT cockroach_restart', function (err) {
            if (err) {
              return handleError(err);
            }

            released = true;
            return done.apply(null, opResults);
          });
        });
      },

      function () {
        return !released;
      },

      function (err) {
        if (err) {
          client.query('ROLLBACK', function () {
            next(err);
          });
        } else {
          var txnResults = arguments;
          client.query('COMMIT', function(err) {
            if (err) {
              return next(err);
            } else {
              return next.apply(null, txnResults);
            }
          });
        }
      });
    });
  }

 

  // The transaction we want to run.
  function transferFunds(client, from, to, amount, next) {
    // Check the current balance.
    client.query('SELECT balance FROM users WHERE id = $1', [from], function (err, results) {
      if (err) {
        return next(err);
      } else if (results.rows.length === 0) {
        return next(new Error('account not found in table'));
      }

 

      var acctBal = results.rows[0].balance;
      if ((acctBal - amount) >= 0) {
        // Perform the transfer.
        async.waterfall([
          function (next) {
            // Subtract amount from account 1.
            client.query('UPDATE users SET balance = balance - $1 WHERE id = $2', [amount, from], next);
          },
          function (updateResult, next) {
            // Add amount to account 2.
            client.query('UPDATE users SET balance = balance + $1 WHERE id = $2', [amount, to], next);
          }, function (updateResult, next) {
            // Fetch account balances after updates.
            client.query('SELECT id, balance FROM users', function (err, selectResult) {
              next(err, selectResult ? selectResult.rows : null);
            });
          }
        ], next);
      } else {
        next(new Error('insufficient funds'));
      }

    });

  }

 

 

  pool.getClient(function (err, client, done) {
    // Execute the transaction.
    txnWrapper(client,
    function (client, next) {
      transferFunds(client, from, to, amount, next);
    },

    function (err, results) {
      if (err) {
        console.error('error performing transaction', err);
        finish();
      }

      console.log('Balances after transfer:');
      results.forEach(function (result) {
        console.log(result);
      });
      done();
      res.json({success: "transaction is finished"});

      //finish();

    });

  });

})

 

module.exports = router;

The version we used is a bit old, 1.0.6 I am afraid.
Thank you so much for the code review.
Best Wishes

Thanks! I’m not going to have a chance to look at this right now, but will take a look later tonight or tomorrow.

@peter
Hi, Peter
In case the problem is our Azure deployment or our inexperienced Node.js code, we have written a piece of Python code and tested it against CockroachDB on our local machine, which may help you track down this issue more easily. Unfortunately, double spending still occurs: account 1 has balance 9870 instead of the correct amount 9400, and account 2 has balance 10270 instead of 10600. As long as you have a table called ‘accounts’ in database ‘bank’ with user ‘maxroach’, this piece of code should work fine. The version is 1.1.5 this time.

# Import the driver.
import psycopg2
import psycopg2.errorcodes
import threading
# Connect to the cluster.
conn = psycopg2.connect(database='bank', user='maxroach', host='localhost', port=26257)
thread_amount=50
thread_pool = []
def onestmt(conn, sql):
    with conn.cursor() as cur:
        cur.execute(sql)


# Wrapper for a transaction.
# This automatically re-calls "op" with the open transaction as an argument
# as long as the database server asks for the transaction to be retried.
def run_transaction(conn, op):
    with conn:
        onestmt(conn, "SAVEPOINT cockroach_restart")
        while True:
            try:
                # Attempt the work.
                op(conn)

                # If we reach this point, commit.
                onestmt(conn, "RELEASE SAVEPOINT cockroach_restart")
                break

            except psycopg2.OperationalError as e:
                if e.pgcode != psycopg2.errorcodes.SERIALIZATION_FAILURE:
                    # A non-retryable error; report this up the call stack.
                    raise e
                # Signal the database that we'll retry.
                onestmt(conn, "ROLLBACK TO SAVEPOINT cockroach_restart")


# The transaction we want to run.
def transfer_funds(txn, frm, to, amount):
    with txn.cursor() as cur:

        # Check the current balance.
        cur.execute("SELECT balance FROM accounts WHERE id = %s", (frm,))
        from_balance = cur.fetchone()[0]
        if from_balance < amount:
            raise RuntimeError("Insufficient funds")

        # Perform the transfer.
        cur.execute("UPDATE accounts SET balance = balance - %s WHERE id = %s",
                    (amount, frm))
        cur.execute("UPDATE accounts SET balance = balance + %s WHERE id = %s",
                    (amount, to))


# Execute the transaction.
run_transaction(conn, lambda conn: transfer_funds(conn, 1, 2, 100))

for thread_index in range(thread_amount):
    t = threading.Thread(target=run_transaction, args=(conn, lambda conn: transfer_funds(conn, 1, 2, 10)))
    thread_pool.append(t)

for thread_index in range(thread_amount):
    thread_pool[thread_index].start()

for thread_index in range(thread_amount):
    thread_pool[thread_index].join()

with conn:
    with conn.cursor() as cur:
        # Check account balances.
        cur.execute("SELECT id, balance FROM accounts")
        rows = cur.fetchall()
        print('Balances after transfer:')
        for row in rows:
            print([str(cell) for cell in row])

# Close communication with the database.
conn.close()

Thank you so much for this!
Best Wishes

My familiarity with Node.js is limited and I’ve never used Python for database access. Another engineer took a look at the Python code and noticed that you’re using threading but no locking. The psycopg2 docs seem to indicate that psycopg2.connect returns a single connection which all of the threads are sharing. The cursors returned from conn.cursor() are not isolated from other cursors created on the same connection (per http://initd.org/psycopg/docs/cursor.html). So our suspicion is that your threads are interleaving commands from different transactions on the same connection causing all sorts of badness. In fact, you managed to uncover a server panic in 2.0 when we ran your python code.

My knowledge of threading and python is limited, but perhaps you can allocate a connection per thread. Simply placing a lock around run_transaction won’t help as that will remove all of the concurrency.
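
To make the suspected interleaving concrete, here is a toy, in-memory sketch of one possible ordering. It is not how CockroachDB or psycopg2 is implemented; `ToySession`, `simulate_shared_session`, and the starting balances of 10000 are assumptions made up for illustration. It only models the idea that a single session has a single open transaction, so a retry issued by one thread also discards writes made by another.

```python
class ToySession:
    """Toy stand-in for ONE database session: one open transaction, one savepoint."""

    def __init__(self, committed):
        self.committed = dict(committed)   # durable balances
        self.pending = dict(committed)     # uncommitted view of the open transaction

    def update(self, account, delta):
        self.pending[account] += delta

    def rollback_to_savepoint(self):
        # Restarting the transaction discards every uncommitted write on the
        # session, no matter which thread issued it.
        self.pending = dict(self.committed)

    def commit(self):
        self.committed = dict(self.pending)


def simulate_shared_session():
    # Assumed starting balances; two threads, A and B, share this one session.
    session = ToySession({1: 10000, 2: 10000})
    session.update(1, -10)            # thread A: debit account 1
    session.rollback_to_savepoint()   # thread B: retry handling wipes A's debit too
    session.update(2, +10)            # thread A: credit account 2, unaware of the rollback
    session.commit()                  # the commit applies whatever is pending
    return session.committed


balances = simulate_shared_session()
print(balances, "total:", sum(balances.values()))
# prints: {1: 10000, 2: 10010} total: 20010
```

In this ordering the credit survives but the debit does not, so the total grows by 10, which is the same kind of symptom as the balances reported above.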

Hi, Peter
Thank you so much for your reply; I hope I did not misunderstand. What you mean is that our code does not use threads correctly, right? If so, would you mind providing an alternative solution? We have to consider concurrency in our project, so it looks like we cannot avoid this. The code we use is derived from the example code on the CockroachDB website (Node.js and Python), hence the examples might all suffer from this kind of issue. Lastly, I know it is not polite at all, but could you give us the contact details of an engineer who has solid experience with Node.js, if you do not mind? We have really struggled with this for a long time.
Looking forward to your reply!
Enjoy the rest of your day!

By the way, did you mention that the problem can be solved if we use v2.0? Thanks.

@Lionxx The problem is in your client code: it appears to not be using threads correctly. Using v2.0 will not solve the problem (I apologize for any confusion caused by my mention of v2.0). I’m not familiar enough with either Node.js or Python to provide you code in those languages. You definitely do want to be utilizing threading or some other form of concurrency, otherwise your performance will be significantly lower. I’m going to file a documentation issue suggesting that we add more sophisticated examples that include threading.

@Lionxx, were you experiencing the double spend issue with the Node.js code you first posted? Node is single threaded, so I’m a bit confused as to what you mean by running it with 500 threads. Did you have a test process opening 500 concurrent connections to your express server?

One thing that seems potentially problematic about your code is this block in the route handler:

if (err) {
  console.error('error performing transaction', err);
  finish();
}

In particular, the finish() function isn’t within scope of that function (it looks like someone already realized that and commented it out further down in the function). So, if you encounter a non-retryable error coming from Cockroach, you’ll be calling the undefined finish() function. That would normally throw a ReferenceError and terminate the process. However, Express will be using its Default Error Handler, which will cause that bug to manifest as a 500 response rather than a server crash.

Did you experience any status code 500 responses in your testing? If so, my suspicion is that as soon as that ReferenceError is encountered, your express route function is immediately exiting, and you’re never calling done() on your checked out client, releasing it back into the Pool. I’m not sure what the behavior of pg.Pool is in this case, but it’s not hard to imagine failing to cleanly release a client leading to data consistency errors.

If you encountered any errors while doing your testing, try changing the finish() call in that error handling block to done() and seeing if you are able to reproduce.

Hi, Peter
Thank you so much, this is very kind. Please give me a shout if you update these files.
Best Wishes

@Lionxx Here is the docs issue I filed about extending the examples: https://github.com/cockroachdb/docs/issues/2855. I’d love to say I’ll remember to ping you if and when the examples are extended, but you’re better off “watching” the issue.

Just merged that with an existing issue, which we hope to address soon. Please follow this one: https://github.com/cockroachdb/docs/issues/2464.

@twrobel
Hi, mate
Thank you so much for the help. You are definitely correct about the finish() function, and we have corrected it. To be honest, we are students with limited Node.js experience. What we were doing was creating several threads in our Python client and sending requests to the server side (Node.js), which is basically the code we posted to handle these requests. We are still a bit confused about why we experience this issue. Is there anything wrong with our Python or Node.js code? Or is the Azure deployment incorrect?

To eliminate the uncertainty, we ran another test last night with only Python code and a local CockroachDB, and Peter told me there was something wrong with our Python code, but we have not yet figured out how to correct it. Meanwhile, I was wondering what happens if a malicious attacker uses incorrect concurrent requests like ours to attack the database and add balance to his own account. Why does CockroachDB, with its atomic transactions, still suffer from such an issue? And more importantly, how can we correct our code?

Sorry for complaining, but we really appreciate your help, your time, and your patience!

Best Wishes

Thank you both so much, @jesse @peter. Your whole team is super helpful, and CockroachDB deserves even more success in the future! Also, really looking forward to your update in that thread!

Cheers

@Lionxx Your Python code example was not an example of a malicious attack against the database, but of a bug in application code. CockroachDB provides some features to help alleviate bugs, but no database can prevent all application bugs. In effect, what your Python code was doing was inconsistently updating the database state by not using transactions correctly. To put this another way, if you ran that Python code against Postgres you would likely see exactly the same behavior. If you adapted the code to run against Oracle or MySQL or SQL Server you would see the same behavior. The database is doing what it is told.

How do you fix your code? Each thread needs to be creating its own transaction and each transaction needs to be executed on a separate session (i.e. a connection). I’m afraid I don’t have the Node.js or Python expertise to provide you code which does this. We’re going to investigate improving our examples to include concurrency and/or threading out of the box, but I can’t give you any time frame for when that will happen.
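
For anyone landing here later, the following is a minimal sketch of the "one connection per thread" idea in Python, adapted from the script posted earlier in the thread. It is not an official example. The helper names `connect`, `worker`, `run_concurrent_transfers`, `connect_fn`, and `RETRYABLE` are made up for illustration, and the connection parameters assume the same local cluster, `bank` database, and `accounts` table as the earlier script.

```python
import threading

RETRYABLE = "40001"  # SQLSTATE for serialization failure, the retry code checked earlier in the thread


def connect():
    # Assumption: the same local cluster and credentials as the script posted above.
    import psycopg2  # imported here so the helpers below can be exercised with a stand-in connection
    return psycopg2.connect(database="bank", user="maxroach", host="localhost", port=26257)


def run_transaction(conn, op):
    # "with conn" commits on success and rolls back if an exception escapes.
    with conn:
        with conn.cursor() as cur:
            cur.execute("SAVEPOINT cockroach_restart")
            while True:
                try:
                    op(cur)
                    cur.execute("RELEASE SAVEPOINT cockroach_restart")
                    return
                except Exception as e:
                    if getattr(e, "pgcode", None) != RETRYABLE:
                        raise  # non-retryable: report it up the call stack
                    cur.execute("ROLLBACK TO SAVEPOINT cockroach_restart")


def transfer_funds(cur, frm, to, amount):
    cur.execute("SELECT balance FROM accounts WHERE id = %s", (frm,))
    if cur.fetchone()[0] < amount:
        raise RuntimeError("Insufficient funds")
    cur.execute("UPDATE accounts SET balance = balance - %s WHERE id = %s", (amount, frm))
    cur.execute("UPDATE accounts SET balance = balance + %s WHERE id = %s", (amount, to))


def worker(connect_fn, frm, to, amount):
    conn = connect_fn()  # one session per thread, never shared
    try:
        run_transaction(conn, lambda cur: transfer_funds(cur, frm, to, amount))
    finally:
        conn.close()


def run_concurrent_transfers(n, frm, to, amount, connect_fn=connect):
    threads = [threading.Thread(target=worker, args=(connect_fn, frm, to, amount))
               for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```

Against a running cluster, `run_concurrent_transfers(50, 1, 2, 10)` would mirror the 50-thread test above, except that every thread opens, uses, and closes its own connection, so statements from different transactions cannot interleave on one session. Opening a connection per transfer is the simplest way to show the idea; a connection pool would be the usual refinement.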