This article is a continuation of the one I posted the other day.
This time, I collected Bitcoin price-fluctuation data with a certain REST API, and the collection got too heavy, so I will write about downsampling it to reduce the number of documents.
What we need is aggregation processing. In pymongo, aggregation is executed by passing a pipeline (a list of conditions) to the aggregate function. The pipeline is composed of stages and operators: each stage corresponds to SQL's "select", "group by", "where", and so on, while each operator corresponds to "sum", "max", "min", and so on.
Reference: Aggregation Pipeline (stages)
Reference: Aggregation Pipeline (operators)
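As a rough sketch of the idea (assuming `db` is already a pymongo collection object, as in the snippets below), a pipeline is just a list of stage dictionaries passed to `aggregate`:

```python
# Minimal illustrative pipeline: $match plays the role of SQL's WHERE,
# $group plays GROUP BY, and $avg / $max are operators like AVG() / MAX().
pipeline = [
    {"$match": {"product_code": "BTC_JPY"}},   # WHERE product_code = 'BTC_JPY'
    {"$group": {
        "_id": "$product_code",                # GROUP BY product_code
        "avg_ltp": {"$avg": "$ltp"},           # AVG(ltp)
        "max_ltp": {"$max": "$ltp"},           # MAX(ltp)
    }},
]
for doc in db.aggregate(pipeline):
    print(doc)
```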
I stored the REST API responses in the collection as-is and ended up with a pointlessly large amount of data (51,240 documents). (I left it running at 10-minute intervals for several days, and it piled up before I knew it...) Since that is unwieldy, I downsampled it to reduce the number of documents.
Documents stored in the collection
from pymongo import MongoClient
from pprint import pprint

client = MongoClient()
db = client["BitCoin"]["document"]
pprint(db.count_documents({}))  # get the number of documents in the collection
pprint(db.find_one())
"""
#Output result
51240
{'_id': ObjectId('5f328ad85ae5ac59aee515cb'),
'best_ask': 1245419.0,
'best_ask_size': 0.02,
'best_bid': 1244658.0,
'best_bid_size': 0.05,
'ltp': 1245615.0,
'product_code': 'BTC_JPY',
'tick_id': 10956004,
'timestamp': 1597115465.0,
'total_ask_depth': 1364.44898005,
'total_bid_depth': 1637.4300907,
'volume': 126756.67774321,
'volume_by_product': 6571.45287901
}
"""
The graph looks like this ... There are too many points and it's really annoying
For the time being, I grouped the 10-minute-interval data into daily buckets, averaged each value within a bucket, and used that as the downsampled data.
Below is the aggregate pipeline used in pymongo.
pipeline
coin = "BTC_JPY"
interval = 60*60*24 # 24hour
pipeline = [
# match stage
{"$match": {"product_code": coin} },
# group stage
{"$group": {
"_id":
{"timestamp":
{"$subtract": ["$timestamp", { "$mod": ["$timestamp", interval]}]
}
,
"product_code": "$product_code"
},
"timestamp":{"$avg": "$timestamp"},
"ltp": {"$avg": "$ltp"},
"best_ask": {"$avg": "$best_ask"},
"best_ask_size": {"$avg": "$best_ask_size"},
"best_bid_size": {"$avg": "$best_bid_size"},
"total_ask_depth": {"$avg": "$total_ask_depth"},
"total_bid_depth": {"$avg": "$total_bid_depth"},
"volume": {"$avg": "$volume"},
"volume_by_product": {"$avg": "$volume_by_product"},
}},
# presentation stage
{"$project": {
"product_code": "$_id.product_code",
"_id": 0, "timestamp": 1,"ltp": 1,
"best_ask": 1,"best_ask_size": 1,
"best_bid_size": 1,
"total_ask_depth": 1,
"total_bid_depth": 1,
"volume": 1, "volume_by_product": 1,
}
}
]
I will explain the pipeline.
Get the target to be aggregated ($match)
This time, I matched the documents whose product_code is the target coin.
(Conditions can be specified in the same way as with find.)
{"$match": {"product_code": coin}},
Grouping ($group)
product_code and timestamp were grouped so that timestamps from the same day (in unix time) fall into the same bucket, and every other value was averaged (the bucket arithmetic is sketched after the snippet below). Two points are worth noting:
- Set the target to be grouped in _id.
- After _id, specify each key you want in the output together with the operator used to compute it, such as the average or the maximum.
{"$group": {
"_id": #Set the target to be grouped here
{"timestamp":
{"$subtract":
["$timestamp",
{ "$mod": ["$timestamp", interval]}]
}
,
"product_code": "$product_code"
},
"timestamp":{"$avg": "$timestamp"},
"ltp": {"$avg": "$ltp"},
Specify the data to display ($project)
(It works the same way as projection in find.)
{"$project": {
"product_code": "$_id.product_code",
"_id": 0, "timestamp": 1,"ltp": 1,
"best_ask": 1,"best_ask_size": 1,
"best_bid_size": 1,
"total_ask_depth": 1,
"total_bid_depth": 1,
"volume": 1, "volume_by_product": 1,
}
}
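To check the result of the whole pipeline, you can run it and look at one of the returned documents; because of the `_id: 0` exclusion and the `"$_id.product_code"` expression, _id disappears and product_code comes back as a normal top-level field:

```python
# Run the pipeline and inspect the downsampled result.
results = list(db.aggregate(pipeline))
print(len(results))   # number of (day, product_code) buckets, far fewer than the raw count
pprint(results[0])    # one document with averaged values per bucket
```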
I compared the data downsampled by the pipeline above with the original data. The red dots are before downsampling and the blue dots are after downsampling. You can see that the data is thinned out nicely.
import matplotlib.pyplot as plt

plt.figure()
# raw documents: before downsampling (red)
for i in db.find(filter={"product_code": coin}):
    plt.scatter(i["timestamp"], i["ltp"], marker=".", color="r")
# aggregated documents: after downsampling (blue)
for i in db.aggregate(pipeline=pipeline):
    plt.scatter(i["timestamp"], i["ltp"], marker=".", color="b")
plt.grid()
plt.xlabel("Data [unixtime]")
plt.ylabel(coin)
plt.savefig("test2.jpg")
plt.show()
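If the goal is to actually shrink what is stored rather than just what is plotted, one option (not used in the pipeline above) is to append an $out stage so that MongoDB writes the downsampled documents into a separate collection; the collection name below is just an assumption:

```python
# $out must be the final stage; it writes the pipeline output to "document_daily"
# in the same database (the collection name here is arbitrary).
db.aggregate(pipeline=pipeline + [{"$out": "document_daily"}])
print(client["BitCoin"]["document_daily"].count_documents({}))
```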
Well, there is a lot more to say about aggregate, but it is too much to cover here, so I will stop at this point. I will add corrections as needed; questions and requests for anything you would like me to write about are welcome.