
Comments (8)

wbolster commented on August 13, 2024

Are you using the same Batch instance from multiple threads? Batch instances are not thread safe, and neither is the Connection instance, so this can lead to strange results or crashes.

I'd recommend either

  1. a multiple producer/single consumer approach (the consumer is the only one calling into HappyBase); or
  2. a ConnectionPool with one connection per thread (available in recent releases).

Which approach is faster depends on a lot of things, so I'd suggest trying it out.
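
For illustration, a minimal sketch of the second approach (a ConnectionPool shared by worker threads), assuming a recent HappyBase release; the host, table name, and worker function are placeholders:

import happybase

# One shared pool; each worker thread borrows a connection for the
# duration of its work and returns it when the 'with' block exits.
pool = happybase.ConnectionPool(size=8, host='hbase-host')

def store_messages(messages):  # hypothetical per-thread worker
    with pool.connection() as connection:
        table = connection.table('hky_Message')
        with table.batch(batch_size=1000) as b:
            for key, data in messages:
                b.put(key, data)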


saintthor commented on August 13, 2024

I use an independent connection and batch instance for each thread. I don't understand how to use ConnectionPool. In my case, if one operation causes an error, the following operations do not work until I close the connection and reconnect.

I tried using 2 batches for the two versions of the same row in one table, within one thread, and it runs well. I guess that within a batch, if there are two put operations for one key, the first one is ignored. Is that right?

A single consumer is not fast enough; I am storing a very large amount of data.


wbolster commented on August 13, 2024

Setting multiple column values for the same row by calling batch.put() multiple times is supposed to work correctly. (The Batch class accumulates all changes grouped by row.) If that does not work correctly, it may indicate a bug in either your application or HappyBase.

Can you provide code that is a bit more complete? The sample code in your original report did not make clear that you are using a separate connection and batch instance per thread. Additionally, can you provide (part of) the traceback for the errors you are seeing?

Re. using the connection pool: the user guide contains an explanation and sample code.


saintthor commented on August 13, 2024

Here is the simplified code:

class MessageParser( threading.Thread ):
    def __init__( self, hbIP, hbVersion, recvQ ):
        threading.Thread.__init__( self )
        self.setDaemon( True )
        self.IP_Ver = hbIP, hbVersion  #hbase version is 0.90
        self.Conn = happybase.Connection( hbIP, compat=hbVersion )
        self.RecvQ = recvQ
        self.MsgBatch = self.Conn.table( 'hky_Message' ).batch()
        self.SbmtTime = time()                          # time of the last submit
        self.SbmtNum = 0                                # message count within the current batch

    def run( self ):
        while True:
            try:
                for i in xrange( 200 ):
                    MsgStr = self.RecvQ.get_nowait()
                    message = json.loads( MsgStr )
                    self._Parse( message )
            except Queue.Empty:
                sleep( 0.05 )

            if self.SbmtNum >= 200 or time() - self.SbmtTime > 1:
                self._Submit()

    def _Parse( self, Message ):
        self.SbmtNum += 1
        if getVirson( Message ) == 0: #ver 0
            pass
        else:                                     #ver 1
            pass
        self.MsgBatch.put( "%(key)s" % Message, {...} ) #2 versions with same key

    def _Submit( self ):
        self.MsgBatch.send()
        self.SbmtTime = time()
        self.SbmtNum = 0

The code above may lose versions when there is a lot of data: there is only 1 version of a row in HBase when there should be 2. The version below works well.

In __init__, add:

        self.MsgBatch2 = self.Conn.table( 'hky_Message' ).batch()

    def _Parse( self, Message ):
        self.SbmtNum += 1
        if getVirson( Message ) == 0: #ver 0
            self.MsgBatch.put( "%(key)s" % Message, {...} )
        else:                                     #ver 1
            self.MsgBatch2.put( "%(key)s" % Message, {...} )  # use a different batch for each version

In _Submit, add:

        self.MsgBatch2.send()


wbolster commented on August 13, 2024

Okay, I think I see where the problem is. It seems to me this is not a HappyBase problem, but an incorrect assumption in your application about how HBase and HappyBase work.

You cannot use the batch mutation API to store multiple versions for the same row, since all mutations sent as a single batch will end up having the same timestamp. That is by design, and enables some form of "transactional updates" for multiple rows.

What your application seems to do is this:

  • Create a Batch
  • Write some values for row row1, e.g. {'cf:col1': 'v1', 'cf:col2': 'v2'}
  • Again write values for the same columns on the same row row1, e.g. {'cf:col1': 'v1', 'cf:col2': 'v2'}
  • This will overwrite the previous values in the batch, so the first values will be lost.

If instead you were writing different columns for the same row in the same batch, that would work correctly, e.g. in the following example the row row1 would end up having two columns.

batch.put('row1', {'cf1:col1': 'v1'})
batch.put('row1', {'cf1:col2': 'v2'})  # note: different column!
batch.send()

What exactly are you trying to achieve? If you want to store multiple versions (timestamps), you should either:

  • send those versions in a separate batch, and let HBase use the current timestamp
  • specify an explicit timestamp using table.batch(timestamp=...), and make sure you don't include any duplicate columns (because columns written earlier in the same batch would be lost); see the sketch after this list
  • find another way to handle your versions, e.g. by putting timestamps in your keys instead of relying on HBase's versioning/timestamp handling.
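
For illustration, a minimal sketch of the second option (explicit timestamps); the host, table name, column, and values are placeholders, and the column family must be configured to keep at least 2 versions:

import time

import happybase

connection = happybase.Connection('hbase-host', compat='0.90')
table = connection.table('hky_Message')

# Each batch is flushed with its own explicit timestamp, so the two
# writes to the same row/column are stored as two distinct versions.
ts = int(time.time() * 1000)

with table.batch(timestamp=ts) as b0:
    b0.put('row1', {'cf:col1': 'value for version 0'})

with table.batch(timestamp=ts + 1) as b1:
    b1.put('row1', {'cf:col1': 'value for version 1'})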


wbolster commented on August 13, 2024

Btw, it seems to me your application logic could be a lot simpler. Untested code example:

class MyThread(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.connection = happybase.Connection(...)
        self.queue = queue

    def run(self):
        with self.connection.table('...').batch(batch_size=1000) as b:
            for msg in iter(self.queue.get, None):  # stop at the None sentinel
                key, data = self._parse_message(msg)
                b.put(key, data)

The above would be the 'consumer' part of your system. The producer part should put a sentinel None value in the queue for each consumer thread, e.g. for i in xrange(N_THREADS): queue.put(None).
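
For completeness, a hypothetical producer side to go with the consumer above; N_THREADS and incoming_messages are placeholders:

import Queue

queue = Queue.Queue()
threads = [MyThread(queue) for _ in xrange(N_THREADS)]
for t in threads:
    t.start()

for msg in incoming_messages():   # placeholder message source
    queue.put(msg)

for _ in xrange(N_THREADS):
    queue.put(None)               # one sentinel per consumer thread

for t in threads:
    t.join()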


saintthor commented on August 13, 2024

Thank you. Using a different column instead of versions is a good idea.

In the real code, I have 2 batches for 2 tables, so I cannot write it in one 'with' block.

By the way, how many puts per batch before each send do you think is most efficient?


wbolster commented on August 13, 2024

You can use 'with a, b: ...' just fine to handle multiple batches in one 'with' block.
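
For example (untested; the host and table names are placeholders, and the second table is hypothetical):

import happybase

connection = happybase.Connection('hbase-host')
table1 = connection.table('hky_Message')
table2 = connection.table('hky_Other')

# Both batches are opened in a single 'with' statement and are sent
# when the block exits.
with table1.batch(batch_size=1000) as b1, table2.batch(batch_size=1000) as b2:
    b1.put('row1', {'cf:col1': 'v1'})
    b2.put('row1', {'cf:col1': 'v1'})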

— Wouter


