Here is a simple C# console app that bulk-uploads large tab-separated log files to Azure Cosmos DB using the BulkExecutor library.
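The sketch uses the v2 .NET SDK (the Microsoft.Azure.DocumentDB NuGet package, which provides DocumentClient) together with the Microsoft.Azure.CosmosDB.BulkExecutor package, so add both packages before building. It assumes the input is a set of tab-separated log files named wabac_1, wabac_2, and so on under D:\; the fileCount variable below is a placeholder for however many files you actually have, and EndpointUrl/AuthorizationKey must be filled in with your own account values.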
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.CosmosDB.BulkExecutor;
using Microsoft.Azure.CosmosDB.BulkExecutor.BulkImport;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;

class Program
{
    static async Task Main(string[] args)
    {
        string EndpointUrl = "";      // your Cosmos DB account endpoint
        string AuthorizationKey = ""; // your Cosmos DB account key
        string DatabaseName = "wabacDB";
        string CollectionName = "wabacCollection";
        int CollectionThroughput = 100000;
        string CollectionPartitionKey = "/rowid";
        string filepath = @"D:\wabac_";
        int fileCount = 10; // placeholder: number of input files named wabac_1, wabac_2, ...

        // Direct mode over TCP gives the best throughput for bulk operations.
        ConnectionPolicy connectionPolicy = new ConnectionPolicy
        {
            ConnectionMode = ConnectionMode.Direct,
            ConnectionProtocol = Protocol.Tcp
        };
        DocumentClient client = new DocumentClient(new Uri(EndpointUrl), AuthorizationKey, connectionPolicy);

        // Create the database if it does not already exist.
        Database database = client.CreateDatabaseQuery().Where(d => d.Id == DatabaseName).AsEnumerable().FirstOrDefault();
        if (database == null)
        {
            database = await client.CreateDatabaseAsync(new Database { Id = DatabaseName });
        }

        // Create the partitioned collection if it does not already exist.
        DocumentCollection dataCollection = client.CreateDocumentCollectionQuery(UriFactory.CreateDatabaseUri(DatabaseName))
            .Where(c => c.Id == CollectionName).AsEnumerable().FirstOrDefault();
        if (dataCollection == null)
        {
            PartitionKeyDefinition partitionKey = new PartitionKeyDefinition
            {
                Paths = new Collection<string> { CollectionPartitionKey }
            };
            DocumentCollection collection = new DocumentCollection { Id = CollectionName, PartitionKey = partitionKey };
            dataCollection = await client.CreateDocumentCollectionAsync(
                UriFactory.CreateDatabaseUri(DatabaseName),
                collection,
                new RequestOptions { OfferThroughput = CollectionThroughput });
        }

        // Set retry options high during initialization (default values).
        client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
        client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;

        IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
        await bulkExecutor.InitializeAsync();

        // Set retries to 0 to pass complete control to the bulk executor.
        client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
        client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;

        List<string> documentsToImportInBatch = new List<string>();
        Console.WriteLine($"{DateTime.UtcNow} : Start");
        for (int i = 1; i <= fileCount; i++)
        {
            using (StreamReader sr = new StreamReader(filepath + i))
            {
                while (sr.Peek() >= 0)
                {
                    string line = sr.ReadLine();
                    line = line.Replace("\\", "\\\\"); // escape backslashes so the line stays valid inside a JSON string
                    var s = line.Split('\t');
                    // 0-LineNumber ; 1-TimeStamp ; 2-Level ; ... ; 8-Similarity
                    string Similarity = string.IsNullOrWhiteSpace(s[8]) ? "0" : s[8];
                    documentsToImportInBatch.Add($"{{\"id\":\"L{s[0]}\",\"rowid\":\"L{s[0]}F\",\"LineNumber\":{s[0]},\"TimeStamp\":\"{s[1]}\",\"Level\":\"{s[2]}\"}}");

                    // Push a batch once it reaches roughly the provisioned throughput.
                    if (documentsToImportInBatch.Count > CollectionThroughput)
                    {
                        BulkImportResponse bulkImportResponse;
                        do
                        {
                            bulkImportResponse = await bulkExecutor.BulkImportAsync(
                                documents: documentsToImportInBatch,
                                enableUpsert: false,
                                disableAutomaticIdGeneration: true,
                                maxConcurrencyPerPartitionKeyRange: null,
                                maxInMemorySortingBatchSize: null,
                                cancellationToken: CancellationToken.None);
                            Console.WriteLine($"\t\t{DateTime.UtcNow} : uploaded {bulkImportResponse.NumberOfDocumentsImported}");
                            Console.WriteLine($"\t\t{DateTime.UtcNow} : error {bulkImportResponse.BadInputDocuments.Count}");
                        }
                        while ((bulkImportResponse.NumberOfDocumentsImported + bulkImportResponse.BadInputDocuments.Count) < documentsToImportInBatch.Count);
                        Console.WriteLine($"\t{DateTime.UtcNow} : done with batch {documentsToImportInBatch.Count}");
                        documentsToImportInBatch.Clear();
                    }
                }
            }
            Console.WriteLine($"\t{DateTime.UtcNow} : done with file#{i}");
        }

        // Flush any leftover documents that never filled a complete batch.
        if (documentsToImportInBatch.Count > 0)
        {
            BulkImportResponse finalResponse = await bulkExecutor.BulkImportAsync(
                documents: documentsToImportInBatch,
                enableUpsert: false,
                disableAutomaticIdGeneration: true,
                maxConcurrencyPerPartitionKeyRange: null,
                maxInMemorySortingBatchSize: null,
                cancellationToken: CancellationToken.None);
            Console.WriteLine($"\t{DateTime.UtcNow} : flushed final {finalResponse.NumberOfDocumentsImported} documents");
        }
        Console.WriteLine($"{DateTime.UtcNow} : End");
    }
}
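A few notes on the design. The documents are built as pre-serialized JSON strings with explicit id values, which is why disableAutomaticIdGeneration is set to true. Client-side retries are dropped to 0 after InitializeAsync so that throttling (429) back-off is handled entirely by the bulk executor, per the comments in the code. The /rowid partition key gets a distinct value per document, which spreads the writes across partition key ranges. Finally, the do/while loop re-submits a batch until every document in it has either been imported or reported back in BadInputDocuments, and the flush after the file loop catches any trailing partial batch.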