Troubleshooting

Expand all | Collapse all

Ignite cache store using HBase

  • 1.  Ignite cache store using HBase

     
    Posted 07-12-2019 09:42 AM
    Hi! I've implemented Ignite cache store using HBase as the back-end persistent store. The code for the Cache Store is as follows:
    public class BitDataCachePersistentStore implements CacheStore<Long, byte[]> {
    
    @IgniteInstanceResource
    Ignite gridReference;
    
    @CacheNameResource
    private String cacheName;
    
    @Override
    public byte[] load(Long key) {
    
        String hbaseKey;
    
        try (Connection con = HBaseConnectionUtil.getHbaseConnection()) {
    
            try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
    
                hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, key.toString());
    
                Get rowToBeFetched = new Get(Bytes.toBytes(hbaseKey));
    
                Result rowFetched = bitDataPersistentTable.get(rowToBeFetched);
    
                if (rowFetched == null || rowFetched.isEmpty()) {
                    return null; // Can't return an empty array as Ignite will
                                    // load the entry
                }
    
                return rowFetched.getValue(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                        TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES);
    
            }
    
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_READ_ERROR, e,
                    "Error while performing read operation for the key [ " + key + " ] of the cache [ " + cacheName
                            + " ] ");
        }
    
    }
    
    @Override
    public Map<Long, byte[]> loadAll(Iterable<? extends Long> keys) {
    
        String hbaseKey;
    
        long startTime = System.currentTimeMillis();
    
        long numberOfKeysLoaded = 0l;
    
        try (Connection con = HBaseConnectionUtil.getHbaseConnection()) {
    
            try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
    
                IgniteCache<Long, byte[]> cacheToBeLoaded = gridReference.cache(cacheName);
    
                Get rowToBeFetched;
    
                Result rowFetched;
    
                for (Long key : keys) {
    
                    hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, key.toString());
    
                    rowToBeFetched = new Get(Bytes.toBytes(hbaseKey));
    
                    rowFetched = bitDataPersistentTable.get(rowToBeFetched);
    
                    cacheToBeLoaded.put(key,
                            rowFetched.getValue(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                                    TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES));
    
                    numberOfKeysLoaded++;
    
                }
    
                System.out.println("LoadAll for [ " + numberOfKeysLoaded + " ] keys of the cache [ " + cacheName
                        + " ] took [ " + ((System.currentTimeMillis() - startTime) / 1000.0) + " seconds ] ");
    
                return null;
    
            }
    
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_READ_ERROR, e,
                    "Error while reading multiple keys for the cache [ " + cacheName + " ] ");
        }
    
    }
    
    @Override
    public void write(Entry<? extends Long, ? extends byte[]> entry) {
    
        String hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, entry.getKey().toString());
    
        try (Connection con = HBaseConnectionUtil.getHbaseConnection()) {
    
            try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
    
                Put rowToBeWritten = new Put(Bytes.toBytes(hbaseKey));
    
                rowToBeWritten.addColumn(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                        TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES, entry.getValue());
    
                bitDataPersistentTable.put(rowToBeWritten);
    
            }
    
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_WRITE_ERROR, e,
                    "Error while writing the entry for the key [ " + entry.getKey() + " ] for the cache [ " + cacheName
                            + " ] ");
        }
    
    }
    
    @Override
    public void writeAll(Collection<Entry<? extends Long, ? extends byte[]>> entries) {
    
        long startTime = System.currentTimeMillis();
    
        String hbaseKey;
    
        List<Put> rowsToBeWritten = new ArrayList<>();
    
        Put currentRowToBeWritten;
    
        try (Connection con = HBaseConnectionUtil.getHbaseConnection()) {
    
            try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
    
                for (Entry<? extends Long, ? extends byte[]> entryToBeInserted : entries) {
    
                    hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName,
                            entryToBeInserted.getKey().toString());
    
                    currentRowToBeWritten = new Put(hbaseKey.getBytes());
    
                    currentRowToBeWritten.addColumn(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                            TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES,
                            entryToBeInserted.getValue());
    
                    rowsToBeWritten.add(currentRowToBeWritten);
    
                }
    
                bitDataPersistentTable.put(rowsToBeWritten);
    
            }
    
            System.out.println("Time taken to load [ " + entries.size() + " entries ] for the cache [ " + cacheName
                    + " ] is " + ((System.currentTimeMillis() - startTime) / 1000.0) + " seconds");
    
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_WRITE_ERROR, e,
                    "Error while writing multiple keys for the cache [ " + cacheName + " ] ");
        }
    
    }
    
    @Override
    public void delete(Object key) {
    
        String hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, key.toString());
    
        try (Connection con = HBaseConnectionUtil.getHbaseConnection()) {
    
            try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
    
                Delete rowToBeDeleted = new Delete(Bytes.toBytes(hbaseKey));
    
                bitDataPersistentTable.delete(rowToBeDeleted);
    
            }
    
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_REMOVAL_ERROR, e,
                    "Error while deleting the entry for the key [ " + hbaseKey + " ] for the cache [ " + cacheName
                            + " ] ");
        }
    
    }
    
    @Override
    public void deleteAll(Collection<?> keys) {
    
        String hbaseKey;
    
        List<Delete> rowsToBeDeleted = new ArrayList<>();
    
        try (Connection con = HBaseConnectionUtil.getHbaseConnection()) {
    
            try (Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
    
                for (Object keyToBeDeleted : keys) {
    
                    hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName,
                            keyToBeDeleted.toString());
    
                    rowsToBeDeleted.add(new Delete(hbaseKey.getBytes()));
    
                }
    
                bitDataPersistentTable.delete(rowsToBeDeleted);
    
            }
    
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_REMOVAL_ERROR, e,
                    "Error while deleting entries for the cache [ " + cacheName + " ] ");
        }
    
    }
    
    @Override
    public void loadCache(IgniteBiInClosure<Long, byte[]> clo, Object... args) {
        // No implementation provided
    }
    
    @Override
    public void sessionEnd(boolean commit) {
        // No implementation provided
    }
    
    }
    

    The cache mode is PARTITIONED.

    The cache atomicity mode is ATOMIC.

    It is evident from the store implementation that I am spawning a new connection to HBase in each and every one of the implemented methods.

    I wanted to know if there is any method or way to have more control over opening and closing my data source specific resources (in this case, HBase connections) at a more macro level instead of performing it at every method invocation.
    Thanks!



  • 2.  RE: Ignite cache store using HBase

     
    Posted 07-13-2019 09:14 AM
    Hi Shep, I think you need to use connection pool in your store. Check out c3p0.

    ------------------------------
    Linda Berg
    Developer
    Apple
    ------------------------------



Would you attend a July Meetup?


Announcements

  • Welcome to the new GridGain Forums!

    Hello and welcome to the new GridGain Forums. This is the place to ask questions, get or give advice and connect with your peers. GridGain experts regularly monitor these posts and can also help solve your issues.