Graph Data Federation

Priya Jacob
20 min read · Mar 31, 2023



#longread

Note: I mostly tend to take a newly released feature, experiment with it, form my own conclusions, and write about what I learnt in the process. This one is my naïve albeit honest attempt at understanding graph data sharding, and federation in particular, employing the Neo4j graph database. I’ve also scribbled musings, in italics, that went into the making of this post. They’re not essential, yet part of the learning, so feel free to skip them.

This blog aims to:
- Introduce graph data federation
- Discuss specific use cases where graph data federation is useful, such as integrating data from multiple sources, handling dynamic data, and performing complex queries
- Demonstrate how data federation is implemented in graph databases, including the architecture, protocols, and middleware used, and the trade-offs involved
- Share some real-world examples that demonstrate how graph data federation may be used to solve specific problems

Introduction to Graph Data Federation
Graph data federation refers to the process of connecting multiple graph databases together to form a larger, federated graph. Each graph database in the federation is responsible for a subset of the nodes and relationships in the federated graph. The idea behind federation is to use a middleware layer that handles communication between the constituent graph databases via distributed queries to provide a unified view of the data.
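
To make the idea concrete before we dive in, here is a minimal sketch of the shape a federated query takes in Neo4j 5.x Cypher. The composite database federation, its constituents graphA and graphB, and the Person label are placeholders of my own, not a real setup:

//hypothetical composite database 'federation' with constituents 'graphA' and 'graphB'
UNWIND ['federation.graphA', 'federation.graphB'] AS g
CALL {
USE graph.byName(g)
MATCH (n:Person) //each constituent resolves this MATCH locally
RETURN n.name AS name
}
RETURN name //rows from all constituents are gathered into one result stream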

The prerequisite to running federated queries is the presence of constituent graphs, which arise either from graphs that were deliberately designed as disjoint silos, or from sharding/partitioning a graph database.

A note on Graph Database Sharding
Graph database sharding or partitioning refers to the process of dividing a graph database into smaller subgraphs, called shards or partitions. Each partition contains a subset of the nodes and relationships in the graph and may be stored on the same or a remote graph DBMS.

The motivations behind graph database sharding can be several:
- Scalability: Sharding/partitioning distributes data across multiple graph databases, which can support higher concurrent access.
- Performance: Sharding/partitioning may improve query performance by letting the database scan only the shards/partitions relevant to a given query, as opposed to scanning one large graph. Generally speaking, graph queries benefit from graph locality and do not suffer from the one-big-table problem found in relational databases.
- Management: Sharding/partitioning can make a large volume of data easier to manage by allowing historical data to be archived or purged from a specific shard/partition, rather than performing those operations on one large graph. It can further ease backup/restore and disaster recovery operations.
We are not talking about the benefits below, which arise from the design of some NoSQL and Massively Parallel Processing (MPP) databases that place partitioning and replication in the same light:
- High availability: Partitioning data across multiple servers can also improve the availability of the database by allowing continued operation in the event of a failure of one or more servers.
- Cloud-based usage: Partitioning allows for the horizontal scaling and parallel processing required for cloud-based usage.

A graph database stores data in the form of nodes and relationships: each node connects to other nodes, and these connections are stored as actual physical relationships rather than being computed at query time.

Partitioning a graph database therefore requires partitioning these relationships as well as the nodes themselves. This can be complex because the relationships between nodes must be maintained across the many partitions, which leads to redundancy. Additionally, it can be difficult to determine how to split the graph into partitions that are balanced in terms of the number of nodes and relationships, and that can still be queried efficiently.

Another reason is that graph databases are typically optimized for fast traversals of the graph, which partitioning would hinder by requiring additional hops across partitions.
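
One common workaround, and effectively the one used later in this post, is to not store cross-shard relationships at all: each shard keeps lightweight stub nodes that carry only a shared key, and the join is re-established at query time by a federated query. A sketch of my own, borrowing the Yelp model we build below (the ids are placeholders):

//inside a review shard: keep a keyed stub of the Business instead of a
//physical relationship into the business shard
MERGE (b:Business {business_id: '<some business_id>'}) //stub node, key only
MERGE (r:Review {review_id: '<some review_id>'})
MERGE (r)-[:ABOUT]->(b)
//a federated query later matches the stub here and the full Business node
//in the business shard on the same business_id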

Use Cases (~Scatter & Gather!~)
It could just be that we started off designing graphs in silos (read: departmental, business-function-wise, by domain, etc.) that then had to be integrated at some point. Or it could have been a conscious decision to keep the graphs disjoint, each serving a pointed purpose, with the possibility of querying across them in the future.

In some other cases, it could be a design consideration to archive portions of the graph that are queried less often over time, to keep the current graph manageable and easy on operations such as backup/restore and disaster recovery. Querying over archived portions of the graph is then achieved using federated queries that span data shards.

Technical Implementation
All references here are made w.r.t. the Neo4j graph database.

Before we get to graph data federation, we implement graph data sharding to create subgraphs. We then use these subgraph databases as constituents to define a composite database in Neo4j 5.x, and demonstrate federated querying over it using Cypher.

The assumption is that we have one large graph database (that we are about to build), that needs sharding, for the purpose of demonstration. We are employing Neo4j 5.3 for our experiments. Details of setting up Neo4j 5.3 are not part of the scope and can be found in the product operations manual. For reference, I’m using a Linux tarball install on an AWS EC2 instance.

The Yelp dataset:

You download the dataset (yelp_dataset.tar) from here, untar it and move the relevant files to the import folder of the Neo4j install; we’re only dealing with Businesses, Users and Reviews.

Let’s profile the dataset:

CALL apoc.load.json("file:///yelp_academic_dataset_business.json") YIELD value AS row
RETURN count(row)
//150346

CALL apoc.load.json("file:///yelp_academic_dataset_user.json") YIELD value AS row
RETURN count(row)
//1987897

CALL apoc.load.json("file:///yelp_academic_dataset_review.json") YIELD value AS row
RETURN count(row)
//6990280
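
It also helps to eyeball the shape of a single record before loading; a quick peek of my own, not part of the original load:

CALL apoc.load.json("file:///yelp_academic_dataset_business.json") YIELD value AS row
RETURN row
LIMIT 1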

Let’s load the dataset:

CREATE DATABASE yelp;

CREATE CONSTRAINT cons_uq_business_id FOR (n:Business) REQUIRE n.business_id IS UNIQUE;

//create business nodes
CALL apoc.periodic.iterate('
CALL apoc.load.json("file:///yelp_academic_dataset_business.json") YIELD value AS row
RETURN row',
'MERGE (n:Business {business_id: row.business_id})
SET
n.name = toLower(trim(row.name)),
n.categories = [x IN split(row.categories,",")|toLower(trim(x))],
n.is_open = CASE WHEN row.is_open = 1 THEN true ELSE false END,
n.stars = toFloat(row.stars),
n.review_count = toInteger(row.review_count),
n.address = toLower(trim(row.address)),
n.city = toLower(trim(row.city)),
n.state = toLower(trim(row.state)),
n.postal_code = trim(row.postal_code),
n.geolocation = point({latitude:toFloat(row.latitude), longitude:toFloat(row.longitude)})',
{batchSize: 2000, parallel: true})

//create business attributes map as a string for later processing
CALL apoc.periodic.iterate('
CALL apoc.load.json("file:///yelp_academic_dataset_business.json") YIELD value AS row
RETURN row.business_id AS r_business_id, row.attributes AS r_attributes'
,
'MATCH (n:Business {business_id: r_business_id})
CALL apoc.convert.setJsonProperty(n, "attributes", r_attributes)',
{batchSize: 2000, parallel: true})
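
Since the attributes land on the node as a JSON string, they can be parsed back into a map at read time with APOC; a small sketch (the business_id is a placeholder):

MATCH (n:Business {business_id: '<some business_id>'})
RETURN apoc.convert.getJsonProperty(n, "attributes") AS attributes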

CREATE CONSTRAINT cons_uq_user_id FOR (n:User) REQUIRE n.user_id IS UNIQUE;

//create user nodes
CALL apoc.periodic.iterate('
CALL apoc.load.json("file:///yelp_academic_dataset_user.json") YIELD value AS row
RETURN row',
'MERGE (n:User {user_id: row.user_id})
SET
n.name = toLower(trim(row.name)),
n.review_count = toInteger(row.review_count),
n.yelping_since = date(substring(row.yelping_since,0,10)),
n.useful = toInteger(row.useful),
n.funny = toInteger(row.funny),
n.cool = toInteger(row.cool),
n.fans = toInteger(row.fans),
n.elite = CASE WHEN size(trim(row.elite))>0 THEN [x IN split(row.elite,",")|trim(x)] ELSE null END,
n.average_stars = toFloat(row.average_stars),
n.compliment_hot = toInteger(row.compliment_hot),
n.compliment_more = toInteger(row.compliment_more),
n.compliment_profile = toInteger(row.compliment_profile),
n.compliment_cute = toInteger(row.compliment_cute),
n.compliment_list = toInteger(row.compliment_list),
n.compliment_note = toInteger(row.compliment_note),
n.compliment_plain = toInteger(row.compliment_plain),
n.compliment_cool = toInteger(row.compliment_cool),
n.compliment_funny = toInteger(row.compliment_funny),
n.compliment_writer = toInteger(row.compliment_writer)',
{batchSize: 2000, parallel: true})

//create user friends
CALL apoc.load.json("file:///yelp_academic_dataset_user.json") YIELD value AS row
WITH row WHERE row.friends = "None" //row.friends IS NULL //trim(row.friends) = "" //size(trim(row.friends)) = 0
RETURN COUNT(row)
//878551

CALL apoc.load.json("file:///yelp_academic_dataset_user.json") YIELD value AS row
WITH row WHERE row.friends <> "None" //row.friends IS NULL //trim(row.friends) = "" //size(trim(row.friends)) = 0
RETURN COUNT(row)
//1109346

CALL apoc.periodic.iterate('
CALL apoc.load.json("file:///yelp_academic_dataset_user.json") YIELD value AS row
WITH row WHERE row.friends <> "None"
RETURN row.user_id AS r_user_id, [x IN split(row.friends,",")|trim(x)] AS r_friends',
'MATCH (n:User {user_id: r_user_id})
FOREACH (x IN r_friends | MERGE (f:User {user_id: x}) MERGE (n)-[:FRIENDS_WITH]-(f))',
{batchSize: 2000, parallel: false})
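
A quick sanity check of my own on the friendship loading; the directed match avoids counting each undirected friendship twice:

MATCH (:User)-[r:FRIENDS_WITH]->(:User)
RETURN count(r) AS friendships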

CREATE CONSTRAINT cons_uq_review_id FOR (n:Review) REQUIRE n.review_id IS UNIQUE;

//create review nodes
CALL apoc.periodic.iterate('
CALL apoc.load.json("file:///yelp_academic_dataset_review.json") YIELD value AS row
RETURN row',
'MERGE (n:Review {review_id: row.review_id})
SET
n.stars = toFloat(row.stars),
n.review_date = date(substring(row.date,0,10)),
n.review_text = toLower(trim(row.text)),
n.useful = toInteger(row.useful),
n.funny = toInteger(row.funny),
n.cool = toInteger(row.cool)',
{batchSize: 2000, parallel: true})

CALL apoc.load.json("file:///yelp_academic_dataset_review.json") YIELD value AS row
RETURN count(distinct row.user_id)
//1987929

CALL apoc.load.json("file:///yelp_academic_dataset_review.json") YIELD value AS row
RETURN count(distinct row.business_id)
//150346

//link review nodes
CALL apoc.periodic.iterate('
CALL apoc.load.json("file:///yelp_academic_dataset_review.json") YIELD value AS row
RETURN row',
'
MERGE (u:User {user_id: row.user_id})
MERGE (b:Business {business_id: row.business_id})
MERGE (r:Review {review_id: row.review_id})
MERGE (u)-[:WROTE]->(r)
MERGE (r)-[:ABOUT]->(b)',
{batchSize: 2000, parallel: false})
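
Another sanity check I’d add here: after linking, no review should be left without a writer or a business.

MATCH (r:Review)
WHERE NOT (r)<-[:WROTE]-(:User) OR NOT (r)-[:ABOUT]->(:Business)
RETURN count(r) AS orphanReviews //expect 0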

CREATE CONSTRAINT cons_uq_address_id FOR (n:Address) REQUIRE n.address_id IS UNIQUE;
CREATE CONSTRAINT cons_uq_city FOR (n:City) REQUIRE n.city IS UNIQUE;
CREATE CONSTRAINT cons_uq_state FOR (n:State) REQUIRE n.state IS UNIQUE;

//link businesses with their address
CALL apoc.periodic.iterate('
MATCH (b:Business)
RETURN b',
'MERGE (a:Address {address_id: b.business_id})
SET
a.address = b.address,
a.postal_code = b.postal_code,
a.geolocation = b.geolocation

MERGE (c:City {city: b.city})
MERGE (s:State {state: b.state})

MERGE (b)-[:LOCATED_AT]->(a)
MERGE (a)-[:IN_CITY]->(c)
MERGE (a)-[:IN_STATE]->(s)

REMOVE
b.address,
b.postal_code,
b.geolocation,
b.city,
b.state',
{batchSize: 2000, parallel: false})

CREATE CONSTRAINT cons_uq_category FOR (n:Category) REQUIRE n.category IS UNIQUE;

//elevate business categories as nodes and link them with businesses
CALL apoc.periodic.iterate('
MATCH (b:Business)
RETURN b',
'WITH b
FOREACH (x IN b.categories | MERGE (c:Category {category: x}) MERGE (b)-[:OF_TYPE]->(c))',
{batchSize: 2000, parallel: false})
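
With categories elevated to nodes, a quick look (my addition) at the most common ones:

MATCH (c:Category)<-[:OF_TYPE]-(b:Business)
RETURN c.category AS category, count(b) AS businesses
ORDER BY businesses DESC
LIMIT 10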

With the data loaded into the yelp database, let’s put down a sharding strategy:

Must reviews be time bound in cases where recommendations are to be made?
The time aspect of reviews depends on the specific recommendation scenario and the goals of the recommendation framework. In some cases, it may be important to consider only the most recent reviews, while in other cases, older reviews may also be relevant.

For example, in a scenario where recommendations are being made for a fast-changing business environment, such as the food industry, it may be necessary to give more weight to recent reviews to reflect any changes in the business or user preferences.

On the other hand, for a more stable business environment, such as the movie industry, older reviews may still be relevant and it may not be necessary to restrict recommendations to only the most recent ones.

Ultimately, the time aspect of reviews should be considered based on the specific requirements of the recommendation framework and the goals it is trying to achieve.

Given such a graph and the recommendation framework that is to be designed for it, does it make sense to shard the graph on reviews by say year?
Sharding the graph on reviews by year can be a useful strategy in some cases. This approach can help reduce the size of the graph and improve the efficiency of the recommendation framework by only considering a subset of the data that is relevant for a given time period.

For example, if the recommendation framework is designed to make recommendations for a specific year, such as for a holiday season, sharding the graph on reviews by year can help focus the analysis on only the most recent and relevant data.

However, sharding the graph on reviews by year can also limit the ability of the recommendation framework to consider the historical context and trends of the data. In some cases, this may result in less accurate recommendations.

Therefore, the decision to shard the graph on reviews by year should be based on the specific requirements and goals of the recommendation framework and a trade-off analysis between efficiency and accuracy.

Here are some of the factors to consider when sharding our graph database with Business, User, and Review entities:
Node Cardinality: Node cardinality refers to the number of nodes in the graph for a given label or type. If the cardinality of a particular label is much higher than others, it may make sense to shard the data for that label. In this example, if the number of Users and Business entities is much larger than the number of Review entities, then it may make sense to shard the User and Business data.

Relationship Cardinality: Relationship cardinality refers to the number of relationships between nodes. If there are a high number of relationships between two particular labels, it may make sense to keep the data for those labels on the same shard to avoid cross-shard communication overhead. In this example, if there are a high number of relationships between User and Review entities, it may make sense to shard the data such that both User and Review data are kept on the same shard.

Query patterns: Query patterns refer to the nature of queries that will be executed against the graph database. If a particular query pattern is used frequently and requires data from multiple shards, it may make sense to shard the data in a way that minimizes cross-shard communication for that query. In this example, if a frequent query pattern requires data from the User, Business, and Review entities together, it may make sense to shard the data in a way that minimizes cross-shard communication for that query.

Data growth rate: Data growth rate refers to the rate at which the data in the graph database is expected to grow. If the data is expected to grow rapidly, it may make sense to shard the data in a way that allows for easy addition of new shards as needed.

The specific sharding approach will depend on the requirements of the system and should be evaluated and tested thoroughly before being implemented in a production environment.

We choose to keep the Businesses in a separate graph, the Users in a separate social graph, and to shard the Reviews in blocks of five years (for the purpose of demonstration) so that more recent reviews are transacted on and accounted for. That said, the time factor really must depend on the nature and relevance of business transactions. The idea here is also to always have a shard where current reviews can be written to and read from. Once the current period has elapsed, that should drive the creation of an archive shard for the period in question.

#create shard for business master data
bin/neo4j-admin database copy yelp business --copy-only-nodes-with-labels=Business,Category,Address,City,State

#create shard for user social graph
bin/neo4j-admin database copy yelp social --copy-only-nodes-with-labels=User

Sharding strategy for Reviews: we run a data profile and start off by labeling our nodes for subsequent partitioning.

UNWIND [x IN range(2001, 2025, 5) | range(x, x+4)] AS l
CALL {
WITH l
MATCH (n:Review)
WHERE n.review_date.year IN l
RETURN l AS yrRange, count(n) AS reviewCnt
}
RETURN yrRange, reviewCnt

//create index to aid the labeling queries below
CREATE RANGE INDEX idx_review_date FOR (n:Review) ON (n.review_date);
SHOW INDEXES;

//label reviews in range [2001,2002,2003,2004,2005]
CALL apoc.periodic.iterate('
MATCH (u:User)-[:WROTE]->(r:Review)-[:ABOUT]->(b:Business)
WHERE r.review_date >= date("2001-01-01") AND r.review_date <= date("2005-12-31")
RETURN u, r, b',
'SET u:Shard_2001_2005, r:Shard_2001_2005, b:Shard_2001_2005',
{batchSize: 200, parallel:false})

//label reviews in range [2006,2007,2008,2009,2010]
CALL apoc.periodic.iterate('
MATCH (u:User)-[:WROTE]->(r:Review)-[:ABOUT]->(b:Business)
WHERE r.review_date >= date("2006-01-01") AND r.review_date <= date("2010-12-31")
RETURN u, r, b',
'SET u:Shard_2006_2010, r:Shard_2006_2010, b:Shard_2006_2010',
{batchSize: 2000, parallel:false})

//label reviews in range [2011,2012,2013,2014,2015]
CALL apoc.periodic.iterate('
MATCH (u:User)-[:WROTE]->(r:Review)-[:ABOUT]->(b:Business)
WHERE r.review_date >= date("2011-01-01") AND r.review_date <= date("2015-12-31")
RETURN u, r, b',
'SET u:Shard_2011_2015, r:Shard_2011_2015, b:Shard_2011_2015',
{batchSize: 2000, parallel:false})

//label reviews in range [2016,2017,2018,2019,2020]
CALL apoc.periodic.iterate('
MATCH (u:User)-[:WROTE]->(r:Review)-[:ABOUT]->(b:Business)
WHERE r.review_date >= date("2016-01-01") AND r.review_date <= date("2020-12-31")
RETURN u, r, b',
'SET u:Shard_2016_2020, r:Shard_2016_2020, b:Shard_2016_2020',
{batchSize: 2000, parallel:false})

//label reviews in range [2021,2022,2023,2024,2025]
CALL apoc.periodic.iterate('
MATCH (u:User)-[:WROTE]->(r:Review)-[:ABOUT]->(b:Business)
WHERE r.review_date >= date("2021-01-01") AND r.review_date <= date("2025-12-31")
RETURN u, r, b',
'SET u:Shard_2021_2025, r:Shard_2021_2025, b:Shard_2021_2025',
{batchSize: 2000, parallel:false})
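
Before cutting the store copies, a sanity check of my own on how the shard labels are distributed:

MATCH (n)
UNWIND labels(n) AS label
WITH label
WHERE label STARTS WITH 'Shard_'
RETURN label, count(*) AS nodes
ORDER BY label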

Create the database shards:

#create shard for range [2001,2002,2003,2004,2005]
bin/neo4j-admin database copy yelp reviews20012005 --copy-only-nodes-with-labels=Shard_2001_2005 --skip-labels=Shard_2001_2005,Shard_2006_2010,Shard_2011_2015,Shard_2016_2020,Shard_2021_2025 --copy-only-node-properties=Business.business_id,User.user_id --ignore-relationships-with-types=FRIENDS_WITH

#create shard for range [2006,2007,2008,2009,2010]
bin/neo4j-admin database copy yelp reviews20062010 --copy-only-nodes-with-labels=Shard_2006_2010 --skip-labels=Shard_2001_2005,Shard_2006_2010,Shard_2011_2015,Shard_2016_2020,Shard_2021_2025 --copy-only-node-properties=Business.business_id,User.user_id --ignore-relationships-with-types=FRIENDS_WITH

#create shard for range [2011,2012,2013,2014,2015]
bin/neo4j-admin database copy yelp reviews20112015 --copy-only-nodes-with-labels=Shard_2011_2015 --skip-labels=Shard_2001_2005,Shard_2006_2010,Shard_2011_2015,Shard_2016_2020,Shard_2021_2025 --copy-only-node-properties=Business.business_id,User.user_id --ignore-relationships-with-types=FRIENDS_WITH

#create shard for range [2016,2017,2018,2019,2020]
bin/neo4j-admin database copy yelp reviews20162020 --copy-only-nodes-with-labels=Shard_2016_2020 --skip-labels=Shard_2001_2005,Shard_2006_2010,Shard_2011_2015,Shard_2016_2020,Shard_2021_2025 --copy-only-node-properties=Business.business_id,User.user_id --ignore-relationships-with-types=FRIENDS_WITH

#create shard for range [2021,2022,2023,2024,2025]
bin/neo4j-admin database copy yelp reviews20212025 --copy-only-nodes-with-labels=Shard_2021_2025 --skip-labels=Shard_2001_2005,Shard_2006_2010,Shard_2011_2015,Shard_2016_2020,Shard_2021_2025 --copy-only-node-properties=Business.business_id,User.user_id --ignore-relationships-with-types=FRIENDS_WITH

Once the database shards have been created, we proceed with creating the corresponding databases, the composite database, and its constituent aliases.

//create databases for shards
CREATE DATABASE business IF NOT EXISTS;
CREATE DATABASE social IF NOT EXISTS;
CREATE DATABASE reviews20012005 IF NOT EXISTS;
CREATE DATABASE reviews20062010 IF NOT EXISTS;
CREATE DATABASE reviews20112015 IF NOT EXISTS;
CREATE DATABASE reviews20162020 IF NOT EXISTS;
CREATE DATABASE reviews20212025 IF NOT EXISTS;

//create composite database
CREATE COMPOSITE DATABASE reviews IF NOT EXISTS;

//create composite database aliases
CREATE ALIAS reviews.business IF NOT EXISTS FOR DATABASE business;
CREATE ALIAS reviews.social IF NOT EXISTS FOR DATABASE social;
CREATE ALIAS reviews.year2001to2005 IF NOT EXISTS FOR DATABASE reviews20012005;
CREATE ALIAS reviews.year2006to2010 IF NOT EXISTS FOR DATABASE reviews20062010;
CREATE ALIAS reviews.year2011to2015 IF NOT EXISTS FOR DATABASE reviews20112015;
CREATE ALIAS reviews.year2016to2020 IF NOT EXISTS FOR DATABASE reviews20162020;
CREATE ALIAS reviews.current IF NOT EXISTS FOR DATABASE reviews20212025;
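
To verify the wiring, the usual SHOW commands work (these are administration commands, run against the system database):

SHOW DATABASES;
SHOW ALIASES FOR DATABASE;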

And now for some hands-on querying on the composite database using Cypher. We’re going to compare the ease of writing queries on the single yelp database vs. the composite reviews database. You’ll find side notes with ideas on what to query, of which I’ve picked only a few to illustrate.

We’ll need to recreate the indexes on the sharded databases to aid query performance.
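
neo4j-admin database copy does not carry indexes and constraints over, so they must be recreated on each shard. A sketch of what that might look like on a review shard (adapt per shard; switch databases with :use in Browser or cypher-shell):

CREATE CONSTRAINT cons_uq_review_id IF NOT EXISTS FOR (n:Review) REQUIRE n.review_id IS UNIQUE;
CREATE CONSTRAINT cons_uq_business_id IF NOT EXISTS FOR (n:Business) REQUIRE n.business_id IS UNIQUE;
CREATE CONSTRAINT cons_uq_user_id IF NOT EXISTS FOR (n:User) REQUIRE n.user_id IS UNIQUE;
CREATE RANGE INDEX idx_review_date IF NOT EXISTS FOR (n:Review) ON (n.review_date);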

Given a graph of Users, Businesses and Reviews where Users write reviews about a Business, what must a recommendation framework help with?
A recommendation framework in this context must help with:

- Predicting which businesses a user is likely to review based on past behavior.
- Suggesting businesses to users based on their preferences and interests.
- Ranking businesses for a user based on relevance, quality, and other factors.
- Improving the accuracy of recommendations over time by learning from user behavior.

What could be examples of content filtering and collaborative filtering in the above use case?
In the above use case, the following are examples of content filtering and collaborative filtering:

Content filtering:
Recommend businesses based on the categories of businesses that a user has reviewed in the past (e.g. recommending Italian restaurants to someone who has reviewed multiple Italian restaurants).
Recommend businesses based on the keywords used in a user’s reviews (e.g. recommending coffee shops to someone who frequently uses the word “coffee” in their reviews).

Collaborative filtering:
Recommend businesses based on the reviews and ratings given by other users who have similar tastes and preferences (e.g. recommending a restaurant that is popular among users with similar tastes).
Predict the rating a user is likely to give to a business based on their past behavior and the behavior of similar users (e.g. recommending a restaurant with a high rating to someone who has given high ratings to similar restaurants in the past).

//content filtering example

//on yelp database
MATCH (u:User {user_id: 'xalgcjscRLNPuyaAeKNThA'})-[:WROTE]->(r:Review WHERE r.review_date >= date('2021-01-01') AND r.review_date <= date('2025-12-31'))-[:ABOUT]->(:Business)-[:OF_TYPE]->(t:Category)
WITH u, t, count(r) AS reviews
ORDER BY reviews DESC
WITH u, collect(t.category)[0..5] AS reviewedCategories
MATCH (b:Business WHERE b.is_open AND b.stars >= 3.0)-[:LOCATED_AT]->()-[:IN_CITY]->(c:City WHERE c.city = 'indianapolis')
WHERE NOT (u)-[:WROTE]->()-[:ABOUT]->(b)
WITH b, size(apoc.coll.intersection(b.categories, reviewedCategories)) AS theCommonGround
WHERE theCommonGround >= 3
RETURN b.name AS recommendedBusiness, theCommonGround, b.stars AS rating, b.categories AS businessCategories
ORDER BY theCommonGround DESC, rating DESC
LIMIT 5

//on the composite database
CALL {
USE reviews.current
MATCH (u:User {user_id: 'xalgcjscRLNPuyaAeKNThA'})-[:WROTE]->(r:Review)-[:ABOUT]->(b:Business)
RETURN b.business_id AS b, r.review_id AS r
}
CALL {
WITH b, r
USE reviews.business
MATCH (e:Business WHERE e.business_id = b)-[:OF_TYPE]->(t:Category)
RETURN b AS business, r AS review, t.category AS category
}
WITH category, count(review) AS cnt
ORDER BY cnt DESC
WITH collect(category)[0..5] AS reviewedCategories
CALL {
WITH reviewedCategories
USE reviews.business
MATCH (b:Business WHERE b.is_open AND b.stars >= 3.0)-[:LOCATED_AT]->()-[:IN_CITY]->(c:City WHERE c.city = 'indianapolis')
WITH b, size(apoc.coll.intersection(b.categories, reviewedCategories)) AS theCommonGround
WHERE theCommonGround >= 3
RETURN b.business_id AS bid, b.name AS bname, b.categories AS bcategories, b.stars AS brating, theCommonGround AS common
}
CALL {
WITH bid, bname, bcategories, brating, common
USE reviews.current
MATCH (n:Business WHERE n.business_id = bid)
WHERE NOT (:User {user_id: 'xalgcjscRLNPuyaAeKNThA'})-[:WROTE]->()-[:ABOUT]->(n)
RETURN bname AS n, bcategories AS c, brating AS r, common AS f
}
RETURN n AS recommendedBusiness, f AS theCommonGround, r AS rating, c AS businessCategories
ORDER BY theCommonGround DESC, rating DESC
LIMIT 5

//collaborative filtering example

//on yelp database
MATCH (u:User {user_id: 'xalgcjscRLNPuyaAeKNThA'})-[:WROTE]->(r:Review WHERE r.review_date >= date('2021-01-01') AND r.review_date <= date('2025-12-31'))-[:ABOUT]->(:Business)-[:OF_TYPE]->(t:Category)
WITH u, t, count(r) AS reviews
ORDER BY reviews DESC
WITH u, collect(t.category)[0..5] AS reviewedCategories

MATCH (othB:Business)-[:LOCATED_AT]->()-[:IN_CITY]->(c:City WHERE c.city = 'indianapolis')
WHERE
size(apoc.coll.intersection(othB.categories, reviewedCategories)) >= 3
AND othB.is_open AND othB.stars >= 3.0

MATCH (othB)<-[:ABOUT]-(othR:Review WHERE othR.review_date >= date('2021-01-01') AND othR.review_date <= date('2025-12-31') AND othR.stars >= 3.0)<-[:WROTE]-(othU:User)
WHERE
othU <> u
AND NOT (u)-[:WROTE]->()-[:ABOUT]->(othB)
AND (u)-[:FRIENDS_WITH]-(othU)

WITH othB, count(*) AS strength, round(avg(othR.stars),2) AS avgReviewRating, count(DISTINCT othU) AS thoseWhoCount, collect(DISTINCT othU.user_id) AS friendsWhoRecommended

RETURN othB.name AS recommendedBusiness, strength, avgReviewRating , othB.stars AS rating, thoseWhoCount, friendsWhoRecommended
ORDER BY strength DESC, rating DESC
LIMIT 5

//on the composite database
CALL {
USE reviews.current
MATCH (u:User {user_id: 'xalgcjscRLNPuyaAeKNThA'})-[:WROTE]->(r:Review)-[:ABOUT]->(b:Business)
RETURN b.business_id AS b, r.review_id AS r
}
CALL {
WITH b, r
USE reviews.business
MATCH (e:Business WHERE e.business_id = b)-[:OF_TYPE]->(t:Category)
RETURN b AS business, r AS review, t.category AS category
}
WITH category, count(review) AS cnt
ORDER BY cnt DESC
WITH collect(category)[0..5] AS reviewedCategories
CALL {
WITH reviewedCategories
USE reviews.business
MATCH (othB:Business)-[:LOCATED_AT]->()-[:IN_CITY]->(c:City WHERE c.city = 'indianapolis')
WHERE othB.is_open AND othB.stars >= 3.0
AND size(apoc.coll.intersection(othB.categories,reviewedCategories)) >= 3
RETURN othB.business_id AS bId, othB.name AS bName, othB.stars AS bRating
}
CALL {
USE reviews.current
WITH bId, bName, bRating
MATCH (u:User {user_id: 'xalgcjscRLNPuyaAeKNThA'})
MATCH (othB:Business {business_id: bId})<-[:ABOUT]-(othR:Review WHERE othR.stars >= 3.0)<-[:WROTE]-(othU:User)
WHERE othU <> u
AND NOT (u)-[:WROTE]->()-[:ABOUT]->(othB)
RETURN othB.business_id AS bid, bName AS bname, bRating AS brating, othR.review_id AS rid, othU.user_id AS ouid, u.user_id AS muid
}
CALL {
WITH bid, bname, brating, rid, ouid, muid
USE reviews.social
MATCH (:User {user_id: muid})-[:FRIENDS_WITH]-(:User {user_id: ouid})
RETURN bid AS othB, bname AS othBName, brating AS othBRating, rid AS othR, ouid AS othU, muid AS u
}
WITH othB, othBName, othBRating, othR, othU, u
WITH othB, othBName, othBRating, count(othR) AS strength, round(avg(othBRating),2) AS avgReviewRating, count(DISTINCT othU) AS thoseWhoCount, collect(DISTINCT othU) AS friendsWhoRecommended
RETURN othBName AS recommendedBusiness, strength, avgReviewRating, othBRating AS rating, thoseWhoCount, friendsWhoRecommended
ORDER BY strength DESC, rating DESC
LIMIT 5

What inferences can be drawn from such a Graph of Users, Reviews and Businesses?
A graph of Users, Reviews, and Businesses can be used to make the following inferences:
- User behavior: Understand the behavior and preferences of users, such as which businesses they review, how frequently they write reviews, and what their preferred categories are.
- Business popularity: Determine the popularity of businesses based on the number of reviews, the average rating, and the distribution of ratings.
- User-business relationships: Identify relationships between users and businesses, such as which users review which businesses and how frequently.
- Review sentiment: Analyze the sentiment of reviews, such as whether they are positive, negative, or neutral, to gain insights into user opinions about businesses.
- User-user relationships: Discover relationships between users, such as who writes reviews of the same businesses and who has similar preferences.
- Business-business relationships: Identify relationships between businesses, such as which businesses are frequently reviewed by the same users and which businesses are competitors.
- Market trends: Observe trends in the market, such as which businesses are gaining popularity and which are losing popularity, and what factors are driving these trends.

//determining business popularity

//on yelp database
MATCH (b:Business WHERE b.business_id = 'Ud4ySJB6xQd14nMooHUnZw')<-[:ABOUT]-(r:Review)
RETURN r.review_date.year AS year, count(r) AS reviews, round(avg(r.stars),2) AS avgRating ORDER BY year DESC

//on the composite database
UNWIND [x IN graph.names() WHERE NOT x IN ['reviews.business', 'reviews.social']] AS graph
CALL {
USE graph.byName(graph)
MATCH (b:Business WHERE b.business_id = 'Ud4ySJB6xQd14nMooHUnZw')<-[:ABOUT]-(r:Review)
RETURN r.review_date.year AS year, count(r) AS reviews, round(avg(r.stars),2) AS avgRating
}
RETURN year, reviews, avgRating
ORDER BY year DESC

//for a city, last 5 yrs, in given categories, top 5 businesses based on reviews and ratings
//fetch business name, reviews, avg rating and top 3 reviews

//on yelp database
MATCH (c:City WHERE c.city = 'indianapolis')
MATCH (b:Business)-[:LOCATED_AT]->()-[:IN_CITY]->(c)
WHERE size(apoc.coll.subtract(['restaurants','chinese'], b.categories)) = 0
MATCH (b)<-[:ABOUT]-(r:Review WHERE r.review_date >= date('2018-01-01') AND r.review_date <= date('2022-12-31'))
WITH r.review_date.year AS year, b, count(r) AS reviews, round(avg(r.stars),2) AS avgRating
ORDER BY reviews DESC, avgRating DESC
RETURN year, collect({business: b.name, reviews: reviews, rating: avgRating,
topReviews: apoc.coll.sortMaps([(b)<-[:ABOUT]-(n:Review) WHERE n.review_date >= date('2018-01-01') AND n.review_date <= date('2022-12-31')|{review: n.review_text, stars: toString(n.stars)}], "stars")[0..3]})[0..5] AS topBusinesses
ORDER BY year DESC

//on the composite database
CALL {
USE reviews.business
MATCH (b:Business)-[:LOCATED_AT]->()-[:IN_CITY]->(c:City WHERE c.city = 'indianapolis')
WHERE size(apoc.coll.subtract(['restaurants','chinese'], b.categories)) = 0
RETURN b.business_id AS bId, b.name AS bName
}
UNWIND ['reviews.year2016to2020', 'reviews.current'] AS graph
CALL {
USE graph.byName(graph)
WITH bId, bName
MATCH (b:Business {business_id: bId})<-[:ABOUT]-(r:Review WHERE r.review_date >= date('2018-01-01') AND r.review_date <= date('2022-12-31'))
RETURN r.review_date.year AS y, bId AS bid, bName AS bname, r.review_id AS rid, r.stars AS rating
}
WITH y, bid, bname, rid, rating
WITH y, bid, bname, count(rid) AS reviews, round(avg(rating),2) AS avgRating
ORDER BY reviews DESC, avgRating DESC
WITH y AS year, collect(bid)[0..5] AS bids, collect({bid: bid, business: bname, reviews: reviews, rating: avgRating})[0..5] AS topBusinesses
UNWIND ['reviews.year2016to2020', 'reviews.current'] AS graph
CALL {
USE graph.byName(graph)
WITH year, bids, topBusinesses
MATCH (b:Business WHERE b.business_id IN bids)<-[:ABOUT]-(r:Review WHERE r.review_date.year = year)
WITH year, b.business_id AS bid, r.review_text AS rtext, r.stars AS rstars
ORDER BY rstars DESC
RETURN year AS y, bid AS business, collect({review: rtext, rating: rstars})[0..3] AS reviews
}
RETURN y AS year, topBusinesses, collect({bid: business, reviews: reviews}) AS topReviews
ORDER BY year DESC

Real-world examples
A few practical examples that may be used to demonstrate data federation in a graph database:

Social Media: A graph that includes data from multiple social media platforms such as Facebook, Twitter, and LinkedIn. The dataset can be used to demonstrate how data from different sources can be integrated and queried together to gain insights into social media activity.

Supply Chain: A graph that includes data from multiple sources such as ERP systems, logistics providers, and sensor data. The dataset can be used to demonstrate how data from different sources can be integrated and queried together to gain insights into the supply chain.

Healthcare: A graph that includes data from multiple sources such as electronic health records, clinical trials, and medical literature. The dataset can be used to demonstrate how data from different sources can be integrated and queried together to gain insights into healthcare.

Finance: A graph that includes data from multiple sources such as banking systems, stock markets, and financial news. The dataset can be used to demonstrate how data from different sources can be integrated and queried together to gain insights into finance.

Retail: A graph that includes data from multiple sources such as customer data, sales data, and inventory data. The dataset can be used to demonstrate how data from different sources can be integrated and queried together to gain insights into the retail industry.

Concluding thoughts

  • Sharding a graph database is tricky and requires meticulous planning and execution. Scavenging for scholarly publications on the topic yielded little to no results, which tells me it is not a widely practised theme. If you run graph database sharding in practice, I’d be really interested in knowing how!
  • All you may need is “filtering” of nodes/relationships/properties into a smaller, manageable graph using the nifty neo4j-admin copy command, without the need to federate; discern what your needs are
    https://neo4j.com/docs/operations-manual/5/backup-restore/copy-database/
  • Composite databases in Neo4j 5.x do give you the ability to query disjoint or siloed graphs with similar or varying schemas. They are also a significant improvement over the erstwhile Fabric feature first introduced in Neo4j 4, which had you configure the constituent databases via the neo4j configuration; the Cypher surface in Neo4j 5.x instead makes for easy leverage with greater flexibility and control. The constituent graphs can live on the same Neo4j DBMS or be a combination of local and remote graph databases. Creating a remote database alias requires some extra work for purposes of security.
    https://neo4j.com/docs/operations-manual/5/composite-databases/introduction/
    https://neo4j.com/docs/operations-manual/5/manage-databases/remote-alias/
  • Federated graph querying can be less performant in some cases, especially when multiple data shards are involved. When operating over shards partitioned by time, you may need some way of determining the shard(s) to query, say via a custom user-defined function, unless you are always focused on the shard representing the well-defined current period; a sketch of such routing follows these notes. The same may apply to shards holding uniform data partitioned by, say, node label. Cypher is also a lot more elegantly constructed on a single graph than across multiple shards (even more so with varying schemas) using the CALL {} constructs that composite database syntax demands. I personally find the syntax unappealing and cumbersome, especially when it comes to writing multiple correlated subqueries.
    https://neo4j.com/docs/operations-manual/5/composite-databases/queries/
  • Further considerations
    https://neo4j.com/docs/operations-manual/5/composite-databases/considerations/

In addition to the above limitations, I also find that, at the time of writing, you cannot import node values between CALL {} subqueries and cannot PROFILE a query on a composite database. This may change in the future.

  • Composite databases can be made highly available and fault tolerant by combining them with autonomous clusters in Neo4j 5.x
    Here is a great talk from Nodes2022 that demonstrates the same.
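
On the shard-routing point above, here is a hedged, pure-Cypher stand-in for such a routing function, using the five-year blocks and alias naming from this post (run against the reviews composite database; a production setup would more likely wrap this in a user-defined function):

//compute the shard alias for a given year (integer division buckets years into 5-year blocks)
WITH 2013 AS yr
WITH 2001 + 5 * ((yr - 2001) / 5) AS startYr
WITH CASE WHEN startYr >= 2021 THEN 'reviews.current'
ELSE 'reviews.year' + toString(startYr) + 'to' + toString(startYr + 4) END AS shard
CALL {
USE graph.byName(shard)
MATCH (r:Review)
RETURN count(r) AS reviewCnt
}
RETURN shard, reviewCnt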
