Ingesting from an external source
If you prefer to manipulate your data in a dataframe before converting it into a graph, Raphtory can ingest dataframes directly and convert them into node and edge updates.
Creating a graph from dataframes
In the example below we ingest network traffic data that includes different types of interactions between servers. First we read the data from disk into two dataframes, one for the server information (nodes) and one for the server interactions (edges). Then we convert the timestamp column in each dataframe to datetime objects. Finally, we print the first two rows of each dataframe so you can see the headers and values.
from raphtory import Graph
import pandas as pd
edges_df = pd.read_csv("../data/network_traffic_edges.csv")
edges_df["timestamp"] = pd.to_datetime(edges_df["timestamp"])
nodes_df = pd.read_csv("../data/network_traffic_nodes.csv")
nodes_df["timestamp"] = pd.to_datetime(nodes_df["timestamp"])
pd.set_option('display.max_columns', None) # so all columns are printed
print("--- Edge Dataframe ---")
print(f"{edges_df.head(2)}\n")
print()
print("--- Node Dataframe ---")
print(f"{nodes_df.head(2)}\n")
Output
--- Edge Dataframe ---
timestamp source destination data_size_MB \
0 2023-09-01 08:00:00+00:00 ServerA ServerB 5.6
1 2023-09-01 08:05:00+00:00 ServerA ServerC 7.1
transaction_type is_encrypted
0 Critical System Request True
1 File Transfer False
--- Node Dataframe ---
timestamp server_id server_name hardware_type OS_version \
0 2023-09-01 08:00:00+00:00 ServerA Alpha Blade Server Ubuntu 20.04
1 2023-09-01 08:05:00+00:00 ServerB Beta Rack Server Red Hat 8.1
primary_function uptime_days
0 Database 120
1 Web Server 45
Next, to ingest these dataframes into Raphtory, we use the load_edges() and load_nodes()
functions. These functions have optional arguments to cover everything we have seen in the
prior direct updates example.
For the parameters of the edges we specify:
- edges_df - dataframe that is ingested
- source - source column within the dataframe
- destination - destination column within the dataframe
- timestamp - timestamp column within the dataframe
- data_size_MB - temporal property
- is_encrypted - metadata
- datasource - an additional shared metadata entry which labels the origin of the information
- transaction_type - layer column
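To make the column roles above concrete, the following is a conceptual sketch in plain Python of how one dataframe row could be split into the parts of an edge update. It is not Raphtory's implementation: the `row_to_edge_update` helper and the dictionary layout it returns are hypothetical and exist only for illustration.

```python
# Conceptual sketch only: this is NOT how Raphtory implements load_edges().
# row_to_edge_update is a hypothetical helper that groups one row's columns
# by the role each column plays in the load_edges() call.

def row_to_edge_update(row, time, src, dst, properties=(), metadata=(),
                       layer_col=None, shared_metadata=None):
    """Split a single row (a dict of column -> value) into edge-update parts."""
    update = {
        "time": row[time],
        "src": row[src],
        "dst": row[dst],
        "layer": row[layer_col] if layer_col is not None else None,
        "properties": {name: row[name] for name in properties},
        "metadata": {name: row[name] for name in metadata},
    }
    # Shared metadata is the same for every row, so it is merged in last.
    update["metadata"].update(shared_metadata or {})
    return update


# First row of the edge dataframe printed earlier.
row = {
    "timestamp": "2023-09-01 08:00:00+00:00",
    "source": "ServerA",
    "destination": "ServerB",
    "data_size_MB": 5.6,
    "transaction_type": "Critical System Request",
    "is_encrypted": True,
}

update = row_to_edge_update(
    row,
    time="timestamp",
    src="source",
    dst="destination",
    properties=["data_size_MB"],
    metadata=["is_encrypted"],
    layer_col="transaction_type",
    shared_metadata={"datasource": "../data/network_traffic_edges.csv"},
)
print(update)
```

The point of the sketch is the grouping: time, source and destination identify the update, the layer column selects the layer, and the remaining columns are either temporal properties or metadata.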
For the parameters for the nodes, we specify:
- nodes_df - dataframe that is ingested
- server_id - node ID
- timestamp - time column
- Temporal properties
    - OS_version
    - primary_function
    - uptime_days
- Metadata
    - server_name
    - hardware_type
- datasource - an additional shared metadata entry which labels the origin of the information
The resulting graph and an example node/edge are then printed to show the data fully converted.
from raphtory import Graph
import pandas as pd
g = Graph()
edges_df = pd.read_csv("../data/network_traffic_edges.csv")
edges_df["timestamp"] = pd.to_datetime(edges_df["timestamp"])
nodes_df = pd.read_csv("../data/network_traffic_nodes.csv")
nodes_df["timestamp"] = pd.to_datetime(nodes_df["timestamp"])
g.load_edges(
data=edges_df,
time="timestamp",
src="source",
dst="destination",
properties=["data_size_MB"],
layer_col="transaction_type",
metadata=["is_encrypted"],
shared_metadata={"datasource": "../data/network_traffic_edges.csv"},
)
g.load_nodes(
data=nodes_df,
time="timestamp",
id="server_id",
properties=["OS_version", "primary_function", "uptime_days"],
metadata=["server_name", "hardware_type"],
shared_metadata={"datasource": "../data/network_traffic_nodes.csv"},
)
print("The resulting graphs and example node/edge:")
print(g)
print(g.node("ServerA"))
print(g.edge("ServerA", "ServerB"))
Output
The resulting graphs and example node/edge:
Graph(number_of_nodes=5, number_of_edges=7, number_of_temporal_edges=7, earliest_time=EventTime(timestamp=1693555200000, event_id=0), latest_time=EventTime(timestamp=1693557000000, event_id=18446744073709551615))
Node(name=ServerA, earliest_time=EventTime(timestamp=1693555200000, event_id=0), latest_time=EventTime(timestamp=1693556400000, event_id=4), properties=Properties({OS_version: Ubuntu 20.04, primary_function: Database, uptime_days: 120}))
Edge(source=ServerA, target=ServerB, earliest_time=EventTime(timestamp=1693555200000, event_id=0), latest_time=EventTime(timestamp=1693555200000, event_id=0), properties={data_size_MB: 5.6}, layer(s)=[Critical System Request])
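The `timestamp` values inside `EventTime` in this output are consistent with milliseconds since the Unix epoch. As a quick sanity check, the sketch below converts the first timestamp from the edge dataframe using only the Python standard library. The `to_epoch_ms` helper is hypothetical and not part of Raphtory.

```python
from datetime import datetime, timezone


def to_epoch_ms(text):
    """Parse an ISO 8601 timestamp with a UTC offset into epoch milliseconds."""
    moment = datetime.fromisoformat(text)
    return int(moment.timestamp() * 1000)


# First timestamp in the edge dataframe.
earliest = to_epoch_ms("2023-09-01 08:00:00+00:00")
print(earliest)  # 1693555200000, the same value as earliest_time in the output

# Going the other way turns latest_time back into a readable UTC datetime.
latest = datetime.fromtimestamp(1693557000000 / 1000, tz=timezone.utc)
print(latest.isoformat())  # 2023-09-01T08:30:00+00:00
```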
Creating a graph from a Parquet file
Similarly, you can pass a file path to load_edges() and load_nodes() to load data from files in the common Apache Parquet format.
from raphtory import Graph
h = Graph()
h.load_edges(
data="../data/net_edges.parquet",
time="timestamp",
src="source",
dst="destination",
properties=["data_size_MB"],
layer_col="transaction_type",
metadata=["is_encrypted"],
)
h.load_nodes(
data="../data/net_nodes.parquet",
time="timestamp",
id="server_id",
properties=["OS_version", "primary_function", "uptime_days"],
metadata=["server_name", "hardware_type"],
)
print(h)
Output
Graph(number_of_nodes=5, number_of_edges=7, number_of_temporal_edges=7, earliest_time=EventTime(timestamp=1693555200000, event_id=0), latest_time=EventTime(timestamp=1693557000000, event_id=18446744073709551615))
Adding metadata via dataframes
There may be instances where you are adding a dataset which has no timestamps. To handle this when ingesting via dataframes, the graph has the load_edge_metadata() and load_node_metadata() functions.
In this example, we break the ingestion into a four-stage process, adding the metadata at the end. We use the same two dataframes for brevity, but in real use these would probably be four different dataframes, one for each function call.
Warning
Metadata can only be added to nodes and edges which are already part of the graph. If you attempt to add metadata without first adding the node/edge, Raphtory will throw an error.
from raphtory import Graph
import pandas as pd
g = Graph()
edges_df = pd.read_csv("../data/network_traffic_edges.csv")
edges_df["timestamp"] = pd.to_datetime(edges_df["timestamp"])
nodes_df = pd.read_csv("../data/network_traffic_nodes.csv")
nodes_df["timestamp"] = pd.to_datetime(nodes_df["timestamp"])
g.load_edges(
data=edges_df,
src="source",
dst="destination",
time="timestamp",
properties=["data_size_MB"],
layer_col="transaction_type",
)
g.load_nodes(
data=nodes_df,
id="server_id",
time="timestamp",
properties=["OS_version", "primary_function", "uptime_days"],
)
g.load_edge_metadata(
data=edges_df,
src="source",
dst="destination",
layer_col="transaction_type",
metadata=["is_encrypted"],
shared_metadata={"datasource": "docs/data/network_traffic_edges.csv"},
)
g.load_node_metadata(
data=nodes_df,
id="server_id",
metadata=["server_name", "hardware_type"],
shared_metadata={"datasource": "docs/data/network_traffic_nodes.csv"},
)
print(g)
print(g.node("ServerA"))
print(g.edge("ServerA", "ServerB"))
Output
Graph(number_of_nodes=5, number_of_edges=7, number_of_temporal_edges=7, earliest_time=EventTime(timestamp=1693555200000, event_id=0), latest_time=EventTime(timestamp=1693557000000, event_id=18446744073709551615))
Node(name=ServerA, earliest_time=EventTime(timestamp=1693555200000, event_id=0), latest_time=EventTime(timestamp=1693556400000, event_id=4), properties=Properties({OS_version: Ubuntu 20.04, primary_function: Database, uptime_days: 120}))
Edge(source=ServerA, target=ServerB, earliest_time=EventTime(timestamp=1693555200000, event_id=0), latest_time=EventTime(timestamp=1693555200000, event_id=0), properties={data_size_MB: 5.6}, layer(s)=[Critical System Request])
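Given the warning earlier in this section, it can be useful to check that every ID in a metadata table already exists in the graph before calling the metadata loaders. The sketch below shows one way to do this in plain Python. `find_unknown_ids` is a hypothetical helper, the `known_nodes` set stands in for node names you would collect from your own graph, and the small CSV (including `ServerZ`) is made up for illustration.

```python
import csv
import io


def find_unknown_ids(rows, id_column, known_ids):
    """Return the IDs in rows (in first-seen order) that are not in known_ids."""
    unknown = []
    for row in rows:
        node_id = row[id_column]
        if node_id not in known_ids and node_id not in unknown:
            unknown.append(node_id)
    return unknown


# Assumption: these names were gathered from the graph after load_nodes().
known_nodes = {"ServerA", "ServerB", "ServerC"}

# Illustrative metadata table; ServerZ is deliberately absent from the graph.
metadata_csv = io.StringIO(
    "server_id,server_name\n"
    "ServerA,Alpha\n"
    "ServerB,Beta\n"
    "ServerZ,Zeta\n"
)
rows = list(csv.DictReader(metadata_csv))

missing = find_unknown_ids(rows, "server_id", known_nodes)
print(missing)  # ['ServerZ']
```

If the returned list is not empty, you can either add the missing nodes first or drop those rows before loading the metadata.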