// 1. Create the product
var product = mParticle.eCommerce.createProduct(
'Skateboard', // Name
'prsx-10', // SKU
100.00, // Price
1 // Quantity
);
// 2. Summarize the transaction
var transactionAttributes = {
Id: 'some-transaction-id',
Revenue: 100,
Tax: 9
};
// 3. Log the purchase event
mParticle.eCommerce.logProductAction(
mParticle.ProductActionType.Purchase,
[product],
null, //optional custom attributes would go here
null, //optional custom flags would go here
transactionAttributes);
{
"mpid" 7667215, // master user identity
"environment": "production",
"user_identities": {
"email": "[email protected]"
},
"user_attributes": {
"$firstname": "Milo",
"$lastname": "Minderbinder"
},
"events": [{
"data": {
"product_action": {
"action": "view_detail", // Others actions are "add_to_cart", "remove_from_cart", and "purchase"
"products": [{
"id": "prsx-10", // Product SKU
"price": 100
}]
},
"timestamp": 72
},
"event_type": "commerce_event"
}]
}
1. Create the Kinesis stream
aws kinesis create-stream \
--stream-name Items4UCommerceEventStream \
--shard-count 1
Save the StreamARN from the response.
2. Create a role for mParticle to assume
For mParticle to be able to upload to the Kinesis stream, I need to create an IAM role for mParticle to assume. This role needs a policy allowing PutRecord access to Kinesis (mp-kinesis-role.json), and a trust policy (mp-trust-policy.json) allowing mParticle to assume the role.
aws iam create-role --role-name mparticle-kinesis-role --assume-role-policy-document file:///path/to/mp-trust-policy.json
aws iam put-role-policy --role-name mparticle-kinesis-role --policy-name mp-kinesis-put --policy-document file:///path/to/mp-kinesis-role.json
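For reference, here's a minimal sketch of what those two policy documents might contain. The exact principal to trust (and any external ID) comes from mParticle's Kinesis setup documentation, so the account ID below is a placeholder; the resource should be the StreamARN saved above.
// mp-kinesis-role.json (sketch): allow writes to the stream
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "kinesis:PutRecord",
            "Resource": "{{saved StreamARN}}"
        }
    ]
}
// mp-trust-policy.json (sketch): let mParticle's AWS account assume the role
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": { "AWS": "{{mParticle AWS account ID}}" },
            "Action": "sts:AssumeRole"
        }
    ]
}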
Create Configuration
First, I need to create an overall configuration for Kinesis. This holds all the settings that will remain the same for every input I connect. Each mParticle integration requires different settings; API keys, for example, are commonly required. For Kinesis, I've already granted mParticle write access using IAM, so I only need to provide my AWS account number here.
Connect All Sources
Now I need to connect each of my four inputs, iOS, Android, Web, and POS, to Kinesis.
Set Filters
mParticle lets me switch each individual event name on or off for a particular output, like Kinesis. These filters help me ensure that I'm only sending to Kinesis the data I need to train my ML model. I'm interested in 4 types of commerce events: View Detail, Add to Cart, Add to Wishlist, and Purchase.
Create a Dataset Group
aws personalize create-dataset-group --name Items4UCommerceEvents
Save the datasetGroupArn from the response.
1. Create the schema, defining the fields Personalize will receive for each interaction:
User ID - this will be the mParticle ID
Session ID - mParticle automatically creates a unique ID for each session, which I can use.
Item ID - this will be the SKU of the product
Event Type - this will be the type of product interaction: Add to Cart, Add to Wishlist, Purchase, or View Detail.
Timestamp - time of the interaction. mParticle automatically records a timestamp for each interaction.
{
"type": "record",
"name": "Interactions",
"namespace": "com.amazonaws.personalize.schema",
"fields": [
{
"name": "USER_ID",
"type": "string"
},
{
"name": "SESSION_ID",
"type": "string"
},
{
"name": "ITEM_ID",
"type": "string"
},
{
"name": "EVENT_TYPE",
"type": "string"
},
{
"name": "TIMESTAMP",
"type": "long"
}
],
"version": "1.0"
}
aws personalize create-schema \
--name Items4UCommerceEventSchema \
--schema file:///path/to/items4u-commerce-event-schema.json
Save the schemaArn from the response.
2. Create the dataset:
aws personalize create-dataset \
--name Items4UCommerceEventsDataset \
--schema-arn {{saved schema arn}} \
--dataset-group-arn {{saved dataset group arn}} \
--dataset-type Interactions
Save the datasetArn from the response.
3. Create the tracker:
A tracker is an ID linked to the dataset that lets me upload events.
aws personalize create-event-tracker \
--name Items4UCommerceEventTracker \
--dataset-group-arn {{saved dataset group arn}}
Save the trackingId from the response.
USER_ID,EVENT_TYPE,ITEM_ID,SESSION_ID,TIMESTAMP
7667215,view_detail,prsx-23,Q8bQC4gnO8J7ewB,1595492950
-6907502341961927698,purchase,prsx-14,VA9AUJBhoJXAKr7,1595492945
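This CSV of historical interactions can be bulk-loaded into the Interactions dataset. The exact loading step isn't shown here; one option (the job name, bucket path, and import role below are placeholders) is a Personalize dataset import job reading from S3:
aws personalize create-dataset-import-job \
    --job-name Items4UHistoricalImport \
    --dataset-arn {{saved dataset arn}} \
    --data-source dataLocation=s3://{{your-bucket}}/historical-interactions.csv \
    --role-arn {{ARN of a role Personalize can use to read the bucket}}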
// Import Dependencies
const AWS = require('aws-sdk');
const JSONBig = require('json-bigint')({storeAsString: true}); // needed to parse 64-bit integer MPID
// Define the product actions we want to report to Personalize
const report_actions = ["purchase", "view_detail", "add_to_cart", "add_to_wishlist"];
// Initialize Personalize
const personalizeevents = new AWS.PersonalizeEvents({apiVersion: '2018-03-22'});
exports.handler = (event, context) => {
for (const record of event.Records) {
// Parse encoded payload
const payload = JSONBig.parse(Buffer.from(record.kinesis.data, 'base64').toString('ascii'));
// Extract required params
const events = payload.events;
const mpid = payload.mpid;
const sessionId = payload.message_id;
const params = {
sessionId: sessionId,
userId: mpid,
trackingId: process.env.TRACKING_ID
};
// Get interactions from events array
const eventList = [];
for (const e of events) {
if (e.event_type === "commerce_event" && report_actions.indexOf(e.data.product_action.action) >= 0) {
const timestamp = Math.floor(e.data.timestamp_unixtime_ms / 1000);
const action = e.data.product_action.action;
const event_id = e.data.event_id;
for (const product of e.data.product_action.products) {
const obj = {
itemId: product.id,
};
eventList.push({
properties: obj,
sentAt: timestamp,
eventId: event_id,
eventType: action
});
}
}
}
if (eventList.length > 0) {
params.eventList = eventList;
// Upload interactions to tracker
personalizeevents.putEvents(params, function(err) {
if (err) console.log(err, err.stack);
else console.log(`Uploaded ${eventList.length} events`)
});
}
}
};
aws iam create-role \
--role-name items4u-lambda-personalize-role \
--assume-role-policy-document file:///path/to/lambda-trust-policy.json
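The lambda-trust-policy.json referenced above is the standard Lambda service trust policy; as a sketch:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": { "Service": "lambda.amazonaws.com" },
            "Action": "sts:AssumeRole"
        }
    ]
}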
Save the Role.Arn from the response.
I can use off-the-rack managed policies to grant access to Kinesis and Personalize:
aws iam attach-role-policy \
--role-name items4u-lambda-personalize-role \
--policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
aws iam attach-role-policy \
--role-name items4u-lambda-personalize-role \
--policy-arn arn:aws:iam::aws:policy/service-role/AmazonPersonalizeFullAccess
aws lambda create-function \
--function-name Items4UPersonalizeLambda \
--runtime nodejs12.x \
--zip-file fileb:///path/to/Items4UPersonalizeLambda.zip \
--role {{role arn}} \
--handler index.handler \
--environment Variables="{MP_KEY=SomeAccessKey,MP_SECRET=SomeAccessSecret,TRACKING_ID=SomeTrackingID}"
aws lambda create-event-source-mapping \
--function-name Items4UPersonalizeLambda \
--event-source-arn {{Kinesis stream arn}} \
--starting-position LATEST
1. Create a Solution:
aws personalize create-solution \
--name Items4URecsSolution \
--dataset-group-arn {{dataset group ARN}} \
--recipe-arn arn:aws:personalize:::recipe/aws-user-personalization
Save the solutionArn from the response.
2. Create a Solution Version:
aws personalize create-solution-version \
--solution-arn {{solution ARN}}
Save the solutionVersionArn from the response.
The solution version takes some time to create. I can check in on its progress regularly with describe-solution-version until the response shows a status of ACTIVE.
aws personalize describe-solution-version \
--solution-version-arn {{solution version ARN}}
3. Create a Campaign:
aws personalize create-campaign \
--name Items4UProductRecsCampaign \
--solution-version-arn arn:aws:personalize:us-east-1:521255666488:solution/Items4URecsSolution/f58f24b6 \
--min-provisioned-tps 1
Save the campaignArn from the response.
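Like the solution version, the campaign takes a few minutes to become ACTIVE; I can poll it the same way:
aws personalize describe-campaign \
    --campaign-arn {{campaign ARN}}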
// Import Dependencies
const AWS = require('aws-sdk');
const JSONBig = require('json-bigint')({storeAsString: true}); // needed to parse 64-bit integer MPID
const mParticle = require('mparticle');
// Define the product actions we want to report to Personalize
const report_actions = ["purchase", "view_detail", "add_to_cart", "add_to_wishlist"];
// Initialize Personalize and mParticle
const personalizeevents = new AWS.PersonalizeEvents({apiVersion: '2018-03-22'});
const personalizeruntime = new AWS.PersonalizeRuntime({apiVersion: '2018-05-22'});
const mp_api = new mParticle.EventsApi(new mParticle.Configuration(process.env.MP_KEY, process.env.MP_SECRET));
exports.handler = (event, context) => {
for (const record of event.Records) {
// Parse encoded payload
const payload = JSONBig.parse(Buffer.from(record.kinesis.data, 'base64').toString('ascii'));
// Extract required params
const events = payload.events;
const mpid = payload.mpid;
const sessionId = payload.message_id;
const params = {
sessionId: sessionId,
userId: mpid,
trackingId: process.env.TRACKING_ID
};
// Get interactions from events array
const eventList = [];
for (const e of events) {
if (e.event_type === "commerce_event" && report_actions.indexOf(e.data.product_action.action) >= 0) {
const timestamp = Math.floor(e.data.timestamp_unixtime_ms / 1000);
const action = e.data.product_action.action;
const event_id = e.data.event_id;
for (const product of e.data.product_action.products) {
const obj = {
itemId: product.id,
};
eventList.push({
properties: obj,
sentAt: timestamp,
eventId: event_id,
eventType: action
});
}
}
}
if (eventList.length > 0) {
params.eventList = eventList;
// Upload interactions to tracker
personalizeevents.putEvents(params, function(err, data) {
if (err) console.log(err, err.stack);
else {
// Request product recs
var params = {
campaignArn: process.env.CAMPAIGN_ARN,
numResults: 5,
userId: mpid
};
personalizeruntime.getRecommendations(params, function(err, data) {
if (err) console.log(err, err.stack);
else {
console.log(`Uploaded ${eventList.length} events`)
// Upload product recs to mParticle
const batch = new mParticle.Batch(mParticle.Batch.Environment.development);
batch.mpid = mpid;
const itemList = [];
for (const item of data.itemList) {
itemList.push(item.itemId);
}
batch.user_attributes = {};
batch.user_attributes.product_recs = itemList;
const event = new mParticle.AppEvent(mParticle.AppEvent.CustomEventType.other, 'AWS Recs Update', {
product_recs: itemList.join()
});
batch.addEvent(event);
console.log(JSON.stringify(batch));
const callback = function(error, data, response) {
if (error) {
console.error(error);
} else {
console.log('Product Recs updated successfully');
}
};
mp_api.uploadEvents(batch, callback);
}
});
}
});
}
}
};
As well as updating the code, I also need to add the CAMPAIGN_ARN environment variable to the Lambda function.
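One way to add it is with the CLI (a sketch; note that update-function-configuration replaces the whole environment map, so the existing variables are repeated here):
aws lambda update-function-configuration \
    --function-name Items4UPersonalizeLambda \
    --environment Variables="{MP_KEY=SomeAccessKey,MP_SECRET=SomeAccessSecret,TRACKING_ID=SomeTrackingID,CAMPAIGN_ARN={{campaign ARN}}}"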
When I request recs from a Personalize campaign, I can specify the number of recommendations I want. Here, I'm going for a top 5, enough to populate a carousel or an email widget.
The payload uploaded to mParticle by the Lambda will look like this:
{
"environment": "development",
"mpid": "-6907502341961927698",
"user_attributes": {
"product_recs": [
"prsx-4",
"prsx-2",
"prsx-15",
"prsx-30",
"prsx-28"
]
},
"events": [
{
"data": {
"custom_event_type": "other",
"event_name": "AWS Recs Update",
"custom_attributes": {
"product_recs": "prsx-4,prsx-2,prsx-15,prsx-30,prsx-28"
}
},
"event_type": "custom_event"
}
]
}
To A/B test the recommendations, I can extend the Lambda to assign each user to a variant, choose between two campaigns (configured via CAMPAIGN_ARN_A and CAMPAIGN_ARN_B environment variables), and record the variant back on the mParticle user profile:
const AWS = require('aws-sdk');
const JSONBig = require('json-bigint')({ storeAsString: true });
const mParticle = require('mparticle');
const trackingId = "bd973581-6505-46ae-9939-e0642a82b8b4";
const report_actions = ["purchase", "view_detail", "add_to_cart", "add_to_wishlist"];
const personalizeevents = new AWS.PersonalizeEvents({apiVersion: '2018-03-22'});
const personalizeruntime = new AWS.PersonalizeRuntime({apiVersion: '2018-05-22'});
const mp_api = new mParticle.EventsApi(new mParticle.Configuration(process.env.MP_KEY, process.env.MP_SECRET));
exports.handler = function (event, context) {
for (const record of event.Records) {
const payload = JSONBig.parse(Buffer.from(record.kinesis.data, 'base64').toString('ascii'));
const events = payload.events;
const mpid = payload.mpid;
const sessionId = payload.message_id;
const params = {
sessionId: sessionId,
userId: mpid,
trackingId: trackingId
};
// Check for variant and assign one if not already assigned
const variant_assigned = Boolean(payload.user_attributes.ml_variant);
const variant = variant_assigned ? payload.user_attributes.ml_variant : Math.random() > 0.5 ? "A" : "B";
const eventList = [];
for (const e of events) {
if (e.event_type === "commerce_event" && report_actions.indexOf(e.data.product_action.action) >= 0) {
const timestamp = Math.floor(e.data.timestamp_unixtime_ms / 1000);
const action = e.data.product_action.action;
const event_id = e.data.event_id;
for (const product of e.data.product_action.products) {
const obj = {itemId: product.id,};
eventList.push({
properties: obj,
sentAt: timestamp,
eventId: event_id,
eventType: action
});
}
}
}
if (eventList.length > 0) {
params.eventList = eventList;
personalizeevents.putEvents(params, function(err, data) {
if (err) console.log(err, err.stack);
else {
var params = {
// Select campaign based on variant
campaignArn: process.env[`CAMPAIGN_ARN_${variant}`],
numResults: 5,
userId: mpid
};
personalizeruntime.getRecommendations(params, function(err, data) {
if (err) console.log(err, err.stack);
else {
const batch = new mParticle.Batch(mParticle.Batch.Environment.development);
batch.mpid = mpid;
const itemList = [];
for (const item of data.itemList) {
itemList.push(item.itemId);
}
batch.user_attributes = {};
batch.user_attributes.product_recs = itemList;
// Record variant on mParticle user profile
if (!variant_assigned) {
batch.user_attributes.ml_variant = variant
}
const event = new mParticle.AppEvent(mParticle.AppEvent.CustomEventType.other, 'AWS Recs Update');
event.custom_attributes = {product_recs: itemList.join()};
batch.addEvent(event);
const mp_callback = function(error, data, response) {
if (error) {
console.error(error);
} else {
console.log('API called successfully.');
}
};
mp_api.uploadEvents(batch, mp_callback);
}
});
}
});
}
}
};
User journeys are not always linear. For example, a customer might look at a recommended product on my website and not buy it immediately, but pick it up later in a brick-and-mortar store, or when they next use the native app. If I'm running my experiments and analytics on a per-device basis, I'll miss that conversion.
Note that because both my analytics in Amplitude and the user bucketing for my A/B test are based on mParticle's master MPID, my A/B test is more complete than if I had bucketed per device. By using the mParticle ID, I can capture the full effect of my campaigns.
In machine learning, as in all data-centric tasks, the right infrastructure is key. When you use a CDP like mParticle to center your data infrastructure around a single, cross-platform customer record, your ML campaigns will be faster to set up and more effective. By using a CDP, you can:
Previously published .