GridGain Community Edition


Ignite cache store using HBase

  • 1.  Ignite cache store using HBase

     
    Posted 07-12-2019 09:42 AM
Hi! I've implemented an Ignite cache store that uses HBase as the back-end persistent store. The code for the cache store is as follows:
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import javax.cache.Cache.Entry;
    
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.ignite.Ignite;
    import org.apache.ignite.cache.store.CacheStore;
    import org.apache.ignite.lang.IgniteBiInClosure;
    import org.apache.ignite.resources.CacheNameResource;
    import org.apache.ignite.resources.IgniteInstanceResource;
    
    // Project-specific helpers (HBaseConnectionUtil, TagDuplicateConstants, TagDuplicateUtil,
    // TagDuplicateErrorCodes, ROCException) are imported from their own packages.
    
    public class BitDataCachePersistentStore implements CacheStore<Long, byte[]> {
    
    @IgniteInstanceResource
    Ignite gridReference;
    
    @CacheNameResource
    private String cacheName;
    
    // Read-through: Ignite invokes this on a cache miss when readThrough is enabled.
    @Override
    public byte[] load(Long key) {
    
        String hbaseKey;
    
        try (Connection con = HBaseConnectionUtil.getHbaseConnection()) {
    
            try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
    
                hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, key.toString());
    
                Get rowToBeFetched = new Get(Bytes.toBytes(hbaseKey));
    
                Result rowFetched = bitDataPersistentTable.get(rowToBeFetched);
    
                if (rowFetched == null || rowFetched.isEmpty()) {
                    // Return null rather than an empty array: Ignite would treat
                    // an empty array as a successfully loaded value.
                    return null;
                }
    
                return rowFetched.getValue(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                        TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES);
    
            }
    
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_READ_ERROR, e,
                    "Error while performing read operation for the key [ " + key + " ] of the cache [ " + cacheName
                            + " ] ");
        }
    
    }
    
    // Batch read-through, e.g. for getAll() misses. Per the CacheLoader contract the
    // loaded entries are returned as a map and Ignite populates the cache itself;
    // calling cache.put(...) from inside the store re-enters the cache and is unsafe.
    @Override
    public Map<Long, byte[]> loadAll(Iterable<? extends Long> keys) {
    
        long startTime = System.currentTimeMillis();
    
        Map<Long, byte[]> loadedEntries = new HashMap<>();
    
        try (Connection con = HBaseConnectionUtil.getHbaseConnection()) {
    
            try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
    
                List<Long> orderedKeys = new ArrayList<>();
    
                List<Get> rowsToBeFetched = new ArrayList<>();
    
                for (Long key : keys) {
    
                    String hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, key.toString());
    
                    orderedKeys.add(key);
    
                    rowsToBeFetched.add(new Get(Bytes.toBytes(hbaseKey)));
    
                }
    
                // One batched round trip, mirroring writeAll() and deleteAll();
                // results come back in request order.
                Result[] rowsFetched = bitDataPersistentTable.get(rowsToBeFetched);
    
                for (int i = 0; i < rowsFetched.length; i++) {
    
                    if (rowsFetched[i] == null || rowsFetched[i].isEmpty()) {
                        continue; // missing rows are simply not loaded
                    }
    
                    loadedEntries.put(orderedKeys.get(i),
                            rowsFetched[i].getValue(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                                    TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES));
    
                }
    
            }
    
            System.out.println("LoadAll for [ " + loadedEntries.size() + " ] keys of the cache [ " + cacheName
                    + " ] took [ " + ((System.currentTimeMillis() - startTime) / 1000.0) + " seconds ] ");
    
            return loadedEntries;
    
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_READ_ERROR, e,
                    "Error while reading multiple keys for the cache [ " + cacheName + " ] ");
        }
    
    }
    
    // Write-through: Ignite invokes this for single-entry updates when writeThrough is enabled.
    @Override
    public void write(Entry<? extends Long, ? extends byte[]> entry) {
    
        String hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, entry.getKey().toString());
    
        try (Connection con = HBaseConnectionUtil.getHbaseConnection()) {
    
            try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
    
                Put rowToBeWritten = new Put(Bytes.toBytes(hbaseKey));
    
                rowToBeWritten.addColumn(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                        TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES, entry.getValue());
    
                bitDataPersistentTable.put(rowToBeWritten);
    
            }
    
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_WRITE_ERROR, e,
                    "Error while writing the entry for the key [ " + entry.getKey() + " ] for the cache [ " + cacheName
                            + " ] ");
        }
    
    }
    
    // Batch write-through: invoked for putAll() and by the write-behind store when batching is enabled.
    @Override
    public void writeAll(Collection<Entry<? extends Long, ? extends byte[]>> entries) {
    
        long startTime = System.currentTimeMillis();
    
        String hbaseKey;
    
        List<Put> rowsToBeWritten = new ArrayList<>();
    
        Put currentRowToBeWritten;
    
        try (Connection con = HBaseConnectionUtil.getHbaseConnection()) {
    
            try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
    
                for (Entry<? extends Long, ? extends byte[]> entryToBeInserted : entries) {
    
                    hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName,
                            entryToBeInserted.getKey().toString());
    
                    // Bytes.toBytes() for consistency with the rest of the class;
                    // String.getBytes() uses the platform default charset.
                    currentRowToBeWritten = new Put(Bytes.toBytes(hbaseKey));
    
                    currentRowToBeWritten.addColumn(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                            TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES,
                            entryToBeInserted.getValue());
    
                    rowsToBeWritten.add(currentRowToBeWritten);
    
                }
    
                bitDataPersistentTable.put(rowsToBeWritten);
    
            }
    
            System.out.println("Time taken to write [ " + entries.size() + " entries ] for the cache [ " + cacheName
                    + " ] is " + ((System.currentTimeMillis() - startTime) / 1000.0) + " seconds");
    
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_WRITE_ERROR, e,
                    "Error while writing multiple keys for the cache [ " + cacheName + " ] ");
        }
    
    }
    
    // Write-through removal: invoked when an entry is removed from the cache.
    @Override
    public void delete(Object key) {
    
        String hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, key.toString());
    
        try (Connection con = HBaseConnectionUtil.getHbaseConnection()) {
    
            try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
    
                Delete rowToBeDeleted = new Delete(Bytes.toBytes(hbaseKey));
    
                bitDataPersistentTable.delete(rowToBeDeleted);
    
            }
    
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_REMOVAL_ERROR, e,
                    "Error while deleting the entry for the key [ " + hbaseKey + " ] for the cache [ " + cacheName
                            + " ] ");
        }
    
    }
    
    @Override
    public void deleteAll(Collection<?> keys) {
    
        String hbaseKey;
    
        List<Delete> rowsToBeDeleted = new ArrayList<>();
    
        try (Connection con = HBaseConnectionUtil.getHbaseConnection()) {
    
            try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
    
                for (Object keyToBeDeleted : keys) {
    
                    hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName,
                            keyToBeDeleted.toString());
    
                    rowsToBeDeleted.add(new Delete(Bytes.toBytes(hbaseKey)));
    
                }
    
                bitDataPersistentTable.delete(rowsToBeDeleted);
    
            }
    
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_REMOVAL_ERROR, e,
                    "Error while deleting entries for the cache [ " + cacheName + " ] ");
        }
    
    }
    
    @Override
    public void loadCache(IgniteBiInClosure<Long, byte[]> clo, Object... args) {
        // Not implemented: only needed for initial warm-up via IgniteCache#loadCache().
    }
    
    @Override
    public void sessionEnd(boolean commit) {
        // No-op: sessionEnd() is only meaningful for transactional stores, and this cache is ATOMIC.
    }
    
    }
    

    The cache mode is PARTITIONED.

    The cache atomicity mode is ATOMIC.
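
    For context, the store is plugged into the cache configuration roughly like this (a minimal sketch; the cache name is made up and the factory wiring is an assumption rather than a copy of my actual config):

    CacheConfiguration<Long, byte[]> cacheCfg = new CacheConfiguration<>("bitDataCache"); // hypothetical name

    cacheCfg.setCacheMode(CacheMode.PARTITIONED);
    cacheCfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);

    // Plug in the store and enable read-through and write-through.
    cacheCfg.setCacheStoreFactory(FactoryBuilder.factoryOf(BitDataCachePersistentStore.class));
    cacheCfg.setReadThrough(true);
    cacheCfg.setWriteThrough(true);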

    As the implementation shows, I am opening a new HBase connection in every one of the implemented methods.

    I wanted to know whether there is a way to control the opening and closing of data-source-specific resources (in this case, HBase connections) at a broader lifecycle level, instead of doing it on every method invocation.
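
    For example, I was imagining something along these lines. This is only a rough sketch: I am assuming Ignite invokes LifecycleAware callbacks on a cache store, and I am relying on the HBase Connection being heavyweight but thread-safe, so one shared instance can serve all methods:

    public class BitDataCachePersistentStore implements CacheStore<Long, byte[]>, LifecycleAware {

        // ... @IgniteInstanceResource / @CacheNameResource fields as above ...

        /** One shared, thread-safe HBase connection for the lifetime of the store. */
        private Connection con;

        @Override
        public void start() throws IgniteException {
            try {
                con = HBaseConnectionUtil.getHbaseConnection(); // opened once, when the cache starts
            } catch (IOException e) {
                throw new IgniteException("Could not open the HBase connection", e);
            }
        }

        @Override
        public void stop() throws IgniteException {
            try {
                if (con != null)
                    con.close(); // closed once, when the cache or node stops
            } catch (IOException e) {
                throw new IgniteException("Could not close the HBase connection", e);
            }
        }

        @Override
        public byte[] load(Long key) {
            // Table handles are lightweight; take one per call from the shared connection.
            try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {

                Result rowFetched = bitDataPersistentTable.get(new Get(
                        Bytes.toBytes(TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, key.toString()))));

                return (rowFetched == null || rowFetched.isEmpty()) ? null
                        : rowFetched.getValue(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                                TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES);

            } catch (IOException e) {
                throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_READ_ERROR, e,
                        "Error while performing read operation for the key [ " + key + " ] of the cache [ " + cacheName + " ] ");
            }
        }

        // loadAll(), write(), writeAll(), delete() and deleteAll() change the same way:
        // drop the per-call Connection and keep only the per-call Table.
    }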
    Thanks!



  • 2.  RE: Ignite cache store using HBase

     
    Posted 07-13-2019 09:14 AM
    Hi Shep, I think you need to use a connection pool in your store. Check out c3p0.
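
    Strictly speaking, c3p0 pools JDBC DataSources, so for an HBase Connection you would apply the same idea with a generic object pool. A rough sketch with Apache Commons Pool 2 (your helper class names are taken from your snippet; the factory class itself is made up):

    import org.apache.commons.pool2.BasePooledObjectFactory;
    import org.apache.commons.pool2.PooledObject;
    import org.apache.commons.pool2.impl.DefaultPooledObject;
    import org.apache.commons.pool2.impl.GenericObjectPool;
    import org.apache.hadoop.hbase.client.Connection;

    /** Creates, wraps and closes pooled HBase connections. */
    public class HBaseConnectionPoolFactory extends BasePooledObjectFactory<Connection> {

        @Override
        public Connection create() throws Exception {
            return HBaseConnectionUtil.getHbaseConnection();
        }

        @Override
        public PooledObject<Connection> wrap(Connection con) {
            return new DefaultPooledObject<>(con);
        }

        @Override
        public void destroyObject(PooledObject<Connection> p) throws Exception {
            p.getObject().close();
        }
    }

    // In the store, borrow from the pool instead of opening a new connection:
    GenericObjectPool<Connection> pool = new GenericObjectPool<>(new HBaseConnectionPoolFactory());

    Connection con = pool.borrowObject();
    try {
        // ... con.getTable(...) as in your methods ...
    } finally {
        pool.returnObject(con);
    }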

    ------------------------------
    Linda Berg
    Developer
    Apple
    ------------------------------