mardi 26 octobre 2010

Un Iterateur pour colonnes

Afin d'illustrer le développement que l'on peut faire sous Cassandra je vous propose de découvrir le code d'un itérateur pour Cassandra. Ce genre d'itérateurs est particulièrement utile pour des Column Family triées par TimeUUID avec un grand nombre de colonnes pour chaque clef.

Le constructeur prend 3 arguments :
La ColumnFamily
La clef qui vous intéresse
Le nombre de colonnes à récupérer maximum par itération.

Quelques explications avant le code :

Ce boût de code fonctionne avec Cassandra 0.6.X, sûrement avec Cassandra 0.7 (je n'ai pas encore testé) et utilise hector.
La méthode get(long start, long end) permet de renvoyer les colonnes entre "start" et "end".
La méthode next() renvoit au maximum le nombre de colonnes à renvoyer. La connexion à la base de données se fait uniquement dans cette méthode, comme cela la connexion reste ouverte très peu de temps.
Les constructeurs permettent de définir la columnFamily la key ainsi que le nombre de colonnes à récupérer à chaque itération. Ils sont appelés par l'intermédiaire d'une fonction statique getColIterator( ).


Attention les classes ne fonctionneront pas de suite si vous faites un copier/coller ( il faudra faire quelques modifications notamment dans le nom des packages) et le code n'est pas optimal, je le changerai dans les jours à venir mais c'est fonctionnel (Il y a un bug mais il est possible que cela vienne de Cassandra)


Classe ColumnIterator

package monpackage;

import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import monpackage.Connector;
import me.prettyprint.cassandra.service.Keyspace;
import me.prettyprint.cassandra.service.PoolExhaustedException;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.thrift.TException;

/**
 *
 * @author victork
 */
public class ColumnIterator {

    private String columnFamily;
    private String key;
    private int nb_colToRetrieve;

    private long index;

    private byte[] startCol;
    private byte[] endCol;

    /**
     * Constructor
     * @param CF Column Family
     * @param rel_key
     * @param maxCol number of column that is going to be retrieved at each iteration.
     */

    private ColumnIterator(String CF, String rel_key,int maxCol){
        columnFamily=CF;
        key=rel_key;
        nb_colToRetrieve=maxCol;
        index=0l;
        startCol=new byte[]{};
        endCol=new byte[]{};
    }

    /**
     *
     * @param CF
     * @param rel_key
     * @return
     */

    public static ColumnIterator getColIterator(String CF, String rel_key){
        return new ColumnIterator(CF, rel_key,500);
    }

    /**
     *
     * @param CF
     * @param rel_key
     * @param maxCol
     * @return
     */

    public static ColumnIterator getColIterator(String CF, String rel_key, int maxCol){
        return new ColumnIterator(CF, rel_key,maxCol);
    }

    /**
     *
     * @return
     */

    public List next(){
        Connector connector=new Connector();
        ArrayList listCol=new ArrayList();
        try {
            Keyspace ks = connector.getKeyspace();
            SlicePredicate sp = new SlicePredicate();
            SliceRange sliceR = new SliceRange();
            sliceR.setCount(nb_colToRetrieve);
            sliceR.setStart(startCol);
            sliceR.setFinish(endCol);
            sliceR.setReversed(true);
            sp.setSlice_range(sliceR);
            ColumnParent cp = new ColumnParent();
            cp.setColumn_family(columnFamily);
            listCol = (ArrayList) ks.getSlice(key, cp, sp);
            startCol = listCol.get(listCol.size() - 1).getName();
            index = index + listCol.size();
        } catch (IllegalArgumentException ex) {
            Logger.getLogger(ColumnIterator.class.getName()).log(Level.SEVERE, null, ex);
        } catch (NotFoundException ex) {
            Logger.getLogger(ColumnIterator.class.getName()).log(Level.SEVERE, null, ex);
        } catch (TException ex) {
            Logger.getLogger(ColumnIterator.class.getName()).log(Level.SEVERE, null, ex);
        } catch (IllegalStateException ex) {
            Logger.getLogger(ColumnIterator.class.getName()).log(Level.SEVERE, null, ex);
        } catch (PoolExhaustedException ex) {
            Logger.getLogger(ColumnIterator.class.getName()).log(Level.SEVERE, null, ex);
        } catch (Exception ex) {
            Logger.getLogger(ColumnIterator.class.getName()).log(Level.SEVERE, null, ex);
        }finally{
            Connector.releaseClient(connector);
        }
        return listCol;
    }

    /**
     *
     * @param start
     * @param end
     * @return
     */

    public List get(long start, long end) {
        ArrayList result = new ArrayList();
        while (index + nb_colToRetrieve <= start ) {
            if(next().isEmpty())
                break;
        }
        while (index < end) {
            ArrayList temp = (ArrayList) next();
            if (temp.isEmpty()) {
                break;
            } else {
                int begin_index = 0;
                int end_index = temp.size();
                int reduced_index= (int) (index - temp.size());
                begin_index = Math.max((int) (start - reduced_index),0);
                end_index = Math.min( (int) ( end -reduced_index ),temp.size());
                result.addAll(temp.subList(begin_index, end_index));
            }
        }
        return result;
    }

}


Class Connector :


package monpackage;

import java.util.logging.Level;
import java.util.logging.Logger;
import me.prettyprint.cassandra.service.CassandraClient;
import me.prettyprint.cassandra.service.CassandraClientPool;
import me.prettyprint.cassandra.service.CassandraClientPoolFactory;
import me.prettyprint.cassandra.service.Keyspace;
import me.prettyprint.cassandra.service.PoolExhaustedException;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.thrift.TException;

/**
 *
 * @author victork
 */
public class Connector {

    private static final int CASSANDRA_PORT = 9160;
    private static final String CASSANDRA_KEYSPACE = "MonKeyspace";
    private static final String CASSANDRA_HOST = "localhost";
    private CassandraClientPool pool;
    private CassandraClient client;

    /**
     *
     * @return a client borrowed. Do not forget to release it afterwards
     * @throws IllegalStateException
     * @throws PoolExhaustedException
     * @throws Exception
     */

    private CassandraClient getClient() throws IllegalStateException,
            PoolExhaustedException, Exception{
        pool= CassandraClientPoolFactory.getInstance().get();
        return pool.borrowClient(CASSANDRA_HOST, CASSANDRA_PORT);
    }

    /**
     *
     * @param client
     * @return a keyspace
     * @throws IllegalArgumentException
     * @throws NotFoundException
     * @throws TException
     */

    public Keyspace getKeyspace() throws IllegalArgumentException,
            NotFoundException, TException, IllegalStateException, PoolExhaustedException, Exception{
        client=getClient();
        return client.getKeyspace(CASSANDRA_KEYSPACE);
    }

    /**
     * Release the client borrowed
     * @param connector
     * @throws Exception
     */

    public static void releaseClient(Connector connector) {

        if (connector != null) {
            try {
                connector.pool.releaseClient(connector.client);
            } catch (Exception ex) {
                Logger.getLogger(Connector.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

}

N'hésitez pas à me poser des questions.
A bientôt.

Aucun commentaire:

Enregistrer un commentaire