Subscribe to recordV2 topics
The following example demonstrates how to process information from subscribed recordV2 topics, including the use of a schema.
This example demonstrates a Java™ client consuming recordV2 topics which contain currency conversion rates.
Each topic contains a record with two decimal fields, representing the buy and sell rates between a pair of currencies.
The example can be run either with or without a schema.
Java and Android
/******************************************************************************* * Copyright (C) 2017, 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ package com.pushtechnology.client.sdk.manual; import static java.util.Objects.requireNonNull; import com.pushtechnology.diffusion.client.Diffusion; import com.pushtechnology.diffusion.client.features.Topics; import com.pushtechnology.diffusion.client.features.Topics.UnsubscribeReason; import com.pushtechnology.diffusion.client.features.Topics.ValueStream; import com.pushtechnology.diffusion.client.session.Session; import com.pushtechnology.diffusion.client.topics.details.TopicSpecification; import com.pushtechnology.diffusion.datatype.recordv2.RecordV2; import com.pushtechnology.diffusion.datatype.recordv2.RecordV2Delta; import com.pushtechnology.diffusion.datatype.recordv2.RecordV2Delta.Change; import com.pushtechnology.diffusion.datatype.recordv2.model.RecordModel; import com.pushtechnology.diffusion.datatype.recordv2.schema.Schema; import com.pushtechnology.diffusion.datatype.recordv2.schema.SchemaParseException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * This demonstrates a client consuming RecordV2 topics. * <P> * It has been contrived to demonstrate the various techniques for Diffusion * record topics and is not necessarily realistic or efficient in its * processing. * <P> * It can be run using a schema or not using a schema and demonstrates how the * processing could be done in both cases. * <P> * This makes use of the 'Topics' feature only. * <P> * To subscribe to a topic, the client session must have the 'select_topic' and * 'read_topic' permissions for that branch of the topic tree. * <P> * This example receives updates to currency conversion rates via a branch of * the topic tree where the root topic is called "FX" which under it has a topic * for each base currency and under each of those is a topic for each target * currency which contains the bid and ask rates. So a topic FX/GBP/USD would * contain the rates for GBP to USD. * <P> * This example maintains a local map of the rates and also notifies a listener * of any rates changes. * * @author DiffusionData Limited */ public final class RecordV2Subscribing { private static final Logger LOG = LoggerFactory.getLogger(RecordV2Subscribing.class); private static final String ROOT_TOPIC = "FX"; /** * The map of currency codes to currency objects which each maintain rates * for each target currency. */ private final Map<String, Currency> currencies = new ConcurrentHashMap<>(); private Schema schema; private final RatesListener listener; private final Session session; /** * Constructor. * * @param serverUrl for example "ws://diffusion.example.com:80" * @param listener a listener that will be notified of all rates and rate * changes */ public RecordV2Subscribing(String serverUrl, RatesListener listener) { this.listener = requireNonNull(listener); session = Diffusion.sessions().principal("client").password("password") .open(serverUrl); // Use the Topics feature to add a record value stream and subscribe to // all topics under the root. final Topics topics = session.feature(Topics.class); final String topicSelector = String.format("?%s//", ROOT_TOPIC); topics.addStream( topicSelector, RecordV2.class, new RatesValueStream()); topics.subscribe(topicSelector) .whenComplete((voidResult, exception) -> { if (exception != null) { LOG.info("subscription failed", exception); } }); } /** * Returns the rates for a given base and target currency. * * @param currency the base currency * @param targetCurrency the target currency * @return the rates or null if there is no such base or target currency */ public Rates getRates(String currency, String targetCurrency) { final Currency currencyObject = currencies.get(currency); if (currencyObject != null) { return currencyObject.getRates(targetCurrency); } return null; } /** * This is used to apply topic stream updates to the local map and notify * listener of changes. */ private void applyUpdate( String currency, String targetCurrency, RecordV2 oldValue, RecordV2 newValue) { Currency currencyObject = currencies.get(currency); if (currencyObject == null) { currencyObject = new Currency(); currencies.put(currency, currencyObject); } if (schema == null) { updateWithoutSchema( currency, targetCurrency, oldValue, newValue, currencyObject); } else { updateWithSchema( currency, targetCurrency, oldValue, newValue, currencyObject); } } private void updateWithSchema( String currency, String targetCurrency, RecordV2 oldValue, RecordV2 newValue, Currency currencyObject) { // A data model is generated using the schema allowing direct access to // the fields within it final RecordModel model = newValue.asModel(schema); final String bid = model.get("Bid"); final String ask = model.get("Ask"); currencyObject.setRate(targetCurrency, bid, ask); if (oldValue == null) { listener.onNewRate(currency, targetCurrency, bid, ask); } else { // Generate a structural delta to determine what has changed final RecordV2Delta delta = newValue.diff(oldValue); for (Change change : delta.changes(schema)) { final String fieldName = change.fieldName(); listener.onRateChange( currency, targetCurrency, fieldName, model.get(fieldName)); } } } private void updateWithoutSchema( String currency, String targetCurrency, RecordV2 oldValue, RecordV2 newValue, Currency currencyObject) { // All of the fields in the value are obtained. final List<String> fields = newValue.asFields(); final String bid = fields.get(0); final String ask = fields.get(1); currencyObject.setRate(targetCurrency, bid, ask); if (oldValue == null) { listener.onNewRate(currency, targetCurrency, bid, ask); } else { // Fields in the old value are obtained to determine what has // changed final List<String> oldfields = oldValue.asFields(); final String oldBid = oldfields.get(0); final String oldAsk = oldfields.get(1); if (!bid.equals(oldBid)) { listener.onRateChange(currency, targetCurrency, "Bid", bid); } if (!ask.equals(oldAsk)) { listener.onRateChange(currency, targetCurrency, "Ask", ask); } } } private void removeCurrency(String currency) { final Currency oldCurrency = currencies.remove(currency); for (String targetCurrency : oldCurrency.rates.keySet()) { listener.onRateRemoved(currency, targetCurrency); } } private void removeRate( String currency, String targetCurrency) { final Currency currencyObject = currencies.get(currency); if (currencyObject != null) { if (currencyObject.rates.remove(targetCurrency) != null) { listener.onRateRemoved(currency, targetCurrency); } } } /** * Close session. */ public void close() { currencies.clear(); session.close(); } /** * Encapsulates a base currency and all of its known rates. */ private static class Currency { private final Map<String, Rates> rates = new HashMap<>(); private Rates getRates(String currency) { return rates.get(currency); } private void setRate(String currency, String bid, String ask) { rates.put(currency, new Rates(bid, ask)); } } /** * Encapsulates the rates for a particular base/target currency pair. */ public static final class Rates { private final String bidRate; private final String askRate; /** * Constructor. * * @param bid the bid rate or "" * @param ask the ask rate or "" */ private Rates(String bid, String ask) { bidRate = bid; askRate = ask; } /** * Returns the bid rate. * * @return bid rate or "" if not available */ public String getBidRate() { return bidRate; } /** * Returns the ask rate. * * @return ask rate or "" if not available */ public String getAskRate() { return askRate; } } /** * A listener for Rates updates. */ public interface RatesListener { /** * Notification of a new rate or rate update. * * @param currency the base currency * @param targetCurrency the target currency * @param bid rate * @param ask rate */ void onNewRate(String currency, String targetCurrency, String bid, String ask); /** * Notification of a change to the bid or ask value for a rate. * * @param currency the base currency * @param targetCurrency the target currency * @param bidOrAsk "Bid" or "Ask" * @param rate the new rate */ void onRateChange(String currency, String targetCurrency, String bidOrAsk, String rate); /** * Notification of a rate being removed. * * @param currency the base currency * @param targetCurrency the target currency */ void onRateRemoved(String currency, String targetCurrency); } private final class RatesValueStream extends ValueStream.Default<RecordV2> { @Override public void onSubscription(String topicPath, TopicSpecification specification) { final boolean isRatesTopic = Diffusion.topicSelectors().parse("?FX/.*/.*").selects(topicPath); // Only retrieve a schema when subscribing to a rates topic if (isRatesTopic) { final String schemaString = specification.getProperties().get(TopicSpecification.SCHEMA); // If a schema is provided on subscription, retrieve it and set it once // All schemas are identical for rates topics. if (schemaString != null && schema == null) { try { schema = Diffusion.dataTypes().recordV2().parseSchema(schemaString); } catch (SchemaParseException e) { LOG.error("Unable to parse recordV2 schema", e); } } } } @Override public void onValue(String topicPath, TopicSpecification specification, RecordV2 oldValue, RecordV2 newValue) { final String[] topicElements = elements(topicPath); // It is only a rate update if topic has 2 elements below root path if (topicElements.length == 2) { applyUpdate( topicElements[0], // The base currency topicElements[1], // The target currency oldValue, newValue); } } @Override public void onUnsubscription(String topicPath, TopicSpecification specification, UnsubscribeReason reason) { final String[] topicElements = elements(topicPath); if (topicElements.length == 2) { removeRate(topicElements[0], topicElements[1]); } else if (topicElements.length == 1) { removeCurrency(topicElements[0]); } } private String[] elements(String topicPath) { final String subPath = topicPath.replaceFirst("^" + ROOT_TOPIC + "/", ""); return subPath.split("/"); } } }