""" Use-Case. Marketing analyst wants to understand path to purchase of a new product by a few early adopters ( say 5) through interactive queries. This product is high involvement and expensive, and therefore they want to understand the research undertaken by the customer. * Which device was used to initiate the first research. Was that prompted by an ad, email promotion? * How many devices were used overall and what was the time taken from initial research to final purchase * On which devices did the customer spend more time """ import itertools from collections import namedtuple, defaultdict from datetime import timedelta import networkx as nx import plotly.graph_objects as go from gremlin_python.process.traversal import P from gremlin_python.process.graph_traversal import ( outV, values, project, constant, select, inV, where, identity ) from nepytune import drawing from nepytune.visualizations import bar_plots Event = namedtuple('Event', 'ts persistentId transientId device_type url') Session = namedtuple('Session', 'transientId persistentId device_type events') def get_activity_of_early_adopters(g, thank_you_page_url, skip_single_transients=False, limit=5): """ Given thank you page url, find first early adopters of the product. In other words: * find first few persistent identities (or transient if they're not matched with any user) that visited given thank you page * extract their *whole* activity on the domain of the thank_you_page """ return ( g.V(thank_you_page_url) .hasLabel("website").as_("thank_you") .in_("links_to").as_("website_group") .select("thank_you") .inE("visited") .order().by("ts") .choose( constant(skip_single_transients).is_(P.eq(True)), where(outV().in_("has_identity")), identity() ) .choose( outV().in_("has_identity"), project( "type", "id", "purchase_ts" ) .by(constant("persistent")) .by(outV().in_("has_identity")) .by(values("ts")), project( "type", "id", "purchase_ts" ) .by(constant("transient")) .by(outV()) .by(values("ts")) ).dedup("id").limit(limit) .choose( select("type").is_("persistent"), project( "persistent_id", "transient_id", "purchase_ts" ).by(select("id").values("pid")) .by(select("id").out("has_identity").fold()) .by(select("purchase_ts")), project("persistent_id", "transient_id", "purchase_ts") .by(constant("")) .by(select("id").fold()) .by(select("purchase_ts")) ).project("persistent_id", "purchase_ts", "devices", "visits") .by(select("persistent_id")) .by(select("purchase_ts")) .by(select("transient_id").unfold().group().by(values("uid")).by(values("type"))) .by( select("transient_id").unfold().outE("visited").order().by("ts") .where( inV().in_("links_to").where(P.eq("website_group")) ) .project( "transientId", "url", "ts" ).by("uid").by("visited_url").by("ts").fold()) ) def transform_activities(result_set): """Build the flat list of user activities.""" for per_persistent_events in result_set: for visit in per_persistent_events["visits"]: if visit["ts"] <= per_persistent_events["purchase_ts"]: yield Event(**{ "persistentId": per_persistent_events["persistent_id"] or None, "device_type": per_persistent_events["devices"][visit["transientId"]], **visit }) def first_device_in_session(user_events): """Get device id which initialize session.""" return user_events[0].transientId def time_to_purchase(user_events): """Get device id which initialize session.""" return user_events[-1].ts - user_events[0].ts def consecutive_pairs(iterable): f_ptr, s_ptr = itertools.tee(iterable, 2) next(s_ptr) return zip(f_ptr, s_ptr) def generate_session_from_event(events, max_ts_delta=300): """Generate sessions from events.""" events_by_timestamp = sorted(events, key=lambda event: (event.transientId, event.ts)) guard_event = Event( ts=None, persistentId=None, transientId=None, device_type=None, url=None ) sessions = [] session = Session( transientId=events_by_timestamp[0].transientId, persistentId=events_by_timestamp[0].persistentId, device_type=events_by_timestamp[0].device_type, events=[] ) events_count = 0 for event, next_event in consecutive_pairs(events_by_timestamp + [guard_event]): session.events.append(event) if event.transientId != next_event.transientId or (next_event.ts - event.ts).seconds > max_ts_delta: sessions.append(session) events_count += len(session.events) session = Session( transientId=next_event.transientId, persistentId=next_event.persistentId, device_type=next_event.device_type, events=[] ) assert len(events_by_timestamp) == events_count return sessions def get_session_duration(user_session): """Get session duration.""" return user_session.events[-1].ts - user_session.events[0].ts def get_time_by_device(user_sessions): """Get time spent on device.""" time_by_device = defaultdict(timedelta) for session in user_sessions: time_by_device[session.transientId] += get_session_duration(session) return time_by_device def generate_stats(all_activities, **kwargs): """Generate statistics per user (persistentId) activities.""" result = dict() user_sessions = generate_session_from_event(all_activities, **kwargs) def grouper(session): return session.persistentId or session.transientId for persistent_id, session_list in (itertools.groupby(sorted(user_sessions, key=grouper), key=grouper)): session_list = list(session_list) session_durations = get_time_by_device(session_list) user_events_by_timestamp = sorted( itertools.chain.from_iterable([session.events for session in session_list]), key=lambda event: event.ts ) if persistent_id not in result: result[persistent_id] = { "transient_ids": {}, "devices_count": 0, "first_device": first_device_in_session(user_events_by_timestamp), "time_to_purchase": time_to_purchase(user_events_by_timestamp), } for transient_id, duration in session_durations.items(): user_sessions = sorted( [session for session in session_list if session.transientId == transient_id], key=lambda session: session.events[0].ts ) result[persistent_id]["transient_ids"][transient_id] = { "sessions_duration": duration, "sessions_count": len(user_sessions), "purchase_session": user_sessions[-1], "sessions": user_sessions } result[persistent_id]["devices_count"] += 1 return result def draw_referenced_subgraph(persistent_id, graph): drawing.draw( title=f"{persistent_id} path to purchase", scatters=list( drawing.edge_scatters_by_label( graph, opacity={"visited": 0.35, "purchase_path": 0.4}, widths={"links_to": 0.2, "visited": 3, "purchase_path": 3}, colors={"links_to": "grey", "purchase_path": "red"}, dashes={"links_to": "dot"} ) ) + list( drawing.scatters_by_label( graph, attrs_to_skip=["pos", "size"], sizes={ "event": 9, "persistentId": 20, "thank-you-page": 25, "website": 25, "session": 15, }, colors={ "event": 'rgb(153,112,171)', "session": 'rgb(116,173,209)', "thank-you-page": 'orange', "website": 'rgb(90,174,97)', "transientId": 'rgb(158,1,66)', "persistentId": 'rgb(213,62,79)' } ) ), ) def compute_subgraph_pos(query_results, thank_you_page): """Given query results compute subgraph positions.""" for persistent_id, raw_graph in _build_networkx_graph_single( query_results=query_results, thank_you_page=thank_you_page, max_ts_delta=300 ): raw_graph.nodes[thank_you_page]["label"] = "thank-you-page" graph_with_pos_computed = drawing.layout(raw_graph, _custom_layout) yield persistent_id, graph_with_pos_computed def custom_plots(data): """Build list of custom plot figures.""" return [ bar_plots.make_bars( { k[:5]: v["time_to_purchase"].total_seconds() / (3600 * 24) for k, v in data.items() }, title="User's time to purchase", x_title="Persistent IDs", y_title="Days to purchase", lazy=True ), _show_session_stats(data, title="Per device session statistics"), _show_most_common_visited_webpages(data, title="Most common visited subpages before purchase", count=10), ] # =========================== # Everything below was added to introspect the query results via visualisations def _show_session_stats(data, title): def sunburst_data(data): total_sum = sum( values["sessions_count"] for _, v in data.items() for values in v["transient_ids"].values() ) yield "", "Users", 1.5 * total_sum, "white", "" for i, (persistentId, v) in enumerate(data.items(), 1): yield ( "Users", persistentId[:5], sum(values["sessions_count"] for values in v["transient_ids"].values()), i, ( f"
persistentId: {persistentId}
" f"devices count: {len(v['transient_ids'])}" ) ) for transientId, values in v["transient_ids"].items(): yield ( persistentId[:5], transientId[:5], values["sessions_count"], i, ( f"
transientId: {transientId}" f"
session count: {values['sessions_count']}" f"
total session duration: {values['sessions_duration']}" ) ) for session in values["sessions"]: yield ( transientId[:5], session.events[0].ts, 1, i, ( f"
session start: {session.events[0].ts}" f"
session end: {session.events[-1].ts}" f"
session duration: {session.events[-1].ts - session.events[0].ts}" ) ) # aka legend yield "Users", "User ids", total_sum / 2, "white", "" yield "User ids", "User devices", total_sum / 2, "white", "" yield "User devices", "User sessions", total_sum / 2, "white", "" parents, labels, values, colors, hovers = zip(*[r for r in list(sunburst_data(data))]) fig = go.Figure( go.Sunburst( labels=labels, parents=parents, values=values, branchvalues="total", marker=dict( colors=colors, line=dict(width=0.5, color='DarkSlateGrey') ), hovertext=hovers, hoverinfo="text", ), ) fig.update_layout(margin=dict(t=50, l=0, r=0, b=0), title=title) return fig def _show_most_common_visited_webpages(data, title, count): def drop_qs(url): pos = url.find("?") if pos == -1: return url return url[0:pos] def compute_data(data): res = defaultdict(list) for v in data.values(): for values in v["transient_ids"].values(): for session in values["sessions"]: for event in session.events: res[drop_qs(event.url)].append(session.persistentId) return res def sunburst_data(data): total_sum = sum(len(v) for v in data.values()) yield "", "websites", total_sum, "" for i, (website, persistents) in enumerate(data.items()): yield ( "websites", f"Website {i}", len(persistents), f"
website: {website}" f"
users: {len(set(persistents))}" f"
events: {len(persistents)}" ) for persistent, group in itertools.groupby( sorted(list(persistents)), ): group = list(group) yield ( f"Website {i}", persistent[:5], len(group), f"
persistentId: {persistent}" f"
events: {len(group)}" ) events_data = compute_data(data) most_common = dict(sorted(events_data.items(), key=lambda x: -len(x[1]))[:count]) most_common_counts = {k: len(v) for k, v in most_common.items()} pie_chart = go.Pie( labels=list(most_common_counts.keys()), values=list(most_common_counts.values()), marker=dict(line=dict(color='DarkSlateGrey', width=0.5)), domain=dict(column=0) ) parents, labels, values, hovers = zip(*[r for r in list(sunburst_data(most_common))]) sunburst = go.Sunburst( labels=labels, parents=parents, values=values, branchvalues="total", marker=dict( line=dict(width=0.5, color='DarkSlateGrey') ), hovertext=hovers, hoverinfo="text", domain=dict(column=1) ) layout = go.Layout( grid=go.layout.Grid(columns=2, rows=1), margin=go.layout.Margin(t=50, l=0, r=0, b=0), title=title, legend_orientation="h" ) return go.Figure([pie_chart, sunburst], layout) def _build_networkx_graph_single(query_results, thank_you_page, **kwargs): def drop_qs(url): pos = url.find("?") if pos == -1: return url return url[0:pos] def transient_attrs(transient_id, transient_dict): return { "uid": transient_id, "sessions_count": len(transient_dict["sessions"]), "time_on_device": transient_dict["sessions_duration"] } def session_attrs(session): return hash((session.transientId, session.events[0])), { "duration": get_session_duration(session), "events": len(session.events) } def event_to_website(graph, event, event_label): website = drop_qs(event.url) graph.add_node(website, label="website", url=website) graph.add_node(hash(event), label=event_label, **event._asdict()) graph.add_edge(website, hash(event), label="links_to") for persistent_id, result_dict in generate_stats(query_results, **kwargs).items(): graph = nx.MultiGraph() graph.add_node(persistent_id, label="persistentId", pid=persistent_id) for transient_id, transient_dict in result_dict["transient_ids"].items(): graph.add_node(transient_id, label="transientId", **transient_attrs(transient_id, transient_dict)) graph.add_edge(persistent_id, transient_id, label="has_identity") for session in transient_dict["sessions"]: event_label = "event" if session == transient_dict["purchase_session"]: event_edge_label = "purchase_path" else: event_edge_label = "visited" session_id, session_node_attrs = session_attrs(session) # transient -> session graph.add_node(session_id, label="session", **session_node_attrs) graph.add_edge(session_id, transient_id, label="session") fst_event = session.events[0] # event -> website without query strings event_to_website(graph, fst_event, event_label) # session -> first session event graph.add_edge(session_id, hash(fst_event), label="session_start") for fst_event, snd_event in consecutive_pairs(session.events): event_to_website(graph, fst_event, event_label) event_to_website(graph, snd_event, event_label) graph.add_edge(hash(fst_event), hash(snd_event), label=event_edge_label) graph.nodes[result_dict["first_device"]]["size"] = 15 yield persistent_id, graph def _custom_layout(graph): """Custom layout function.""" def _transform_graph(graph): """ Transform one graph into another for the purposes of better visualisation. We rebuild the graph in a tricky way to force the position computation algorithm to allign with the desired shape. """ new_graph = nx.MultiGraph() for edge in graph.edges(data=True): fst, snd, params = edge label = params["label"] new_graph.add_node(fst, **graph.nodes[fst]) new_graph.add_node(snd, **graph.nodes[snd]) if label == "links_to": # website -> event # => event -> user_website -> website user_website = f"{fst}_{snd}" new_graph.add_node(user_website, label="user_website") new_graph.add_edge(snd, user_website, label="session_visit") new_graph.add_edge(user_website, fst, label="session_link") else: new_graph.add_edge(fst, snd, **params) return new_graph return nx.kamada_kawai_layout(_transform_graph(graph))